Finish implementing the room actor and its client

main
Mari 3 years ago
parent abaeb8d609
commit ef9e4c9da1
  1. 34
      server/action/client.go
  2. 36
      server/action/map.go
  3. 42
      server/action/server.go
  4. 12
      server/action/syncable.go
  5. 34
      server/action/user.go
  6. 19
      server/actions/client/client.go
  7. 1
      server/actions/server/server.go
  8. 5
      server/go.mod
  9. 48
      server/go.sum
  10. 273
      server/room/actor.go
  11. 165
      server/room/client.go
  12. 124
      server/room/clientmessage.go
  13. 147
      server/room/message.go
  14. 54
      server/room/room.go
  15. 8
      server/state/coordinates.go
  16. 54
      server/state/hexmap.go
  17. 14
      server/state/synced.go
  18. 9
      server/state/user.go

@ -0,0 +1,34 @@
package action
import (
"go.uber.org/zap/zapcore"
)
// ClientHello is the action sent by the client when it first establishes the connection.
type ClientHello struct {
// Version is the protocol version the client is running.
Version int `json:"version"`
}
// ClientRefresh is the action sent by the client when it needs the full state re-sent.
type ClientRefresh struct {
}
// IDed contains a pair of ID and Action, as sent by the server.
type IDed struct {
// ID contains the arbitrary ID that was sent by the client, for identifying the action in future messages.
ID int `json:"id"`
// Action contains the action that was actually being sent.
Action Syncable `json:"action"`
}
func (i IDed) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddInt("id", i.ID)
return encoder.AddObject("action", i.Action)
}
// ClientSent is an action sent in order to deliver one or more Syncable actions to the server.
type ClientSent struct {
// Actions contains the actions the client wants to apply, in the order they should be applied.
Actions []IDed `json:"nested"`
}

@ -0,0 +1,36 @@
package action
import (
"go.uber.org/zap/zapcore"
"hexmap-server/state"
)
// CellColor is the action sent when a cell of the map has been colored a different color.
type CellColor struct {
// At is the location of the cell in storage coordinates.
At state.StorageCoordinates `json:"at"`
// Color is the color the cell has been changed to.
Color state.HexColor `json:"color"`
}
func (c CellColor) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
err := encoder.AddObject("at", c.At)
encoder.AddString("color", c.Color.String())
return err
}
// Apply sets the target cell's color, or returns ErrNoOp if it can't.
func (c CellColor) Apply(s *state.Synced) error {
if c.Color.A < 0xF {
return ErrNoTransparentColors
}
cell, err := s.Map.LineCells.GetCellAt(c.At)
if err != nil {
return err
}
if cell.Color == c.Color {
return ErrNoOp
}
cell.Color = c.Color
return nil
}

@ -0,0 +1,42 @@
package action
import "hexmap-server/state"
// ServerHello is the action sent to establish the current state of the server when a new client connects.
type ServerHello struct {
// Version is the protocol version the server is running.
Version int `json:"version"`
// State is the complete state of the server as of when the client joined.
State state.Synced `json:"state"`
}
// ServerRefresh is the action sent to reestablish the current state of the server in response to ClientRefresh.
type ServerRefresh struct {
// State is the complete state of the server as of when the corresponding ClientRefresh was processed.
State state.Synced `json:"state"`
}
// ServerOK is the action sent when one or more client actions have been accepted and applied.
type ServerOK struct {
// IDs contains the IDs of the actions which were accepted and applied, in the order they were accepted and applied.
// This is the same as the order they were received, though other actions may have been between these that were
// rejected.
IDs []int `json:"ids"`
}
// ServerFailed is the action sent when one or more client actions have been rejected.
type ServerFailed struct {
// IDs contains the IDs of the actions which were rejected, in the order they were rejected.
// This is the same as the order they were received, though other actions may have been between these that were
// accepted and applied.
IDs []int `json:"ids"`
// Error contains the error text sent from the server about why these actions failed.
Error string `json:"error"`
}
// ServerSent is the action sent when one or more client actions from other clients have been accepted and applied.
// The client's own actions will never be included in this action.
type ServerSent struct {
// Actions contains the actions that are now being applied.
Actions []Syncable `json:"actions"`
}

@ -1,18 +1,20 @@
package syncable package action
import ( import (
"errors" "errors"
"go.uber.org/zap/zapcore"
"hexmap-server/state" "hexmap-server/state"
) )
var ErrorNoOp error = errors.New("action's effects were already applied, or it's an empty action") var ErrNoOp error = errors.New("action's effects were already applied, or it's an empty action")
// Action is the interface for actions that can be shared. // Syncable is the interface for action that can be shared.
type Action interface { type Syncable interface {
zapcore.ObjectMarshaler
// Apply causes the action's effects to be applied to s, mutating it in place. // Apply causes the action's effects to be applied to s, mutating it in place.
// All syncable.Actions must conform to the standard that if an action can't be correctly applied, or if it would // All syncable.Actions must conform to the standard that if an action can't be correctly applied, or if it would
// have no effect, it returns an error without changing s. // have no effect, it returns an error without changing s.
// If an action can be correctly applied but would have no effect, it should return ErrorNoOp. // If an action can be correctly applied but would have no effect, it should return ErrNoOp.
// If an action is correctly applied and has an effect, it should return nil. // If an action is correctly applied and has an effect, it should return nil.
Apply(s *state.Synced) error Apply(s *state.Synced) error
} }

