diff --git a/server/action/client.go b/server/action/client.go new file mode 100644 index 0000000..22445fb --- /dev/null +++ b/server/action/client.go @@ -0,0 +1,34 @@ +package action + +import ( + "go.uber.org/zap/zapcore" +) + +// ClientHello is the action sent by the client when it first establishes the connection. +type ClientHello struct { + // Version is the protocol version the client is running. + Version int `json:"version"` +} + +// ClientRefresh is the action sent by the client when it needs the full state re-sent. +type ClientRefresh struct { +} + +// IDed contains a pair of ID and Action, as sent by the server. +type IDed struct { + // ID contains the arbitrary ID that was sent by the client, for identifying the action in future messages. + ID int `json:"id"` + // Action contains the action that was actually being sent. + Action Syncable `json:"action"` +} + +func (i IDed) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt("id", i.ID) + return encoder.AddObject("action", i.Action) +} + +// ClientSent is an action sent in order to deliver one or more Syncable actions to the server. +type ClientSent struct { + // Actions contains the actions the client wants to apply, in the order they should be applied. + Actions []IDed `json:"nested"` +} diff --git a/server/action/map.go b/server/action/map.go new file mode 100644 index 0000000..0e79a26 --- /dev/null +++ b/server/action/map.go @@ -0,0 +1,36 @@ +package action + +import ( + "go.uber.org/zap/zapcore" + "hexmap-server/state" +) + +// CellColor is the action sent when a cell of the map has been colored a different color. +type CellColor struct { + // At is the location of the cell in storage coordinates. + At state.StorageCoordinates `json:"at"` + // Color is the color the cell has been changed to. + Color state.HexColor `json:"color"` +} + +func (c CellColor) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + err := encoder.AddObject("at", c.At) + encoder.AddString("color", c.Color.String()) + return err +} + +// Apply sets the target cell's color, or returns ErrNoOp if it can't. +func (c CellColor) Apply(s *state.Synced) error { + if c.Color.A < 0xF { + return ErrNoTransparentColors + } + cell, err := s.Map.LineCells.GetCellAt(c.At) + if err != nil { + return err + } + if cell.Color == c.Color { + return ErrNoOp + } + cell.Color = c.Color + return nil +} diff --git a/server/action/server.go b/server/action/server.go new file mode 100644 index 0000000..454c207 --- /dev/null +++ b/server/action/server.go @@ -0,0 +1,42 @@ +package action + +import "hexmap-server/state" + +// ServerHello is the action sent to establish the current state of the server when a new client connects. +type ServerHello struct { + // Version is the protocol version the server is running. + Version int `json:"version"` + // State is the complete state of the server as of when the client joined. + State state.Synced `json:"state"` +} + +// ServerRefresh is the action sent to reestablish the current state of the server in response to ClientRefresh. +type ServerRefresh struct { + // State is the complete state of the server as of when the corresponding ClientRefresh was processed. + State state.Synced `json:"state"` +} + +// ServerOK is the action sent when one or more client actions have been accepted and applied. +type ServerOK struct { + // IDs contains the IDs of the actions which were accepted and applied, in the order they were accepted and applied. + // This is the same as the order they were received, though other actions may have been between these that were + // rejected. + IDs []int `json:"ids"` +} + +// ServerFailed is the action sent when one or more client actions have been rejected. +type ServerFailed struct { + // IDs contains the IDs of the actions which were rejected, in the order they were rejected. + // This is the same as the order they were received, though other actions may have been between these that were + // accepted and applied. + IDs []int `json:"ids"` + // Error contains the error text sent from the server about why these actions failed. + Error string `json:"error"` +} + +// ServerSent is the action sent when one or more client actions from other clients have been accepted and applied. +// The client's own actions will never be included in this action. +type ServerSent struct { + // Actions contains the actions that are now being applied. + Actions []Syncable `json:"actions"` +} diff --git a/server/actions/syncable/action.go b/server/action/syncable.go similarity index 63% rename from server/actions/syncable/action.go rename to server/action/syncable.go index 4b2f701..e3581b4 100644 --- a/server/actions/syncable/action.go +++ b/server/action/syncable.go @@ -1,18 +1,20 @@ -package syncable +package action import ( "errors" + "go.uber.org/zap/zapcore" "hexmap-server/state" ) -var ErrorNoOp error = errors.New("action's effects were already applied, or it's an empty action") +var ErrNoOp error = errors.New("action's effects were already applied, or it's an empty action") -// Action is the interface for actions that can be shared. -type Action interface { +// Syncable is the interface for action that can be shared. +type Syncable interface { + zapcore.ObjectMarshaler // Apply causes the action's effects to be applied to s, mutating it in place. // All syncable.Actions must conform to the standard that if an action can't be correctly applied, or if it would // have no effect, it returns an error without changing s. - // If an action can be correctly applied but would have no effect, it should return ErrorNoOp. + // If an action can be correctly applied but would have no effect, it should return ErrNoOp. // If an action is correctly applied and has an effect, it should return nil. Apply(s *state.Synced) error } diff --git a/server/action/user.go b/server/action/user.go new file mode 100644 index 0000000..cca2f67 --- /dev/null +++ b/server/action/user.go @@ -0,0 +1,34 @@ +package action + +import ( + "errors" + "go.uber.org/zap/zapcore" + "hexmap-server/state" +) + +// ErrNoTransparentColors is returned when a user tries to set their active color or a cell color to transparent. +// Transparent here is defined as having an alpha component of less than 15 (0xF). +var ErrNoTransparentColors error = errors.New("transparent colors not allowed") + +// UserActiveColor is the action sent when the user's current color, the one being painted with, changes. +type UserActiveColor struct { + // Color is the color that is now active. + Color state.HexColor `json:"color"` +} + +func (c UserActiveColor) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("color", c.Color.String()) + return nil +} + +// Apply sets the user's active color, or returns ErrNoOp if it can't. +func (c UserActiveColor) Apply(s *state.Synced) error { + if c.Color.A < 0xF { + return ErrNoTransparentColors + } + if s.User.ActiveColor == c.Color { + return ErrNoOp + } + s.User.ActiveColor = c.Color + return nil +} diff --git a/server/actions/client/client.go b/server/actions/client/client.go deleted file mode 100644 index b391d6e..0000000 --- a/server/actions/client/client.go +++ /dev/null @@ -1,19 +0,0 @@ -package client - -import "hexmap-server/actions/syncable" - -type Hello struct { - Version int `json:"version"` -} - -type Refresh struct { -} - -type SentAction struct { - Id int `json:"id"` - Action syncable.Action `json:"action"` -} - -type SentActions struct { - Nested []SentAction `json:"nested"` -} diff --git a/server/actions/server/server.go b/server/actions/server/server.go deleted file mode 100644 index abb4e43..0000000 --- a/server/actions/server/server.go +++ /dev/null @@ -1 +0,0 @@ -package server diff --git a/server/go.mod b/server/go.mod index bdf7d9b..fbf60c8 100644 --- a/server/go.mod +++ b/server/go.mod @@ -2,4 +2,7 @@ module hexmap-server go 1.16 -require github.com/rs/xid v1.3.0 // indirect +require ( + github.com/rs/xid v1.3.0 + go.uber.org/zap v1.18.1 +) diff --git a/server/go.sum b/server/go.sum index 47ea790..bc03dce 100644 --- a/server/go.sum +++ b/server/go.sum @@ -1,2 +1,50 @@ +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.18.1 h1:CSUJ2mjFszzEWt4CdKISEuChVIXGBn3lAPwkRGyVrc4= +go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/room/actor.go b/server/room/actor.go new file mode 100644 index 0000000..d75dbe8 --- /dev/null +++ b/server/room/actor.go @@ -0,0 +1,273 @@ +package room + +import ( + "github.com/rs/xid" + "go.uber.org/zap" + "hexmap-server/action" + "hexmap-server/state" +) + +// act is the meat and potatoes of the room - it's responsible for actually running the room. +// It will gracefully remove all clients before shutting down. +func (r *room) act() { + defer r.gracefulShutdown() + r.logger.Info("Room starting up, listening for incoming actions") + for { + raw := <-r.incomingChannel + client := raw.ClientID() + msgLogger := r.logger.With(zap.Stringer("client", client)) + msgLogger.Debug("Message received, handling", zap.Object("message", raw)) + switch msg := raw.(type) { + case JoinRequest: + r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) + r.acknowledgeJoin(msg.id, msg.wantCurrentState) + case RefreshRequest: + r.sendRefresh(msg.id) + case ApplyRequest: + msgLogger.Debug("Received action to apply from client", zap.Int("actionId", msg.action.ID)) + result := r.applyAction(msg.action.Action) + if result != nil { + r.broadcastAction(client, msg.action.ID, msg.action.Action) + } + r.acknowledgeAction(client, msg.action.ID, result) + case LeaveRequest: + // So long, then. We can close immediately here; they promised not to send any more messages after this + // unless we were shutting down, which we're not. + r.acknowledgeLeave(client) + r.closeClient(client) + case StopRequest: + // As requested, we shut down. Our deferred gracefulShutdown will catch us as we fall. + msgLogger.Info("Received StopRequest from client, shutting down") + return + case ShutdownResponse: + // Uh... thank... you. I'm not... Never mind. I guess this means you're leaving? + msgLogger.Error("Received unexpected ShutdownResponse from client while not shutting down") + r.closeClient(client) + default: + msgLogger.Warn("Ignoring unhandled message", zap.Object("message", msg)) + } + msgLogger.Debug("Message handled, resuming listening") + } +} + +// addClient records a client's presence in the client map. +func (r *room) addClient(id xid.ID, returnChannel chan<- Message, broadcast bool, privateChannel bool) { + logger := r.logger.With(zap.Stringer("client", id)) + logger.Debug("Adding client") + if client, ok := r.clients[id]; ok { + if client.outgoingChannel == returnChannel { + if broadcast == client.broadcast && privateChannel == client.privateChannel { + logger.Warn("Already have client when adding client") + } else { + logger.Error("Already have client but with different settings when adding client") + } + } else { + logger.Error("Already have a different client with the same id when adding client") + } + return + } + r.clients[id] = internalClient{ + id: id, + outgoingChannel: returnChannel, + broadcast: broadcast, + privateChannel: privateChannel, + } +} + +// stateCopy creates and returns a fresh copy of the current state. +// This avoids concurrent modification of the state while clients are reading it. +func (r *room) stateCopy() *state.Synced { + s := r.currentState.Copy() + return &s +} + +// acknowledgeJoin composes and sends a JoinResponse to the given client. +func (r *room) acknowledgeJoin(id xid.ID, includeState bool) { + logger := r.logger.With(zap.Stringer("client", id)) + client, ok := r.clients[id] + if !ok { + logger.Error("No such client when acknowledging join") + return + } + var s *state.Synced = nil + if includeState { + logger.Debug("Preparing state copy for client") + s = r.stateCopy() + } + logger.Debug("Sending JoinResponse to client") + client.outgoingChannel <- JoinResponse{ + id: r.id, + currentState: s, + } +} + +// sendRefresh composes and sends a RefreshResponse to the given client. +func (r *room) sendRefresh(id xid.ID) { + logger := r.logger.With(zap.Stringer("client", id)) + client, ok := r.clients[id] + if !ok { + logger.Error("No such client when sending refresh") + return + } + var s *state.Synced = nil + logger.Debug("Preparing state copy for client") + s = r.stateCopy() + logger.Debug("Sending RefreshResponse to client") + client.outgoingChannel <- RefreshResponse{ + id: r.id, + currentState: s, + } +} + +// applyAction applies an action to the state and returns the result of it. +func (r *room) applyAction(action action.Syncable) error { + r.logger.Debug("Applying action", zap.Object("action", action)) + return action.Apply(&r.currentState) +} + +// broadcastAction sends an action to everyone other than the original client which requested it. +func (r *room) broadcastAction(originalClientID xid.ID, originalActionID int, action action.Syncable) { + logger := r.logger.With(zap.Stringer("originalClient", originalClientID), zap.Int("actionID", originalActionID), zap.Object("action", action)) + broadcast := ActionBroadcast{ + id: r.id, + originalClientID: originalClientID, + originalActionID: originalActionID, + action: action, + } + logger.Debug("Broadcasting action to all clients") + for id, client := range r.clients { + if id.Compare(originalClientID) != 0 { + logger.Debug("Sending ActionBroadcast to client", zap.Stringer("client", id)) + client.outgoingChannel <- broadcast + } + } +} + +// acknowledgeAction sends a response to the original client which requested an action. +func (r *room) acknowledgeAction(id xid.ID, actionID int, error error) { + logger := r.logger.With(zap.Stringer("id", id), zap.Int("actionId", actionID), zap.Error(error)) + logger.Debug("Responding to client with the status of its action") + client, ok := r.clients[id] + if !ok { + logger.Error("No such client when acknowledging action") + return + } + logger.Debug("Sending ApplyResponse to client") + client.outgoingChannel <- ApplyResponse{ + id: id, + actionID: actionID, + result: error, + } +} + +// acknowledgeLeave causes the room to signal to the client that it has received and acknowledged the client's LeaveRequest, +// and will not send any further messages. +func (r *room) acknowledgeLeave(id xid.ID) { + logger := r.logger.With(zap.Stringer("client", id)) + logger.Debug("Acknowledging client's leave request") + client, ok := r.clients[id] + if !ok { + logger.Error("No such client when acknowledging leave request") + return + } + logger.Debug("Sending LeaveResponse to client") + client.outgoingChannel <- LeaveResponse{id: r.id} +} + +// closeClient causes the room to remove the client with the given id from its clients. +// This should only be used after a shutdown handshake between this client and the room has taken place. +func (r *room) closeClient(id xid.ID) { + logger := r.logger.With(zap.Stringer("client", id)) + logger.Debug("Closing client") + client, ok := r.clients[id] + if !ok { + logger.Error("Attempted to close a client that didn't exist") + return + } + if client.privateChannel { + logger.Debug("Closing outgoingChannel, as client has a privateChannel") + close(client.outgoingChannel) + } + delete(r.clients, id) +} + +// gracefulShutdown causes the room to shut down cleanly, making sure that all clients have been removed. +func (r *room) gracefulShutdown() { + defer r.finalShutdown() + if len(r.clients) == 0 { + // Nothing to do, we've already won. + r.logger.Debug("No remaining clients, so just shutting down") + return + } + r.requestShutdown() + for len(r.clients) > 0 { + raw := <-r.incomingChannel + client := raw.ClientID() + msgLogger := r.logger.With(zap.Stringer("client", client)) + msgLogger.Debug("Post-shutdown message received, handling", zap.Object("message", raw)) + switch msg := raw.(type) { + case JoinRequest: + // Don't you hate it when someone comes to the desk right as you're getting ready to pack up? + // Can't ignore them - they have our channel, and they might be sending things to it. We have to add them + // and then immediately send them a ShutdownRequest and wait for them to answer it. + msgLogger.Debug("Received join request from client while shutting down") + r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) + r.requestShutdownFrom(client) + case RefreshRequest: + // Ugh, seriously, now? Fine. You can have this - you might be our friend the persistence actor. + r.sendRefresh(client) + case LeaveRequest: + // We sent them a shutdown already, so unfortunately, we can't close them immediately. We have to wait for + // them to tell us they've heard that we're shutting down. + msgLogger.Debug("Received leave request from client while shutting down") + r.acknowledgeLeave(client) + case StopRequest: + // Yes. We're doing that. Check your inbox, I already sent you the shutdown. + msgLogger.Debug("Received stop request from client while shutting down") + case ShutdownResponse: + // The only way we would be getting one of these is if the client knows it doesn't have to send us anything + // else. Therefore, we can remove them now. + // Similarly, we know that they'll receive the LeaveResponse they need and shut down. + // Like us, even if it sent a LeaveRequest before realizing we were shutting down, it would have gotten our + // ShutdownRequest and sent this before it could read the LeaveResponse, but it will wait for the + // LeaveResponse regardless. + msgLogger.Debug("Received shutdown confirmation from client") + r.closeClient(client) + default: + msgLogger.Debug("Ignoring irrelevant message from client while shutting down", zap.Object("message", raw)) + } + msgLogger.Debug("Message handled, resuming listening and waiting to be safe to shut down", zap.Int("clientsLeft", len(r.clients))) + } + r.logger.Debug("All clients have acknowledged the ShutdownRequest, now shutting down") +} + +// requestShutdown produces a ShutdownRequest and sends it to all clients to indicate that the room is shutting down. +func (r *room) requestShutdown() { + r.logger.Debug("Alerting clients that shutdown is in progress", zap.Int("clientsLeft", len(r.clients))) + for id := range r.clients { + r.requestShutdownFrom(id) + } +} + +// requestShutdownFrom produces a ShutdownRequest and sends it to the client with id to indicate that the room is +// shutting down. +func (r *room) requestShutdownFrom(id xid.ID) { + clientField := zap.Stringer("client", id) + r.logger.Debug("Alerting client that shutdown is in progress", clientField) + shutdown := ShutdownRequest{ + id: r.id, + } + client, ok := r.clients[id] + if !ok { + r.logger.Error("No such client when requesting shutdown from client", clientField) + } + client.outgoingChannel <- shutdown +} + +// finalShutdown causes the room to do any final cleanup not involving its clients before stopping. +// Use gracefulShutdown instead, which calls this once it's safe to do so. +func (r *room) finalShutdown() { + r.logger.Debug("Closing incoming channel") + close(r.incomingChannel) + r.logger.Info("Shut down") +} diff --git a/server/room/client.go b/server/room/client.go new file mode 100644 index 0000000..d226986 --- /dev/null +++ b/server/room/client.go @@ -0,0 +1,165 @@ +package room + +import ( + "github.com/rs/xid" + "go.uber.org/zap" +) + +// internalClient is used by the room itself to track information about a client. +type internalClient struct { + // id is the id that the client identifies itself with in all clientMessage instances it sends. + id xid.ID + // outgoingChannel is a channel that the room can send messages to the client on. + outgoingChannel chan<- Message + // privateChannel is true iff the room can close the outgoingChannel when the client and room have completed their + // close handshake. + privateChannel bool + // broadcast is true iff the client requested to be included on broadcasts on creation. + broadcast bool +} + +type NewClientOptions struct { + // IncomingChannel is the channel to use as the room's channel to send messages to - the new Client's IncomingChannel. + // If this is non-nil, the room will not automatically close the IncomingChannel after a shutdown is negotiated. + // If this is nil, a new channel will be allocated on join and closed on shutdown. + IncomingChannel chan Message + // If AcceptBroadcasts is true, the room will send all broadcasts originating from other clients to this client. + AcceptBroadcasts bool + // If RequestStartingState is true, the room will send a copy of the current state as of when the JoinRequest was + // received in the JoinResponse that will be the first message the Client receives. + RequestStartingState bool + // If sets, + Logger *zap.Logger +} + +// Client is the structure used by clients external to the Room package to communicate with the Room. +// It is not expected to be parallel-safe; to run it in parallel, use the NewClient method and send the new client to +// the new goroutine. +type Client struct { + // id is the ClientID used by the client for all communications. + id xid.ID + // roomId is the unique ID of the room (not its map). + roomId xid.ID + // incomingChannel is the channel that this client receives messages on. + incomingChannel <-chan Message + // outgoingChannel is the channel that this client sends messages on. + // Becomes nil if the client has been completely shut down. + outgoingChannel chan<- ClientMessage + // Once Leave or AcknowledgeShutdown have been triggered, this flag is set, preventing use of other messages. + shuttingDown bool +} + +// ID is the ID used by this client to identify itself to the Room. +func (c *Client) ID() xid.ID { + return c.id +} + +// RoomID is the ID used by the room to differentiate itself from other rooms. +// It is not the map's internal ID. +func (c *Client) RoomID() xid.ID { + return c.id +} + +// IncomingChannel is the channel the client can listen on for messages from the room. +func (c *Client) IncomingChannel() <-chan Message { + return c.incomingChannel +} + +// OutgoingChannel is the channel the client can send messages to the room on. +func (c *Client) OutgoingChannel() chan<- ClientMessage { + if c.outgoingChannel == nil { + panic("Already finished shutting down; no new messages should be sent") + } + return c.outgoingChannel +} + +// newClientForRoom uses the necessary parameters to create and join a Client for the given room. +func newClientForRoom(roomId xid.ID, outgoingChannel chan<- ClientMessage, opts NewClientOptions) *Client { + var privateChannel bool + var incomingChannel chan Message + if opts.IncomingChannel != nil { + incomingChannel = opts.IncomingChannel + privateChannel = false + } else { + incomingChannel = make(chan Message, 1) + privateChannel = true + } + result := Client{ + id: xid.New(), + roomId: roomId, + incomingChannel: incomingChannel, + outgoingChannel: outgoingChannel, + shuttingDown: false, + } + result.outgoingChannel <- JoinRequest{ + id: result.id, + returnChannel: incomingChannel, + privateChannel: privateChannel, + broadcast: opts.AcceptBroadcasts, + wantCurrentState: opts.RequestStartingState, + } + return &result +} + +// NewClient creates a new client belonging to the same room as this client with a random ID. +// The new client will be automatically joined to the channel. +func (c *Client) NewClient(opts NewClientOptions) *Client { + if c.shuttingDown { + panic("Already started shutting down; no new messages should be sent") + } + return newClientForRoom(c.roomId, c.outgoingChannel, opts) +} + +// The message created by Refresh causes the client to request a fresh copy of the state. +func (c *Client) Refresh() RefreshRequest { + if c.shuttingDown { + panic("Already started shutting down; no new messages should be sent") + } + return RefreshRequest{ + id: c.id, + } +} + +// The message created by Leave causes the local client to signal that it is shutting down. +// It is important to Leave to avoid dangling clients having messages sent to nothing. +// After sending Leave, the client must confirm that it has been removed by waiting for a LeaveResponse, accompanied by +// the closing of the Client's IncomingChannel if it was a private channel. +// No further messages should be sent after Leave except AcknowledgeShutdown if Leave and requestShutdown crossed paths in midair. +func (c *Client) Leave() LeaveRequest { + if c.shuttingDown { + panic("Already started shutting down; no new messages should be sent") + } + c.shuttingDown = true + return LeaveRequest{ + id: c.id, + } +} + +// The message created by Stop causes the local client to signal that it is shutting down. +// It is important to Stop when the room needs to be shut down. +// After sending Stop, the client must confirm that it has been removed by waiting for a ShutdownRequest, which should +// be handled normally. +// No further messages should be sent after Stop except AcknowledgeShutdown. +func (c *Client) Stop() StopRequest { + if c.shuttingDown { + panic("Already started shutting down; no new messages should be sent") + } + c.shuttingDown = true + return StopRequest{ + id: c.id, + } +} + +// AcknowledgeShutdown causes the local client to signal that it has acknowledged that the room is shutting down. +// No further messages can be sent after AcknowledgeShutdown; attempting to do so will block forever, as the +// OutgoingChannel has become nil. +func (c *Client) AcknowledgeShutdown() ShutdownResponse { + if c.outgoingChannel == nil { + panic("Already finished shutting down; no new messages should be sent") + } + c.shuttingDown = true + c.outgoingChannel = nil + return ShutdownResponse{ + id: c.id, + } +} diff --git a/server/room/clientmessage.go b/server/room/clientmessage.go new file mode 100644 index 0000000..8238d64 --- /dev/null +++ b/server/room/clientmessage.go @@ -0,0 +1,124 @@ +package room + +import ( + "github.com/rs/xid" + "go.uber.org/zap/zapcore" + "hexmap-server/action" +) + +// ClientMessage marks messages coming from clients to the room. +type ClientMessage interface { + zapcore.ObjectMarshaler + // SourceID is the id of the client sending the message. + ClientID() xid.ID +} + +// JoinRequest is the message sent on the room's IncomingChannel by a new client joining the room. +type JoinRequest struct { + // id is the SourceID the client will use to identify itself in future messages. + id xid.ID + // returnChannel is a buffered channel the client is ready to receive messages from the room on. + // This becomes the Client's OutgoingChannel. + returnChannel chan<- Message + // privateChannel is true iff the room can close returnChannel after completing a shutdown handshake. + // This permits extra safety by causing channels that somehow leak into other contexts to become noticeable by + // causing panics. + privateChannel bool + // broadcast is true iff the room should send action.Syncable from other clients to this one. + broadcast bool + // wantCurrentState indicates that the client would like the room to include a copy of the current state of the room + // when it joins. + wantCurrentState bool +} + +func (j JoinRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "JoinRequest") + encoder.AddString("id", j.id.String()) + encoder.AddBool("broadcast", j.broadcast) + encoder.AddBool("wantCurrentState", j.wantCurrentState) + return nil +} + +func (j JoinRequest) ClientID() xid.ID { + return j.id +} + +// RefreshRequest is the message sent on the room's IncomingChannel by a client which needs the current value. +type RefreshRequest struct { + id xid.ID +} + +func (r RefreshRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "RefreshRequest") + encoder.AddString("id", r.id.String()) + return nil +} + +func (r RefreshRequest) ClientID() xid.ID { + return r.id +} + +// ApplyRequest is the message sent on the room's IncomingChannel by a client which has received an action from the +// websocket. +type ApplyRequest struct { + id xid.ID + action action.IDed +} + +func (f ApplyRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "ApplyRequest") + encoder.AddString("id", f.id.String()) + return encoder.AddObject("action", f.action) +} + +func (f ApplyRequest) ClientID() xid.ID { + return f.id +} + +// LeaveRequest is the message sent on the room's IncomingChannel by a client which is shutting down. +// The client is indicating that it will send no messages except a possible ShutdownResponse, in the event that a +// LeaveRequest and a ShutdownRequest cross paths midflight. +type LeaveRequest struct { + id xid.ID +} + +func (l LeaveRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "LeaveRequest") + encoder.AddString("id", l.id.String()) + return nil +} + +func (l LeaveRequest) ClientID() xid.ID { + return l.id +} + +// StopRequest is the message sent on the room's IncomingChannel by a client which wants to make the room shut down. +// The response to a StopRequest is a ShutdownRequest, which should be handled as normal. +type StopRequest struct { + id xid.ID +} + +func (s StopRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "StopRequest") + encoder.AddString("id", s.id.String()) + return nil +} + +func (s StopRequest) ClientID() xid.ID { + return s.id +} + +// ShutdownResponse is the message sent on the room's IncomingChannel by a client which has accepted the room's ShutdownRequest. +type ShutdownResponse struct { + id xid.ID +} + +func (s ShutdownResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "ShutdownResponse") + encoder.AddString("id", s.id.String()) + return nil +} + +func (s ShutdownResponse) ClientID() xid.ID { + return s.id +} diff --git a/server/room/message.go b/server/room/message.go new file mode 100644 index 0000000..aa2aac5 --- /dev/null +++ b/server/room/message.go @@ -0,0 +1,147 @@ +package room + +import ( + "github.com/rs/xid" + "go.uber.org/zap/zapcore" + "hexmap-server/action" + "hexmap-server/state" +) + +// Message marks messages going to clients from the room. +type Message interface { + zapcore.ObjectMarshaler + // RoomID marks the ID of the room this Message originated from, in case the Client has a shared IncomingChannel. + RoomID() xid.ID +} + +// JoinResponse is the message sent by the room on a new client's IncomingChannel after it joins. +type JoinResponse struct { + id xid.ID + // currentState is a pointer to a copy of the room's current state. + // If the client refused the room state in its join message, this will be a nil pointer instead. + currentState *state.Synced +} + +func (j JoinResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "JoinResponse") + encoder.AddString("id", j.id.String()) + return encoder.AddObject("currentState", j.currentState) +} + +func (j JoinResponse) RoomID() xid.ID { + return j.id +} + +// CurrentState returns the state of the room as of when the JoinRequest was processed. +func (j JoinResponse) CurrentState() *state.Synced { + return j.currentState +} + +// RefreshResponse is the message sent by the room after a client requests it, or immediately on join. +type RefreshResponse struct { + id xid.ID + // currentState is a pointer to a copy of the room's current state. + currentState *state.Synced +} + +func (r RefreshResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "JoinResponse") + encoder.AddString("id", r.id.String()) + return encoder.AddObject("currentState", r.currentState) +} + +func (r RefreshResponse) RoomID() xid.ID { + return r.id +} + +// CurrentState returns the state of the room as of when the RefreshRequest was processed. +func (r RefreshResponse) CurrentState() *state.Synced { + return r.currentState +} + +// ApplyResponse returns the result of an action to _only_ the one that sent the ApplyRequest. +type ApplyResponse struct { + id xid.ID + // actionID is the ID of the action that completed or failed. + actionID int + // result is nil if the action was completed, or an error if it failed. + result error +} + +func (a ApplyResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "ApplyResponse") + encoder.AddString("id", a.id.String()) + encoder.AddInt("actionId", a.actionID) + encoder.AddBool("success", a.result == nil) + if a.result != nil { + encoder.AddString("failure", a.result.Error()) + } + return nil +} + +func (a ApplyResponse) RoomID() xid.ID { + return a.id +} + +// Success returns true if the action succeeded, false if it failed. +func (a ApplyResponse) Success() bool { + return a.result == nil +} + +// Failure returns the error if the action failed, or nil if it succeeded. +func (a ApplyResponse) Failure() error { + return a.result +} + +// ActionBroadcast is sent to all clients _other_ than the one that sent the ApplyRequest when an action succeeds. +type ActionBroadcast struct { + id xid.ID + // originalClientID is the client that sent the action in the first place. + originalClientID xid.ID + // originalActionID is the ID that the client that sent the action sent. + originalActionID int + // action is the action that succeeded. + action action.Syncable +} + +func (a ActionBroadcast) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "ActionBroadcast") + encoder.AddString("id", a.id.String()) + encoder.AddString("originalClientId", a.originalClientID.String()) + encoder.AddInt("originalActionId", a.originalActionID) + return encoder.AddObject("action", a.action) +} + +func (a ActionBroadcast) RoomID() xid.ID { + return a.id +} + +// LeaveResponse is the message sent by the room when it has accepted that a client has left, and will send it no further messages. +type LeaveResponse struct { + id xid.ID +} + +func (l LeaveResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "LeaveResponse") + encoder.AddString("id", l.id.String()) + return nil +} + +func (l LeaveResponse) RoomID() xid.ID { + return l.id +} + +// ShutdownRequest is the message sent by the room when something causes it to shut down. It will send the client no further messages except a possible LeaveResponse. +type ShutdownRequest struct { + id xid.ID +} + +func (s ShutdownRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("type", "ShutdownRequest") + encoder.AddString("id", s.id.String()) + return nil +} + +func (s ShutdownRequest) RoomID() xid.ID { + return s.id +} diff --git a/server/room/room.go b/server/room/room.go new file mode 100644 index 0000000..7ff1cd3 --- /dev/null +++ b/server/room/room.go @@ -0,0 +1,54 @@ +package room + +import ( + "github.com/rs/xid" + "go.uber.org/zap" + "hexmap-server/state" +) + +// NewOptions is the set of information used to control what a room starts with. +type NewOptions struct { + // BaseLogger is the logger that the room should attach its data to. + BaseLogger *zap.Logger + // StartingState is a state.Synced that defines the state of the room on creation. + StartingState state.Synced + // StartingClientOptions sets the configuration of the first client to be created, the one that will be returned + // from New. + StartingClientOptions NewClientOptions +} + +// room is a room as seen from within - the information needed for the room process to do its duties. +type room struct { + // id is the room's internal ID - not to be confused with the ID of the map it is serving. + id xid.ID + // incomingChannel is the channel the room uses to receive messages. The room itself owns this channel, + // so when the clients map is empty, it can be closed. + incomingChannel chan ClientMessage + // clients contains the map of active clients, each of which is known to have a reference to this room. + clients map[xid.ID]internalClient + // currentState contains the active state being used by actions right now. + currentState state.Synced + // logger is the logger that this room will use. It contains context fields for the room's important fields. + logger *zap.Logger +} + +// Creates and starts up a new room, joins a new client to it and returns that client. +func New(opts NewOptions) *Client { + logger := opts.BaseLogger.Named("Room") + id := xid.New() + r := room{ + id: id, + incomingChannel: make(chan ClientMessage), + clients: make(map[xid.ID]internalClient), + currentState: opts.StartingState, + logger: logger, + } + go r.act() + return r.newClient(opts.StartingClientOptions) +} + +// newClient creates a new client belonging to this room with a random ID. +// The new client will be automatically joined to the channel. +func (r *room) newClient(opts NewClientOptions) *Client { + return newClientForRoom(r.id, r.incomingChannel, opts) +} diff --git a/server/state/coordinates.go b/server/state/coordinates.go index 4608586..6ca05e1 100644 --- a/server/state/coordinates.go +++ b/server/state/coordinates.go @@ -1,5 +1,7 @@ package state +import "go.uber.org/zap/zapcore" + // StorageCoordinates gives the coordinates of a cell in a form optimized for storage. type StorageCoordinates struct { // Line is the index from 0 to Lines - 1 of the HexLine in the HexLayer. @@ -7,3 +9,9 @@ type StorageCoordinates struct { // Cell is the index from 0 to CellsPerLine - 1 of the HexCell in the HexLine. Cell uint8 `json:"cell"` } + +func (s StorageCoordinates) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddUint8("line", s.Line) + encoder.AddUint8("cell", s.Cell) + return nil +} diff --git a/server/state/hexmap.go b/server/state/hexmap.go index b9282cb..24e7291 100644 --- a/server/state/hexmap.go +++ b/server/state/hexmap.go @@ -3,6 +3,7 @@ package state import ( "fmt" "github.com/rs/xid" + "go.uber.org/zap/zapcore" ) // HexMapRepresentation combines HexOrientation and LineParity to represent a map's display mode. @@ -11,15 +12,37 @@ type HexMapRepresentation struct { IndentedLines LineParity `json:"indentedLines"` } +func (h HexMapRepresentation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("orientation", h.Orientation.String()) + encoder.AddString("indentedLines", h.IndentedLines.String()) + return nil +} + // HexCell contains data for a single cell of the map. type HexCell struct { // Color contains the color of the cell, in hex notation. Color HexColor `json:"color"` } +func (h HexCell) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("color", h.Color.String()) + return nil +} + // HexLine is a line of cells which are adjacent by flat sides in a vertical or horizontal direction. type HexLine []HexCell +func (l HexLine) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + var finalErr error + for _, cell := range l { + err := encoder.AppendObject(cell) + if err != nil && finalErr == nil { + finalErr = err + } + } + return finalErr +} + // Copy creates a deep copy of this HexLine. func (l HexLine) Copy() HexLine { duplicate := make(HexLine, len(l)) @@ -32,7 +55,19 @@ func (l HexLine) Copy() HexLine { // HexLayer is a two-dimensional plane of cells which are arranged into lines. type HexLayer []HexLine +func (l HexLayer) MarshalLogArray(encoder zapcore.ArrayEncoder) error { + var finalErr error + for _, line := range l { + err := encoder.AppendArray(line) + if err != nil && finalErr == nil { + finalErr = err + } + } + return finalErr +} + // GetCellAt returns a reference to the cell at the given coordinates. +// If the coordinates are out of bounds for this map, an error will be returned. func (l HexLayer) GetCellAt(c StorageCoordinates) (*HexCell, error) { if int(c.Line) > len(l) { return nil, fmt.Errorf("line %d out of bounds (%d)", c.Line, len(l)) @@ -55,8 +90,8 @@ func (l HexLayer) Copy() HexLayer { // HexMap contains the data for a map instance. type HexMap struct { - // Xid is the unique ID of the HexMap, used to encourage clients not to blindly interact with a different map. - Xid xid.ID `json:"xid"` + // XID is the unique id of the HexMap, used to encourage clients not to blindly interact with a different map. + XID xid.ID `json:"xid"` // Lines is the rough number of rows (in PointyTop orientation) or columns (in FlatTop orientation) in the map. // Because different lines will be staggered, it's somewhat hard to see. Lines uint8 `json:"lines"` @@ -71,10 +106,23 @@ type HexMap struct { LineCells HexLayer `json:"lineCells"` } +func (m HexMap) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("id", m.XID.String()) + encoder.AddUint8("lines", m.Lines) + encoder.AddUint8("cellsPerLine", m.CellsPerLine) + displayModeErr := encoder.AddObject("displayMode", m.DisplayMode) + lineCellsErr := encoder.AddArray("lineCells", m.LineCells) + if displayModeErr != nil { + return displayModeErr + } else { + return lineCellsErr + } +} + // Copy creates a deep copy of this HexMap. func (m HexMap) Copy() HexMap { return HexMap{ - Xid: m.Xid, + XID: m.XID, Lines: m.Lines, CellsPerLine: m.CellsPerLine, DisplayMode: m.DisplayMode, diff --git a/server/state/synced.go b/server/state/synced.go index 5a98c02..cf38641 100644 --- a/server/state/synced.go +++ b/server/state/synced.go @@ -1,11 +1,25 @@ package state +import ( + "go.uber.org/zap/zapcore" +) + // Synced contains all state that is synced between the server and its clients. type Synced struct { Map HexMap `json:"map"` User UserData `json:"user"` } +func (s Synced) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + mapErr := encoder.AddObject("map", s.Map) + userErr := encoder.AddObject("user", s.User) + if mapErr != nil { + return mapErr + } else { + return userErr + } +} + // Copy creates a deep copy of this Synced instance. func (s Synced) Copy() Synced { return Synced{ diff --git a/server/state/user.go b/server/state/user.go index 9d585c4..8724b6a 100644 --- a/server/state/user.go +++ b/server/state/user.go @@ -1,9 +1,16 @@ package state +import "go.uber.org/zap/zapcore" + // UserData contains data about clients that is synced between client and server. // Unlike the map, UserData is not persisted to disk, and all UserData is lost on shutdown. type UserData struct { - ActiveColor HexColor `json:"active_color"` + ActiveColor HexColor `json:"activeColor"` +} + +func (u UserData) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddString("activeColor", u.ActiveColor.String()) + return nil } // Copy creates a deep copy of this UserData.