package room import ( "git.reya.zone/reya/hexmap/server/action" "git.reya.zone/reya/hexmap/server/state" "github.com/rs/xid" "go.uber.org/zap" ) // act is the meat and potatoes of the room - it's responsible for actually running the room. // It will gracefully remove all clients before shutting down. func (r *room) act() { defer r.gracefulShutdown() r.logger.Info("Room starting up, listening for incoming actions") for { raw := <-r.incomingChannel client := raw.ClientID() msgLogger := r.logger.With(zap.Stringer("client", client)) msgLogger.Debug("Message received, handling", zap.Object("message", raw)) switch msg := raw.(type) { case JoinRequest: r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) r.acknowledgeJoin(msg.id, msg.wantCurrentState) case RefreshRequest: r.sendRefresh(msg.id) case ApplyRequest: msgLogger.Debug("Received action to apply from client", zap.Int("actionId", msg.action.ID)) result := r.applyAction(msg.action.Action) if result != nil { r.broadcastAction(client, msg.action.ID, msg.action.Action) } r.acknowledgeAction(client, msg.action.ID, result) case LeaveRequest: // So long, then. We can close immediately here; they promised not to send any more messages after this // unless we were shutting down, which we're not. r.acknowledgeLeave(client) r.closeClient(client) case StopRequest: // As requested, we shut down. Our deferred gracefulShutdown will catch us as we fall. msgLogger.Info("Received StopRequest from client, shutting down") return case ShutdownResponse: // Uh... thank... you. I'm not... Never mind. I guess this means you're leaving? msgLogger.Error("Received unexpected ShutdownResponse from client while not shutting down") r.closeClient(client) default: msgLogger.Warn("Ignoring unhandled message", zap.Object("message", msg)) } msgLogger.Debug("Message handled, resuming listening") } } // addClient records a client's presence in the client map. func (r *room) addClient(id xid.ID, returnChannel chan<- Message, broadcast bool, privateChannel bool) { logger := r.logger.With(zap.Stringer("client", id)) logger.Debug("Adding client") if client, ok := r.clients[id]; ok { if client.outgoingChannel == returnChannel { if broadcast == client.broadcast && privateChannel == client.privateChannel { logger.Warn("Already have client when adding client") } else { logger.Error("Already have client but with different settings when adding client") } } else { logger.Error("Already have a different client with the same id when adding client") } return } r.clients[id] = internalClient{ id: id, outgoingChannel: returnChannel, broadcast: broadcast, privateChannel: privateChannel, } } // stateCopy creates and returns a fresh copy of the current state. // This avoids concurrent modification of the state while clients are reading it. func (r *room) stateCopy() *state.Synced { s := r.currentState.Copy() return &s } // acknowledgeJoin composes and sends a JoinResponse to the given client. func (r *room) acknowledgeJoin(id xid.ID, includeState bool) { logger := r.logger.With(zap.Stringer("client", id)) client, ok := r.clients[id] if !ok { logger.Error("No such client when acknowledging join") return } var s *state.Synced = nil if includeState { logger.Debug("Preparing state copy for client") s = r.stateCopy() } logger.Debug("Sending JoinResponse to client") client.outgoingChannel <- JoinResponse{ id: r.id, currentState: s, } } // sendRefresh composes and sends a RefreshResponse to the given client. func (r *room) sendRefresh(id xid.ID) { logger := r.logger.With(zap.Stringer("client", id)) client, ok := r.clients[id] if !ok { logger.Error("No such client when sending refresh") return } var s *state.Synced = nil logger.Debug("Preparing state copy for client") s = r.stateCopy() logger.Debug("Sending RefreshResponse to client") client.outgoingChannel <- RefreshResponse{ id: r.id, currentState: s, } } // applyAction applies an action to the state and returns the result of it. func (r *room) applyAction(action action.Syncable) error { r.logger.Debug("Applying action", zap.Object("action", action)) return action.Apply(&r.currentState) } // broadcastAction sends an action to everyone other than the original client which requested it. func (r *room) broadcastAction(originalClientID xid.ID, originalActionID int, action action.Syncable) { logger := r.logger.With(zap.Stringer("originalClient", originalClientID), zap.Int("actionID", originalActionID), zap.Object("action", action)) broadcast := ActionBroadcast{ id: r.id, originalClientID: originalClientID, originalActionID: originalActionID, action: action, } logger.Debug("Broadcasting action to all clients") for id, client := range r.clients { if id.Compare(originalClientID) != 0 { logger.Debug("Sending ActionBroadcast to client", zap.Stringer("client", id)) client.outgoingChannel <- broadcast } } } // acknowledgeAction sends a response to the original client which requested an action. func (r *room) acknowledgeAction(id xid.ID, actionID int, error error) { logger := r.logger.With(zap.Stringer("id", id), zap.Int("actionId", actionID), zap.Error(error)) logger.Debug("Responding to client with the status of its action") client, ok := r.clients[id] if !ok { logger.Error("No such client when acknowledging action") return } logger.Debug("Sending ApplyResponse to client") client.outgoingChannel <- ApplyResponse{ id: id, actionID: actionID, result: error, } } // acknowledgeLeave causes the room to signal to the client that it has received and acknowledged the client's LeaveRequest, // and will not send any further messages. func (r *room) acknowledgeLeave(id xid.ID) { logger := r.logger.With(zap.Stringer("client", id)) logger.Debug("Acknowledging client's leave request") client, ok := r.clients[id] if !ok { logger.Error("No such client when acknowledging leave request") return } logger.Debug("Sending LeaveResponse to client") client.outgoingChannel <- LeaveResponse{id: r.id} } // closeClient causes the room to remove the client with the given id from its clients. // This should only be used after a shutdown handshake between this client and the room has taken place. func (r *room) closeClient(id xid.ID) { logger := r.logger.With(zap.Stringer("client", id)) logger.Debug("Closing client") client, ok := r.clients[id] if !ok { logger.Error("Attempted to close a client that didn't exist") return } if client.privateChannel { logger.Debug("Closing outgoingChannel, as client has a privateChannel") close(client.outgoingChannel) } delete(r.clients, id) } // gracefulShutdown causes the room to shut down cleanly, making sure that all clients have been removed. func (r *room) gracefulShutdown() { defer r.finalShutdown() if len(r.clients) == 0 { // Nothing to do, we've already won. r.logger.Debug("No remaining clients, so just shutting down") return } r.requestShutdown() for len(r.clients) > 0 { raw := <-r.incomingChannel client := raw.ClientID() msgLogger := r.logger.With(zap.Stringer("client", client)) msgLogger.Debug("Post-shutdown message received, handling", zap.Object("message", raw)) switch msg := raw.(type) { case JoinRequest: // Don't you hate it when someone comes to the desk right as you're getting ready to pack up? // Can't ignore them - they have our channel, and they might be sending things to it. We have to add them // and then immediately send them a ShutdownRequest and wait for them to answer it. msgLogger.Debug("Received join request from client while shutting down") r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) r.requestShutdownFrom(client) case RefreshRequest: // Ugh, seriously, now? Fine. You can have this - you might be our friend the persistence actor. r.sendRefresh(client) case LeaveRequest: // We sent them a shutdown already, so unfortunately, we can't close them immediately. We have to wait for // them to tell us they've heard that we're shutting down. msgLogger.Debug("Received leave request from client while shutting down") r.acknowledgeLeave(client) case StopRequest: // Yes. We're doing that. Check your inbox, I already sent you the shutdown. msgLogger.Debug("Received stop request from client while shutting down") case ShutdownResponse: // The only way we would be getting one of these is if the client knows it doesn't have to send us anything // else. Therefore, we can remove them now. // Similarly, we know that they'll receive the LeaveResponse they need and shut down. // Like us, even if it sent a LeaveRequest before realizing we were shutting down, it would have gotten our // ShutdownRequest and sent this before it could read the LeaveResponse, but it will wait for the // LeaveResponse regardless. msgLogger.Debug("Received shutdown confirmation from client") r.closeClient(client) default: msgLogger.Debug("Ignoring irrelevant message from client while shutting down", zap.Object("message", raw)) } msgLogger.Debug("Message handled, resuming listening and waiting to be safe to shut down", zap.Int("clientsLeft", len(r.clients))) } r.logger.Debug("All clients have acknowledged the ShutdownRequest, now shutting down") } // requestShutdown produces a ShutdownRequest and sends it to all clients to indicate that the room is shutting down. func (r *room) requestShutdown() { r.logger.Debug("Alerting clients that shutdown is in progress", zap.Int("clientsLeft", len(r.clients))) for id := range r.clients { r.requestShutdownFrom(id) } } // requestShutdownFrom produces a ShutdownRequest and sends it to the client with id to indicate that the room is // shutting down. func (r *room) requestShutdownFrom(id xid.ID) { clientField := zap.Stringer("client", id) r.logger.Debug("Alerting client that shutdown is in progress", clientField) shutdown := ShutdownRequest{ id: r.id, } client, ok := r.clients[id] if !ok { r.logger.Error("No such client when requesting shutdown from client", clientField) } client.outgoingChannel <- shutdown } // finalShutdown causes the room to do any final cleanup not involving its clients before stopping. // Use gracefulShutdown instead, which calls this once it's safe to do so. func (r *room) finalShutdown() { r.logger.Debug("Closing incoming channel") close(r.incomingChannel) r.logger.Info("Shut down") }