@ -0,0 +1,34 @@
package action
import (
"errors"
"go.uber.org/zap/zapcore"
"hexmap-server/state"
)
// ErrNoTransparentColors is returned when a user tries to set their active color or a cell color to transparent.
// Transparent here is defined as having an alpha component of less than 15 (0xF).
var ErrNoTransparentColors error = errors.New("transparent colors not allowed")
// UserActiveColor is the action sent when the user's current color, the one being painted with, changes.
type UserActiveColor struct {
// Color is the color that is now active.
Color state.HexColor `json:"color"`
}
func (c UserActiveColor) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("color", c.Color.String())
return nil
}
// Apply sets the user's active color, or returns ErrNoOp if it can't.
func (c UserActiveColor) Apply(s *state.Synced) error {
if c.Color.A < 0xF {
return ErrNoTransparentColors
}
if s.User.ActiveColor == c.Color {
return ErrNoOp
}
s.User.ActiveColor = c.Color
return nil
}

@ -1,19 +0,0 @@
package client
import "hexmap-server/actions/syncable"
type Hello struct {
Version int `json:"version"`
}
type Refresh struct {
}
type SentAction struct {
Id int `json:"id"`
Action syncable.Action `json:"action"`
}
type SentActions struct {
Nested []SentAction `json:"nested"`
}

@ -2,4 +2,7 @@ module hexmap-server
go 1.16 go 1.16
require github.com/rs/xid v1.3.0 // indirect require (
github.com/rs/xid v1.3.0
go.uber.org/zap v1.18.1
)

@ -1,2 +1,50 @@
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4= github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/zap v1.18.1 h1:CSUJ2mjFszzEWt4CdKISEuChVIXGBn3lAPwkRGyVrc4=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

