You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
hexmap/server/room/actor.go

274 lines
10 KiB

package room
import (
"github.com/rs/xid"
"go.uber.org/zap"
"hexmap-server/action"
"hexmap-server/state"
)
// act is the meat and potatoes of the room - it's responsible for actually running the room.
// It receives messages from r.incomingChannel one at a time and dispatches on their concrete type,
// looping until a StopRequest arrives. The deferred gracefulShutdown then removes all remaining
// clients before the room stops.
func (r *room) act() {
	defer r.gracefulShutdown()
	r.logger.Info("Room starting up, listening for incoming actions")
	for {
		raw := <-r.incomingChannel
		client := raw.ClientID()
		msgLogger := r.logger.With(zap.Stringer("client", client))
		msgLogger.Debug("Message received, handling", zap.Object("message", raw))
		switch msg := raw.(type) {
		case JoinRequest:
			r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel)
			r.acknowledgeJoin(msg.id, msg.wantCurrentState)
		case RefreshRequest:
			r.sendRefresh(msg.id)
		case ApplyRequest:
			msgLogger.Debug("Received action to apply from client", zap.Int("actionId", msg.action.ID))
			result := r.applyAction(msg.action.Action)
			// A nil result means the action was applied. Only then do the other clients need to hear
			// about it; an action that returned an error is reported to its sender alone.
			if result == nil {
				r.broadcastAction(client, msg.action.ID, msg.action.Action)
			}
			// The sender always gets an answer, whether the action succeeded or failed.
			r.acknowledgeAction(client, msg.action.ID, result)
		case LeaveRequest:
			// So long, then. We can close immediately here; they promised not to send any more messages after this
			// unless we were shutting down, which we're not.
			r.acknowledgeLeave(client)
			r.closeClient(client)
		case StopRequest:
			// As requested, we shut down. Our deferred gracefulShutdown will catch us as we fall.
			msgLogger.Info("Received StopRequest from client, shutting down")
			return
		case ShutdownResponse:
			// We never asked this client to shut down, so this is unexpected. Treat it as the client
			// leaving and drop it.
			msgLogger.Error("Received unexpected ShutdownResponse from client while not shutting down")
			r.closeClient(client)
		default:
			msgLogger.Warn("Ignoring unhandled message", zap.Object("message", msg))
		}
		msgLogger.Debug("Message handled, resuming listening")
	}
}
// addClient registers a new client under id in the room's client map.
// If id is already registered, the existing entry is left untouched and the conflict is only logged:
// a warning for an exact duplicate, an error when the channel or the settings differ.
func (r *room) addClient(id xid.ID, returnChannel chan<- Message, broadcast bool, privateChannel bool) {
	clientLog := r.logger.With(zap.Stringer("client", id))
	clientLog.Debug("Adding client")

	if existing, found := r.clients[id]; found {
		switch {
		case existing.outgoingChannel != returnChannel:
			clientLog.Error("Already have a different client with the same id when adding client")
		case existing.broadcast != broadcast || existing.privateChannel != privateChannel:
			clientLog.Error("Already have client but with different settings when adding client")
		default:
			clientLog.Warn("Already have client when adding client")
		}
		return
	}

	r.clients[id] = internalClient{
		id:              id,
		outgoingChannel: returnChannel,
		broadcast:       broadcast,
		privateChannel:  privateChannel,
	}
}
// stateCopy returns a pointer to a newly made copy of the room's current state.
// Handing out a copy instead of the live state means the room can keep modifying its own state
// while clients read the one they were given.
func (r *room) stateCopy() *state.Synced {
	snapshot := r.currentState.Copy()
	return &snapshot
}
// acknowledgeJoin sends a JoinResponse to the client registered under id.
// When includeState is true the response carries a fresh copy of the current state; otherwise its
// state pointer is nil. An unknown id is logged as an error and nothing is sent.
func (r *room) acknowledgeJoin(id xid.ID, includeState bool) {
	clientLog := r.logger.With(zap.Stringer("client", id))

	recipient, found := r.clients[id]
	if !found {
		clientLog.Error("No such client when acknowledging join")
		return
	}

	// Stays nil unless the client asked for the state.
	var snapshot *state.Synced
	if includeState {
		clientLog.Debug("Preparing state copy for client")
		snapshot = r.stateCopy()
	}

	clientLog.Debug("Sending JoinResponse to client")
	recipient.outgoingChannel <- JoinResponse{
		id:           r.id,
		currentState: snapshot,
	}
}
// sendRefresh sends a RefreshResponse carrying a fresh copy of the current state to the client
// registered under id. An unknown id is logged as an error and nothing is sent.
func (r *room) sendRefresh(id xid.ID) {
	clientLog := r.logger.With(zap.Stringer("client", id))

	recipient, found := r.clients[id]
	if !found {
		clientLog.Error("No such client when sending refresh")
		return
	}

	clientLog.Debug("Preparing state copy for client")
	snapshot := r.stateCopy()

	clientLog.Debug("Sending RefreshResponse to client")
	recipient.outgoingChannel <- RefreshResponse{
		id:           r.id,
		currentState: snapshot,
	}
}
// applyAction logs the action, applies it to the room's current state in place, and returns
// whatever error the action's Apply reports (nil on success).
func (r *room) applyAction(action action.Syncable) error {
	actionField := zap.Object("action", action)
	r.logger.Debug("Applying action", actionField)
	return action.Apply(&r.currentState)
}
// broadcastAction sends an ActionBroadcast describing the action to every registered client except
// the one that originally requested it.
// NOTE(review): each client's stored broadcast flag is not consulted here - confirm whether clients
// that joined with broadcast=false are meant to be skipped.
func (r *room) broadcastAction(originalClientID xid.ID, originalActionID int, action action.Syncable) {
	actionLog := r.logger.With(
		zap.Stringer("originalClient", originalClientID),
		zap.Int("actionID", originalActionID),
		zap.Object("action", action),
	)
	// Built once; the same message value goes to every recipient.
	message := ActionBroadcast{
		id:               r.id,
		originalClientID: originalClientID,
		originalActionID: originalActionID,
		action:           action,
	}

	actionLog.Debug("Broadcasting action to all clients")
	for recipientID, recipient := range r.clients {
		if recipientID.Compare(originalClientID) == 0 {
			// The requester hears back through its own acknowledgement instead.
			continue
		}
		actionLog.Debug("Sending ActionBroadcast to client", zap.Stringer("client", recipientID))
		recipient.outgoingChannel <- message
	}
}
// acknowledgeAction sends an ApplyResponse to the client registered under id, reporting the outcome
// (error, nil on success) of the action that client numbered actionID.
// An unknown id is logged as an error and nothing is sent.
func (r *room) acknowledgeAction(id xid.ID, actionID int, error error) {
	actionLog := r.logger.With(
		zap.Stringer("id", id),
		zap.Int("actionId", actionID),
		zap.Error(error),
	)
	actionLog.Debug("Responding to client with the status of its action")

	requester, found := r.clients[id]
	if !found {
		actionLog.Error("No such client when acknowledging action")
		return
	}

	actionLog.Debug("Sending ApplyResponse to client")
	// NOTE(review): this response carries the client's id, while the other messages this file sends
	// to clients carry r.id - confirm that is intentional.
	response := ApplyResponse{
		id:       id,
		actionID: actionID,
		result:   error,
	}
	requester.outgoingChannel <- response
}
// acknowledgeLeave sends a LeaveResponse to the client registered under id, signalling that the room
// has received the client's LeaveRequest and will not send it any further messages.
// An unknown id is logged as an error and nothing is sent.
func (r *room) acknowledgeLeave(id xid.ID) {
	clientLog := r.logger.With(zap.Stringer("client", id))
	clientLog.Debug("Acknowledging client's leave request")

	leaver, found := r.clients[id]
	if !found {
		clientLog.Error("No such client when acknowledging leave request")
		return
	}

	clientLog.Debug("Sending LeaveResponse to client")
	farewell := LeaveResponse{id: r.id}
	leaver.outgoingChannel <- farewell
}
// closeClient removes the client registered under id from the room's client map, first closing the
// client's outgoing channel if the client was registered with a private channel.
// This should only be used after a shutdown handshake between this client and the room has taken place.
// An unknown id is logged as an error and nothing else happens.
func (r *room) closeClient(id xid.ID) {
	clientLog := r.logger.With(zap.Stringer("client", id))
	clientLog.Debug("Closing client")

	departing, found := r.clients[id]
	if !found {
		clientLog.Error("Attempted to close a client that didn't exist")
		return
	}

	if departing.privateChannel {
		clientLog.Debug("Closing outgoingChannel, as client has a privateChannel")
		close(departing.outgoingChannel)
	}
	delete(r.clients, id)
}
// gracefulShutdown winds the room down cleanly: it asks every client to acknowledge the shutdown and
// keeps servicing the incoming channel until the client map is empty. finalShutdown is deferred, so it
// runs on every exit path from this function.
func (r *room) gracefulShutdown() {
	defer r.finalShutdown()

	if len(r.clients) == 0 {
		// No handshake needed when nobody is connected.
		r.logger.Debug("No remaining clients, so just shutting down")
		return
	}

	r.requestShutdown()
	for remaining := len(r.clients); remaining > 0; remaining = len(r.clients) {
		incoming := <-r.incomingChannel
		senderID := incoming.ClientID()
		senderLog := r.logger.With(zap.Stringer("client", senderID))
		senderLog.Debug("Post-shutdown message received, handling", zap.Object("message", incoming))

		switch request := incoming.(type) {
		case JoinRequest:
			// A late joiner already holds our channel and may be sending on it, so it cannot simply be
			// ignored. Register it, then immediately put it through the same shutdown handshake as
			// everyone else.
			senderLog.Debug("Received join request from client while shutting down")
			r.addClient(request.id, request.returnChannel, request.broadcast, request.privateChannel)
			r.requestShutdownFrom(senderID)
		case RefreshRequest:
			// Still honoured during shutdown - the requester might be the persistence actor.
			r.sendRefresh(senderID)
		case LeaveRequest:
			// This client has already been sent a ShutdownRequest, so it stays registered until it
			// confirms that; for now only the leave itself is acknowledged.
			senderLog.Debug("Received leave request from client while shutting down")
			r.acknowledgeLeave(senderID)
		case StopRequest:
			// Already stopping; the ShutdownRequest this client was sent is its answer.
			senderLog.Debug("Received stop request from client while shutting down")
		case ShutdownResponse:
			// A client sends this only once it knows it has nothing more to send us, so it is safe to
			// remove it now. If it had also sent a LeaveRequest before noticing the shutdown, it will
			// still wait for (and receive) the LeaveResponse.
			senderLog.Debug("Received shutdown confirmation from client")
			r.closeClient(senderID)
		default:
			senderLog.Debug("Ignoring irrelevant message from client while shutting down", zap.Object("message", incoming))
		}

		senderLog.Debug("Message handled, resuming listening and waiting to be safe to shut down", zap.Int("clientsLeft", len(r.clients)))
	}

	r.logger.Debug("All clients have acknowledged the ShutdownRequest, now shutting down")
}
// requestShutdown tells every currently registered client that the room is shutting down, by
// sending each of them a ShutdownRequest via requestShutdownFrom.
func (r *room) requestShutdown() {
	clientsLeft := zap.Int("clientsLeft", len(r.clients))
	r.logger.Debug("Alerting clients that shutdown is in progress", clientsLeft)

	for clientID := range r.clients {
		r.requestShutdownFrom(clientID)
	}
}
// requestShutdownFrom produces a ShutdownRequest and sends it to the client with id to indicate that the room is
// shutting down.
// If no client is registered under id, the problem is logged as an error and nothing is sent.
func (r *room) requestShutdownFrom(id xid.ID) {
	clientField := zap.Stringer("client", id)
	r.logger.Debug("Alerting client that shutdown is in progress", clientField)

	client, ok := r.clients[id]
	if !ok {
		r.logger.Error("No such client when requesting shutdown from client", clientField)
		// A missing map entry yields a zero-value client whose outgoingChannel is nil, and a send on
		// a nil channel blocks forever. Stop here rather than hanging the room.
		return
	}

	client.outgoingChannel <- ShutdownRequest{
		id: r.id,
	}
}
// finalShutdown performs the room's last cleanup step that does not involve its clients: closing the
// incoming channel. Do not call it directly - use gracefulShutdown, which calls this once it is safe
// to do so.
func (r *room) finalShutdown() {
	roomLog := r.logger
	roomLog.Debug("Closing incoming channel")
	close(r.incomingChannel)
	roomLog.Info("Shut down")
}