@ -0,0 +1,273 @@
package room
import (
"github.com/rs/xid"
"go.uber.org/zap"
"hexmap-server/action"
"hexmap-server/state"
)
// act is the meat and potatoes of the room - it's responsible for actually running the room.
// It will gracefully remove all clients before shutting down.
func (r *room) act() {
defer r.gracefulShutdown()
r.logger.Info("Room starting up, listening for incoming actions")
for {
raw := <-r.incomingChannel
client := raw.ClientID()
msgLogger := r.logger.With(zap.Stringer("client", client))
msgLogger.Debug("Message received, handling", zap.Object("message", raw))
switch msg := raw.(type) {
case JoinRequest:
r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel)
r.acknowledgeJoin(msg.id, msg.wantCurrentState)
case RefreshRequest:
r.sendRefresh(msg.id)
case ApplyRequest:
msgLogger.Debug("Received action to apply from client", zap.Int("actionId", msg.action.ID))
result := r.applyAction(msg.action.Action)
if result != nil {
r.broadcastAction(client, msg.action.ID, msg.action.Action)
}
r.acknowledgeAction(client, msg.action.ID, result)
case LeaveRequest:
// So long, then. We can close immediately here; they promised not to send any more messages after this
// unless we were shutting down, which we're not.
r.acknowledgeLeave(client)
r.closeClient(client)
case StopRequest:
// As requested, we shut down. Our deferred gracefulShutdown will catch us as we fall.
msgLogger.Info("Received StopRequest from client, shutting down")
return
case ShutdownResponse:
// Uh... thank... you. I'm not... Never mind. I guess this means you're leaving?
msgLogger.Error("Received unexpected ShutdownResponse from client while not shutting down")
r.closeClient(client)
default:
msgLogger.Warn("Ignoring unhandled message", zap.Object("message", msg))
}
msgLogger.Debug("Message handled, resuming listening")
}
}
// addClient records a client's presence in the client map.
func (r *room) addClient(id xid.ID, returnChannel chan<- Message, broadcast bool, privateChannel bool) {
logger := r.logger.With(zap.Stringer("client", id))
logger.Debug("Adding client")
if client, ok := r.clients[id]; ok {
if client.outgoingChannel == returnChannel {
if broadcast == client.broadcast && privateChannel == client.privateChannel {
logger.Warn("Already have client when adding client")
} else {
logger.Error("Already have client but with different settings when adding client")
}
} else {
logger.Error("Already have a different client with the same id when adding client")
}
return
}
r.clients[id] = internalClient{
id: id,
outgoingChannel: returnChannel,
broadcast: broadcast,
privateChannel: privateChannel,
}
}
// stateCopy creates and returns a fresh copy of the current state.
// This avoids concurrent modification of the state while clients are reading it.
func (r *room) stateCopy() *state.Synced {
s := r.currentState.Copy()
return &s
}
// acknowledgeJoin composes and sends a JoinResponse to the given client.
func (r *room) acknowledgeJoin(id xid.ID, includeState bool) {
logger := r.logger.With(zap.Stringer("client", id))
client, ok := r.clients[id]
if !ok {
logger.Error("No such client when acknowledging join")
return
}
var s *state.Synced = nil
if includeState {
logger.Debug("Preparing state copy for client")
s = r.stateCopy()
}
logger.Debug("Sending JoinResponse to client")
client.outgoingChannel <- JoinResponse{
id: r.id,
currentState: s,
}
}
// sendRefresh composes and sends a RefreshResponse to the given client.
func (r *room) sendRefresh(id xid.ID) {
logger := r.logger.With(zap.Stringer("client", id))
client, ok := r.clients[id]
if !ok {
logger.Error("No such client when sending refresh")
return
}
var s *state.Synced = nil
logger.Debug("Preparing state copy for client")
s = r.stateCopy()
logger.Debug("Sending RefreshResponse to client")
client.outgoingChannel <- RefreshResponse{
id: r.id,
currentState: s,
}
}
// applyAction applies an action to the state and returns the result of it.
func (r *room) applyAction(action action.Syncable) error {
r.logger.Debug("Applying action", zap.Object("action", action))
return action.Apply(&r.currentState)
}
// broadcastAction sends an action to everyone other than the original client which requested it.
func (r *room) broadcastAction(originalClientID xid.ID, originalActionID int, action action.Syncable) {
logger := r.logger.With(zap.Stringer("originalClient", originalClientID), zap.Int("actionID", originalActionID), zap.Object("action", action))
broadcast := ActionBroadcast{
id: r.id,
originalClientID: originalClientID,
originalActionID: originalActionID,
action: action,
}
logger.Debug("Broadcasting action to all clients")
for id, client := range r.clients {
if id.Compare(originalClientID) != 0 {
logger.Debug("Sending ActionBroadcast to client", zap.Stringer("client", id))
client.outgoingChannel <- broadcast
}
}
}
// acknowledgeAction sends a response to the original client which requested an action.
func (r *room) acknowledgeAction(id xid.ID, actionID int, error error) {
logger := r.logger.With(zap.Stringer("id", id), zap.Int("actionId", actionID), zap.Error(error))
logger.Debug("Responding to client with the status of its action")
client, ok := r.clients[id]
if !ok {
logger.Error("No such client when acknowledging action")
return
}
logger.Debug("Sending ApplyResponse to client")
client.outgoingChannel <- ApplyResponse{
id: id,
actionID: actionID,
result: error,
}
}
// acknowledgeLeave causes the room to signal to the client that it has received and acknowledged the client's LeaveRequest,
// and will not send any further messages.
func (r *room) acknowledgeLeave(id xid.ID) {
logger := r.logger.With(zap.Stringer("client", id))
logger.Debug("Acknowledging client's leave request")
client, ok := r.clients[id]
if !ok {
logger.Error("No such client when acknowledging leave request")
return
}
logger.Debug("Sending LeaveResponse to client")
client.outgoingChannel <- LeaveResponse{id: r.id}
}
// closeClient causes the room to remove the client with the given id from its clients.
// This should only be used after a shutdown handshake between this client and the room has taken place.
func (r *room) closeClient(id xid.ID) {
logger := r.logger.With(zap.Stringer("client", id))
logger.Debug("Closing client")
client, ok := r.clients[id]
if !ok {
logger.Error("Attempted to close a client that didn't exist")
return
}
if client.privateChannel {
logger.Debug("Closing outgoingChannel, as client has a privateChannel")
close(client.outgoingChannel)
}
delete(r.clients, id)
}
// gracefulShutdown causes the room to shut down cleanly, making sure that all clients have been removed.
func (r *room) gracefulShutdown() {
defer r.finalShutdown()
if len(r.clients) == 0 {
// Nothing to do, we've already won.
r.logger.Debug("No remaining clients, so just shutting down")
return
}
r.requestShutdown()
for len(r.clients) > 0 {
raw := <-r.incomingChannel
client := raw.ClientID()
msgLogger := r.logger.With(zap.Stringer("client", client))
msgLogger.Debug("Post-shutdown message received, handling", zap.Object("message", raw))
switch msg := raw.(type) {
case JoinRequest:
// Don't you hate it when someone comes to the desk right as you're getting ready to pack up?
// Can't ignore them - they have our channel, and they might be sending things to it. We have to add them
// and then immediately send them a ShutdownRequest and wait for them to answer it.
msgLogger.Debug("Received join request from client while shutting down")
r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel)
r.requestShutdownFrom(client)
case RefreshRequest:
// Ugh, seriously, now? Fine. You can have this - you might be our friend the persistence actor.
r.sendRefresh(client)
case LeaveRequest:
// We sent them a shutdown already, so unfortunately, we can't close them immediately. We have to wait for
// them to tell us they've heard that we're shutting down.
msgLogger.Debug("Received leave request from client while shutting down")
r.acknowledgeLeave(client)
case StopRequest:
// Yes. We're doing that. Check your inbox, I already sent you the shutdown.
msgLogger.Debug("Received stop request from client while shutting down")
case ShutdownResponse:
// The only way we would be getting one of these is if the client knows it doesn't have to send us anything
// else. Therefore, we can remove them now.
// Similarly, we know that they'll receive the LeaveResponse they need and shut down.
// Like us, even if it sent a LeaveRequest before realizing we were shutting down, it would have gotten our
// ShutdownRequest and sent this before it could read the LeaveResponse, but it will wait for the
// LeaveResponse regardless.
msgLogger.Debug("Received shutdown confirmation from client")
r.closeClient(client)
default:
msgLogger.Debug("Ignoring irrelevant message from client while shutting down", zap.Object("message", raw))
}
msgLogger.Debug("Message handled, resuming listening and waiting to be safe to shut down", zap.Int("clientsLeft", len(r.clients)))
}
r.logger.Debug("All clients have acknowledged the ShutdownRequest, now shutting down")
}
// requestShutdown produces a ShutdownRequest and sends it to all clients to indicate that the room is shutting down.
func (r *room) requestShutdown() {
r.logger.Debug("Alerting clients that shutdown is in progress", zap.Int("clientsLeft", len(r.clients)))
for id := range r.clients {
r.requestShutdownFrom(id)
}
}
// requestShutdownFrom produces a ShutdownRequest and sends it to the client with id to indicate that the room is
// shutting down.
func (r *room) requestShutdownFrom(id xid.ID) {
clientField := zap.Stringer("client", id)
r.logger.Debug("Alerting client that shutdown is in progress", clientField)
shutdown := ShutdownRequest{
id: r.id,
}
client, ok := r.clients[id]
if !ok {
r.logger.Error("No such client when requesting shutdown from client", clientField)
}
client.outgoingChannel <- shutdown
}
// finalShutdown causes the room to do any final cleanup not involving its clients before stopping.
// Use gracefulShutdown instead, which calls this once it's safe to do so.
func (r *room) finalShutdown() {
r.logger.Debug("Closing incoming channel")
close(r.incomingChannel)
r.logger.Info("Shut down")
}

@ -0,0 +1,165 @@
package room
import (
"github.com/rs/xid"
"go.uber.org/zap"
)
// internalClient is used by the room itself to track information about a client.
type internalClient struct {
// id is the id that the client identifies itself with in all clientMessage instances it sends.
id xid.ID
// outgoingChannel is a channel that the room can send messages to the client on.
outgoingChannel chan<- Message
// privateChannel is true iff the room can close the outgoingChannel when the client and room have completed their
// close handshake.
privateChannel bool
// broadcast is true iff the client requested to be included on broadcasts on creation.
broadcast bool
}
type NewClientOptions struct {
// IncomingChannel is the channel to use as the room's channel to send messages to - the new Client's IncomingChannel.
// If this is non-nil, the room will not automatically close the IncomingChannel after a shutdown is negotiated.
// If this is nil, a new channel will be allocated on join and closed on shutdown.
IncomingChannel chan Message
// If AcceptBroadcasts is true, the room will send all broadcasts originating from other clients to this client.
AcceptBroadcasts bool
// If RequestStartingState is true, the room will send a copy of the current state as of when the JoinRequest was
// received in the JoinResponse that will be the first message the Client receives.
RequestStartingState bool
// If sets,
Logger *zap.Logger
}
// Client is the structure used by clients external to the Room package to communicate with the Room.
// It is not expected to be parallel-safe; to run it in parallel, use the NewClient method and send the new client to
// the new goroutine.
type Client struct {
// id is the ClientID used by the client for all communications.
id xid.ID
// roomId is the unique ID of the room (not its map).
roomId xid.ID
// incomingChannel is the channel that this client receives messages on.
incomingChannel <-chan Message
// outgoingChannel is the channel that this client sends messages on.
// Becomes nil if the client has been completely shut down.
outgoingChannel chan<- ClientMessage
// Once Leave or AcknowledgeShutdown have been triggered, this flag is set, preventing use of other messages.
shuttingDown bool
}
// ID is the ID used by this client to identify itself to the Room.
func (c *Client) ID() xid.ID {
return c.id
}
// RoomID is the ID used by the room to differentiate itself from other rooms.
// It is not the map's internal ID.
func (c *Client) RoomID() xid.ID {
return c.id
}
// IncomingChannel is the channel the client can listen on for messages from the room.
func (c *Client) IncomingChannel() <-chan Message {
return c.incomingChannel
}
// OutgoingChannel is the channel the client can send messages to the room on.
func (c *Client) OutgoingChannel() chan<- ClientMessage {
if c.outgoingChannel == nil {
panic("Already finished shutting down; no new messages should be sent")
}
return c.outgoingChannel
}
// newClientForRoom uses the necessary parameters to create and join a Client for the given room.
func newClientForRoom(roomId xid.ID, outgoingChannel chan<- ClientMessage, opts NewClientOptions) *Client {
var privateChannel bool
var incomingChannel chan Message
if opts.IncomingChannel != nil {
incomingChannel = opts.IncomingChannel
privateChannel = false
} else {
incomingChannel = make(chan Message, 1)
privateChannel = true
}
result := Client{
id: xid.New(),
roomId: roomId,
incomingChannel: incomingChannel,
outgoingChannel: outgoingChannel,
shuttingDown: false,
}
result.outgoingChannel <- JoinRequest{
id: result.id,
returnChannel: incomingChannel,
privateChannel: privateChannel,
broadcast: opts.AcceptBroadcasts,
wantCurrentState: opts.RequestStartingState,
}
return &result
}
// NewClient creates a new client belonging to the same room as this client with a random ID.
// The new client will be automatically joined to the channel.
func (c *Client) NewClient(opts NewClientOptions) *Client {
if c.shuttingDown {
panic("Already started shutting down; no new messages should be sent")
}
return newClientForRoom(c.roomId, c.outgoingChannel, opts)
}
// The message created by Refresh causes the client to request a fresh copy of the state.
func (c *Client) Refresh() RefreshRequest {
if c.shuttingDown {
panic("Already started shutting down; no new messages should be sent")
}
return RefreshRequest{
id: c.id,
}
}
// The message created by Leave causes the local client to signal that it is shutting down.
// It is important to Leave to avoid dangling clients having messages sent to nothing.
// After sending Leave, the client must confirm that it has been removed by waiting for a LeaveResponse, accompanied by
// the closing of the Client's IncomingChannel if it was a private channel.
// No further messages should be sent after Leave except AcknowledgeShutdown if Leave and requestShutdown crossed paths in midair.
func (c *Client) Leave() LeaveRequest {
if c.shuttingDown {
panic("Already started shutting down; no new messages should be sent")
}
c.shuttingDown = true
return LeaveRequest{
id: c.id,
}
}
// The message created by Stop causes the local client to signal that it is shutting down.
// It is important to Stop when the room needs to be shut down.
// After sending Stop, the client must confirm that it has been removed by waiting for a ShutdownRequest, which should
// be handled normally.
// No further messages should be sent after Stop except AcknowledgeShutdown.
func (c *Client) Stop() StopRequest {
if c.shuttingDown {
panic("Already started shutting down; no new messages should be sent")
}
c.shuttingDown = true
return StopRequest{
id: c.id,
}
}
// AcknowledgeShutdown causes the local client to signal that it has acknowledged that the room is shutting down.
// No further messages can be sent after AcknowledgeShutdown; attempting to do so will block forever, as the
// OutgoingChannel has become nil.
func (c *Client) AcknowledgeShutdown() ShutdownResponse {
if c.outgoingChannel == nil {
panic("Already finished shutting down; no new messages should be sent")
}
c.shuttingDown = true
c.outgoingChannel = nil
return ShutdownResponse{
id: c.id,
}
}

@ -0,0 +1,124 @@
package room
import (
"github.com/rs/xid"
"go.uber.org/zap/zapcore"
"hexmap-server/action"
)
// ClientMessage marks messages coming from clients to the room.
type ClientMessage interface {
zapcore.ObjectMarshaler
// SourceID is the id of the client sending the message.
ClientID() xid.ID
}
// JoinRequest is the message sent on the room's IncomingChannel by a new client joining the room.
type JoinRequest struct {
// id is the SourceID the client will use to identify itself in future messages.
id xid.ID
// returnChannel is a buffered channel the client is ready to receive messages from the room on.
// This becomes the Client's OutgoingChannel.
returnChannel chan<- Message
// privateChannel is true iff the room can close returnChannel after completing a shutdown handshake.
// This permits extra safety by causing channels that somehow leak into other contexts to become noticeable by
// causing panics.
privateChannel bool
// broadcast is true iff the room should send action.Syncable from other clients to this one.
broadcast bool
// wantCurrentState indicates that the client would like the room to include a copy of the current state of the room
// when it joins.
wantCurrentState bool
}
func (j JoinRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "JoinRequest")
encoder.AddString("id", j.id.String())
encoder.AddBool("broadcast", j.broadcast)
encoder.AddBool("wantCurrentState", j.wantCurrentState)
return nil
}
func (j JoinRequest) ClientID() xid.ID {
return j.id
}
// RefreshRequest is the message sent on the room's IncomingChannel by a client which needs the current value.
type RefreshRequest struct {
id xid.ID
}
func (r RefreshRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "RefreshRequest")
encoder.AddString("id", r.id.String())
return nil
}
func (r RefreshRequest) ClientID() xid.ID {
return r.id
}
// ApplyRequest is the message sent on the room's IncomingChannel by a client which has received an action from the
// websocket.
type ApplyRequest struct {
id xid.ID
action action.IDed
}
func (f ApplyRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "ApplyRequest")
encoder.AddString("id", f.id.String())
return encoder.AddObject("action", f.action)
}
func (f ApplyRequest) ClientID() xid.ID {
return f.id
}
// LeaveRequest is the message sent on the room's IncomingChannel by a client which is shutting down.
// The client is indicating that it will send no messages except a possible ShutdownResponse, in the event that a
// LeaveRequest and a ShutdownRequest cross paths midflight.
type LeaveRequest struct {
id xid.ID
}
func (l LeaveRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "LeaveRequest")
encoder.AddString("id", l.id.String())
return nil
}
func (l LeaveRequest) ClientID() xid.ID {
return l.id
}
// StopRequest is the message sent on the room's IncomingChannel by a client which wants to make the room shut down.
// The response to a StopRequest is a ShutdownRequest, which should be handled as normal.
type StopRequest struct {
id xid.ID
}
func (s StopRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "StopRequest")
encoder.AddString("id", s.id.String())
return nil
}
func (s StopRequest) ClientID() xid.ID {
return s.id
}
// ShutdownResponse is the message sent on the room's IncomingChannel by a client which has accepted the room's ShutdownRequest.
type ShutdownResponse struct {
id xid.ID
}
func (s ShutdownResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "ShutdownResponse")
encoder.AddString("id", s.id.String())
return nil
}
func (s ShutdownResponse) ClientID() xid.ID {
return s.id
}

@ -0,0 +1,147 @@
package room
import (
"github.com/rs/xid"
"go.uber.org/zap/zapcore"
"hexmap-server/action"
"hexmap-server/state"
)
// Message marks messages going to clients from the room.
type Message interface {
zapcore.ObjectMarshaler
// RoomID marks the ID of the room this Message originated from, in case the Client has a shared IncomingChannel.
RoomID() xid.ID
}
// JoinResponse is the message sent by the room on a new client's IncomingChannel after it joins.
type JoinResponse struct {
id xid.ID
// currentState is a pointer to a copy of the room's current state.
// If the client refused the room state in its join message, this will be a nil pointer instead.
currentState *state.Synced
}
func (j JoinResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "JoinResponse")
encoder.AddString("id", j.id.String())
return encoder.AddObject("currentState", j.currentState)
}
func (j JoinResponse) RoomID() xid.ID {
return j.id
}
// CurrentState returns the state of the room as of when the JoinRequest was processed.
func (j JoinResponse) CurrentState() *state.Synced {
return j.currentState
}
// RefreshResponse is the message sent by the room after a client requests it, or immediately on join.
type RefreshResponse struct {
id xid.ID
// currentState is a pointer to a copy of the room's current state.
currentState *state.Synced
}
func (r RefreshResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "JoinResponse")
encoder.AddString("id", r.id.String())
return encoder.AddObject("currentState", r.currentState)
}
func (r RefreshResponse) RoomID() xid.ID {
return r.id
}
// CurrentState returns the state of the room as of when the RefreshRequest was processed.
func (r RefreshResponse) CurrentState() *state.Synced {
return r.currentState
}
// ApplyResponse returns the result of an action to _only_ the one that sent the ApplyRequest.
type ApplyResponse struct {
id xid.ID
// actionID is the ID of the action that completed or failed.
actionID int
// result is nil if the action was completed, or an error if it failed.
result error
}
func (a ApplyResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "ApplyResponse")
encoder.AddString("id", a.id.String())
encoder.AddInt("actionId", a.actionID)
encoder.AddBool("success", a.result == nil)
if a.result != nil {
encoder.AddString("failure", a.result.Error())
}
return nil
}
func (a ApplyResponse) RoomID() xid.ID {
return a.id
}
// Success returns true if the action succeeded, false if it failed.
func (a ApplyResponse) Success() bool {
return a.result == nil
}
// Failure returns the error if the action failed, or nil if it succeeded.
func (a ApplyResponse) Failure() error {
return a.result
}
// ActionBroadcast is sent to all clients _other_ than the one that sent the ApplyRequest when an action succeeds.
type ActionBroadcast struct {
id xid.ID
// originalClientID is the client that sent the action in the first place.
originalClientID xid.ID
// originalActionID is the ID that the client that sent the action sent.
originalActionID int
// action is the action that succeeded.
action action.Syncable
}
func (a ActionBroadcast) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "ActionBroadcast")
encoder.AddString("id", a.id.String())
encoder.AddString("originalClientId", a.originalClientID.String())
encoder.AddInt("originalActionId", a.originalActionID)
return encoder.AddObject("action", a.action)
}
func (a ActionBroadcast) RoomID() xid.ID {
return a.id
}
// LeaveResponse is the message sent by the room when it has accepted that a client has left, and will send it no further messages.
type LeaveResponse struct {
id xid.ID
}
func (l LeaveResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "LeaveResponse")
encoder.AddString("id", l.id.String())
return nil
}
func (l LeaveResponse) RoomID() xid.ID {
return l.id
}
// ShutdownRequest is the message sent by the room when something causes it to shut down. It will send the client no further messages except a possible LeaveResponse.
type ShutdownRequest struct {
id xid.ID
}
func (s ShutdownRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("type", "ShutdownRequest")
encoder.AddString("id", s.id.String())
return nil
}
func (s ShutdownRequest) RoomID() xid.ID {
return s.id
}

@ -0,0 +1,54 @@
package room
import (
"github.com/rs/xid"
"go.uber.org/zap"
"hexmap-server/state"
)
// NewOptions is the set of information used to control what a room starts with.
type NewOptions struct {
// BaseLogger is the logger that the room should attach its data to.
BaseLogger *zap.Logger
// StartingState is a state.Synced that defines the state of the room on creation.
StartingState state.Synced
// StartingClientOptions sets the configuration of the first client to be created, the one that will be returned
// from New.
StartingClientOptions NewClientOptions
}
// room is a room as seen from within - the information needed for the room process to do its duties.
type room struct {
// id is the room's internal ID - not to be confused with the ID of the map it is serving.
id xid.ID
// incomingChannel is the channel the room uses to receive messages. The room itself owns this channel,
// so when the clients map is empty, it can be closed.
incomingChannel chan ClientMessage
// clients contains the map of active clients, each of which is known to have a reference to this room.
clients map[xid.ID]internalClient
// currentState contains the active state being used by actions right now.
currentState state.Synced
// logger is the logger that this room will use. It contains context fields for the room's important fields.
logger *zap.Logger
}
// Creates and starts up a new room, joins a new client to it and returns that client.
func New(opts NewOptions) *Client {
logger := opts.BaseLogger.Named("Room")
id := xid.New()
r := room{
id: id,
incomingChannel: make(chan ClientMessage),
clients: make(map[xid.ID]internalClient),
currentState: opts.StartingState,
logger: logger,
}
go r.act()
return r.newClient(opts.StartingClientOptions)
}
// newClient creates a new client belonging to this room with a random ID.
// The new client will be automatically joined to the channel.
func (r *room) newClient(opts NewClientOptions) *Client {
return newClientForRoom(r.id, r.incomingChannel, opts)
}

@ -1,5 +1,7 @@
package state package state
import "go.uber.org/zap/zapcore"
// StorageCoordinates gives the coordinates of a cell in a form optimized for storage. // StorageCoordinates gives the coordinates of a cell in a form optimized for storage.
type StorageCoordinates struct { type StorageCoordinates struct {
// Line is the index from 0 to Lines - 1 of the HexLine in the HexLayer. // Line is the index from 0 to Lines - 1 of the HexLine in the HexLayer.
@ -7,3 +9,9 @@ type StorageCoordinates struct {
// Cell is the index from 0 to CellsPerLine - 1 of the HexCell in the HexLine. // Cell is the index from 0 to CellsPerLine - 1 of the HexCell in the HexLine.
Cell uint8 `json:"cell"` Cell uint8 `json:"cell"`
} }
func (s StorageCoordinates) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddUint8("line", s.Line)
encoder.AddUint8("cell", s.Cell)
return nil
}

@ -3,6 +3,7 @@ package state
import ( import (
"fmt" "fmt"
"github.com/rs/xid" "github.com/rs/xid"
"go.uber.org/zap/zapcore"
) )
// HexMapRepresentation combines HexOrientation and LineParity to represent a map's display mode. // HexMapRepresentation combines HexOrientation and LineParity to represent a map's display mode.
@ -11,15 +12,37 @@ type HexMapRepresentation struct {
IndentedLines LineParity `json:"indentedLines"` IndentedLines LineParity `json:"indentedLines"`
} }
func (h HexMapRepresentation) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("orientation", h.Orientation.String())
encoder.AddString("indentedLines", h.IndentedLines.String())
return nil
}
// HexCell contains data for a single cell of the map. // HexCell contains data for a single cell of the map.
type HexCell struct { type HexCell struct {
// Color contains the color of the cell, in hex notation. // Color contains the color of the cell, in hex notation.
Color HexColor `json:"color"` Color HexColor `json:"color"`
} }
func (h HexCell) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("color", h.Color.String())
return nil
}
// HexLine is a line of cells which are adjacent by flat sides in a vertical or horizontal direction. // HexLine is a line of cells which are adjacent by flat sides in a vertical or horizontal direction.
type HexLine []HexCell type HexLine []HexCell
func (l HexLine) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
var finalErr error
for _, cell := range l {
err := encoder.AppendObject(cell)
if err != nil && finalErr == nil {
finalErr = err
}
}
return finalErr
}
// Copy creates a deep copy of this HexLine. // Copy creates a deep copy of this HexLine.
func (l HexLine) Copy() HexLine { func (l HexLine) Copy() HexLine {
duplicate := make(HexLine, len(l)) duplicate := make(HexLine, len(l))
@ -32,7 +55,19 @@ func (l HexLine) Copy() HexLine {
// HexLayer is a two-dimensional plane of cells which are arranged into lines. // HexLayer is a two-dimensional plane of cells which are arranged into lines.
type HexLayer []HexLine type HexLayer []HexLine
func (l HexLayer) MarshalLogArray(encoder zapcore.ArrayEncoder) error {
var finalErr error
for _, line := range l {
err := encoder.AppendArray(line)
if err != nil && finalErr == nil {
finalErr = err
}
}
return finalErr
}
// GetCellAt returns a reference to the cell at the given coordinates. // GetCellAt returns a reference to the cell at the given coordinates.
// If the coordinates are out of bounds for this map, an error will be returned.
func (l HexLayer) GetCellAt(c StorageCoordinates) (*HexCell, error) { func (l HexLayer) GetCellAt(c StorageCoordinates) (*HexCell, error) {
if int(c.Line) > len(l) { if int(c.Line) > len(l) {
return nil, fmt.Errorf("line %d out of bounds (%d)", c.Line, len(l)) return nil, fmt.Errorf("line %d out of bounds (%d)", c.Line, len(l))
@ -55,8 +90,8 @@ func (l HexLayer) Copy() HexLayer {
// HexMap contains the data for a map instance. // HexMap contains the data for a map instance.
type HexMap struct { type HexMap struct {
// Xid is the unique ID of the HexMap, used to encourage clients not to blindly interact with a different map. // XID is the unique id of the HexMap, used to encourage clients not to blindly interact with a different map.
Xid xid.ID `json:"xid"` XID xid.ID `json:"xid"`
// Lines is the rough number of rows (in PointyTop orientation) or columns (in FlatTop orientation) in the map. // Lines is the rough number of rows (in PointyTop orientation) or columns (in FlatTop orientation) in the map.
// Because different lines will be staggered, it's somewhat hard to see. // Because different lines will be staggered, it's somewhat hard to see.
Lines uint8 `json:"lines"` Lines uint8 `json:"lines"`
@ -71,10 +106,23 @@ type HexMap struct {
LineCells HexLayer `json:"lineCells"` LineCells HexLayer `json:"lineCells"`
} }
func (m HexMap) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("id", m.XID.String())
encoder.AddUint8("lines", m.Lines)
encoder.AddUint8("cellsPerLine", m.CellsPerLine)
displayModeErr := encoder.AddObject("displayMode", m.DisplayMode)
lineCellsErr := encoder.AddArray("lineCells", m.LineCells)
if displayModeErr != nil {
return displayModeErr
} else {
return lineCellsErr
}
}
// Copy creates a deep copy of this HexMap. // Copy creates a deep copy of this HexMap.
func (m HexMap) Copy() HexMap { func (m HexMap) Copy() HexMap {
return HexMap{ return HexMap{
Xid: m.Xid, XID: m.XID,
Lines: m.Lines, Lines: m.Lines,
CellsPerLine: m.CellsPerLine, CellsPerLine: m.CellsPerLine,
DisplayMode: m.DisplayMode, DisplayMode: m.DisplayMode,

@ -1,11 +1,25 @@
package state package state
import (
"go.uber.org/zap/zapcore"
)
// Synced contains all state that is synced between the server and its clients. // Synced contains all state that is synced between the server and its clients.
type Synced struct { type Synced struct {
Map HexMap `json:"map"` Map HexMap `json:"map"`
User UserData `json:"user"` User UserData `json:"user"`
} }
func (s Synced) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
mapErr := encoder.AddObject("map", s.Map)
userErr := encoder.AddObject("user", s.User)
if mapErr != nil {
return mapErr
} else {
return userErr
}
}
// Copy creates a deep copy of this Synced instance. // Copy creates a deep copy of this Synced instance.
func (s Synced) Copy() Synced { func (s Synced) Copy() Synced {
return Synced{ return Synced{

@ -1,9 +1,16 @@
package state package state
import "go.uber.org/zap/zapcore"
// UserData contains data about clients that is synced between client and server. // UserData contains data about clients that is synced between client and server.
// Unlike the map, UserData is not persisted to disk, and all UserData is lost on shutdown. // Unlike the map, UserData is not persisted to disk, and all UserData is lost on shutdown.
type UserData struct { type UserData struct {
ActiveColor HexColor `json:"active_color"` ActiveColor HexColor `json:"activeColor"`
}
func (u UserData) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddString("activeColor", u.ActiveColor.String())
return nil
} }
// Copy creates a deep copy of this UserData. // Copy creates a deep copy of this UserData.

Loading…
Cancel
Save