From ab29ec20beeacb46d308bea4bc690e74669a3a7a Mon Sep 17 00:00:00 2001 From: Mari Date: Sun, 18 Jul 2021 15:05:26 -0400 Subject: [PATCH] Hacky TURBO CODING MODE minimum viable project --- client/src/App.tsx | 4 +- client/src/actions/ServerAction.ts | 4 +- client/src/reducers/ServerReducer.ts | 1 + client/src/state/Coordinates.ts | 14 +- client/src/state/HexMap.ts | 12 +- client/src/ui/HexColorPicker.tsx | 4 +- client/src/websocket/MapFromPb.ts | 4 +- .../src/websocket/WebsocketReactAdapter.tsx | 13 +- client/src/websocket/WebsocketTranslator.ts | 5 +- server/action/action.go | 13 + server/host/HttpServer.go | 193 ++++++++++++ server/room/actor.go | 38 +-- server/room/client.go | 32 +- server/room/clientmessage.go | 30 +- server/room/message.go | 47 +-- server/state/map.go | 35 ++- server/websocket/connection.go | 37 --- server/{websocket => ws}/client.go | 17 +- server/{websocket => ws}/client.pbconv.go | 49 ++-- server/ws/connection.go | 276 ++++++++++++++++++ server/{websocket => ws}/reader.go | 21 +- server/{websocket => ws}/server.go | 2 +- server/{websocket => ws}/server.pbconv.go | 40 +-- server/{websocket => ws}/shared.go | 15 +- server/{websocket => ws}/writer.go | 16 +- 25 files changed, 701 insertions(+), 221 deletions(-) create mode 100644 server/host/HttpServer.go delete mode 100644 server/websocket/connection.go rename server/{websocket => ws}/client.go (80%) rename server/{websocket => ws}/client.pbconv.go (59%) create mode 100644 server/ws/connection.go rename server/{websocket => ws}/reader.go (87%) rename server/{websocket => ws}/server.go (99%) rename server/{websocket => ws}/server.pbconv.go (71%) rename server/{websocket => ws}/shared.go (83%) rename server/{websocket => ws}/writer.go (94%) diff --git a/client/src/App.tsx b/client/src/App.tsx index 963ecc9..aec3eca 100644 --- a/client/src/App.tsx +++ b/client/src/App.tsx @@ -34,11 +34,11 @@ function App() { top: 10, left: 10, size: 50, - displayMode: map.layout + layout: map.layout } : null), [map]) const mapElement = !!offsets && !!map ? : null; const {width, height} = useMemo(() => !!offsets && !!map ? sizeFromLinesAndCells({ - bottomMargin: 10, cells: map.cellsPerLine, lines: map.lines, offsets: offsets, rightMargin: 10 + bottomMargin: 10, cells: map.cellsPerLine, lines: map.lines, offsets: offsets, rightMargin: 10, }) : {width: 1, height: 1}, [map, offsets]); const colorPickerElement = !!user ? dispatch({ type: USER_ACTIVE_COLOR, diff --git a/client/src/actions/ServerAction.ts b/client/src/actions/ServerAction.ts index 3f1a3ed..7d2a117 100644 --- a/client/src/actions/ServerAction.ts +++ b/client/src/actions/ServerAction.ts @@ -75,7 +75,7 @@ export enum SocketState { OPEN = "OPEN", } export const SERVER_SOCKET_STARTUP = "SERVER_SOCKET_STARTUP" -/** Synthesized when the websocket begins connecting, i.e., enters the Connecting or Open states.. */ +/** Synthesized when the ws begins connecting, i.e., enters the Connecting or Open states.. */ export interface ServerSocketStartupAction extends BaseAction { readonly type: typeof SERVER_SOCKET_STARTUP readonly state: SocketState @@ -95,7 +95,7 @@ export function isServerActCommand(action: AppAction): action is ServerActComman } export const SERVER_MALFORMED = "SERVER_MALFORMED" -/** Synthesized when the client can't understand a command the server sent, or when an error event appears on the websocket. */ +/** Synthesized when the client can't understand a command the server sent, or when an error event appears on the ws. */ export interface ServerMalformedAction extends BaseAction { readonly type: typeof SERVER_MALFORMED, readonly error: Error | null, diff --git a/client/src/reducers/ServerReducer.ts b/client/src/reducers/ServerReducer.ts index ea8d32b..eec6323 100644 --- a/client/src/reducers/ServerReducer.ts +++ b/client/src/reducers/ServerReducer.ts @@ -239,6 +239,7 @@ function serverActReducer(oldState: AppState, action: ServerActCommand): AppStat } function serverMalformedReducer(oldState: NetworkState, action: ServerMalformedAction): NetworkState { + console.log("Got serverMalformed", action.error) return clientReducer(oldState, { type: CLIENT_GOODBYE, code: 1002, // protocol error diff --git a/client/src/state/Coordinates.ts b/client/src/state/Coordinates.ts index af12fcd..a00d9a5 100644 --- a/client/src/state/Coordinates.ts +++ b/client/src/state/Coordinates.ts @@ -54,7 +54,7 @@ export interface RenderOffsets { /** How big each hexagon should be. The "radius" from the center to any vertex. */ readonly size: number /** The way the hex map should be displayed. Usually the same as the origin map. */ - readonly displayMode: HexLayout + readonly layout: HexLayout } export interface RenderSize { @@ -66,7 +66,7 @@ export interface RenderSize { const POINTY_AXIS_FACTOR = 2; const FLAT_AXIS_FACTOR = Math.sqrt(3); export function storageCoordinatesToRenderCoordinates({line, cell}: StorageCoordinates, renderOffsets: RenderOffsets): RenderCoordinates { - const {orientation, indentedLines} = renderOffsets.displayMode; + const {orientation, indentedLines} = renderOffsets.layout; const flatLength = renderOffsets.size * FLAT_AXIS_FACTOR; const pointyLength = renderOffsets.size * POINTY_AXIS_FACTOR; @@ -105,7 +105,7 @@ export function sizeFromLinesAndCells({offsets, lines, cells, rightMargin = 0, b const { top: topMargin, left: leftMargin, - displayMode: { + layout: { orientation, indentedLines } @@ -118,12 +118,12 @@ export function sizeFromLinesAndCells({offsets, lines, cells, rightMargin = 0, b const hasIndents = lines > 1 || indentedLines === LineParity.EVEN - // Every line will be 3/4 of a cell length apart in the pointy direction; + // Every cell will be 3/4 of a cell length apart in the pointy direction; // however, the last line is still one full pointy cell long. - const pointyLength = pointyMargins + ((cells - 1) * pointyCellLength * 3 / 4) + pointyCellLength; - // Every cell will be one full cell length apart in the flat direction; + const pointyLength = pointyMargins + ((lines - 1) * pointyCellLength * 3 / 4) + pointyCellLength; + // Every line will be one full cell length apart in the flat direction; // however, if there are indents, another half cell is needed to accommodate them. - const flatLength = flatMargins + lines * flatCellLength + (hasIndents ? flatCellLength / 2 : 0); + const flatLength = flatMargins + cells * flatCellLength + (hasIndents ? flatCellLength / 2 : 0); return orientation === HexagonOrientation.FLAT_TOP ? { width: pointyLength, height: flatLength } : { width: flatLength, height: pointyLength } } diff --git a/client/src/state/HexMap.ts b/client/src/state/HexMap.ts index 1c0f37b..d826c36 100644 --- a/client/src/state/HexMap.ts +++ b/client/src/state/HexMap.ts @@ -71,20 +71,20 @@ export interface HexMap { readonly xid: string } -export function initializeMap({lines, cellsPerLine, displayMode, xid}: {lines: number, cellsPerLine: number, displayMode: HexLayout, xid: string}): HexMap { - const lineCells: HexLine[] = []; +export function initializeMap({lines, cellsPerLine, layout, xid}: {lines: number, cellsPerLine: number, layout: HexLayout, xid: string}): HexMap { + const layer: HexLine[] = []; const emptyLine: HexCell[] = []; for (let cell = 0; cell < cellsPerLine; cell += 1) { emptyLine.push(EMPTY_CELL) } for (let line = 0; line < lines; line += 1) { - lineCells.push(emptyLine) + layer.push(emptyLine) } return { lines, - cellsPerLine: cellsPerLine, - layout: displayMode, - layer: lineCells, + cellsPerLine, + layout, + layer, xid } } diff --git a/client/src/ui/HexColorPicker.tsx b/client/src/ui/HexColorPicker.tsx index 7970a37..760c6c4 100644 --- a/client/src/ui/HexColorPicker.tsx +++ b/client/src/ui/HexColorPicker.tsx @@ -20,7 +20,7 @@ function HexSwatch({color, index, offsets, classNames, onClick}: {color: string, } const ACTIVE_OFFSETS: RenderOffsets = { - displayMode: { + layout: { orientation: HexagonOrientation.POINTY_TOP, indentedLines: LineParity.ODD }, @@ -30,7 +30,7 @@ const ACTIVE_OFFSETS: RenderOffsets = { } const SWATCH_OFFSETS: RenderOffsets = { - displayMode: { + layout: { orientation: HexagonOrientation.FLAT_TOP, indentedLines: LineParity.EVEN }, diff --git a/client/src/websocket/MapFromPb.ts b/client/src/websocket/MapFromPb.ts index 0529830..0aab76e 100644 --- a/client/src/websocket/MapFromPb.ts +++ b/client/src/websocket/MapFromPb.ts @@ -65,8 +65,8 @@ function mapFromPb(map: HexMapPB): HexMap { } return { xid: encode(map.xid), - lines: 0, - cellsPerLine: 0, + lines: map.lines, + cellsPerLine: map.cellsPerLine, layout: layoutFromPb(map.layout), layer: map.layer.lines.map((line): HexCell[] => { return line.cells.map(cellFromPb) diff --git a/client/src/websocket/WebsocketReactAdapter.tsx b/client/src/websocket/WebsocketReactAdapter.tsx index 11d4a74..773a6f7 100644 --- a/client/src/websocket/WebsocketReactAdapter.tsx +++ b/client/src/websocket/WebsocketReactAdapter.tsx @@ -5,9 +5,9 @@ import { ClientActCommand, ClientGoodbyeAction, ClientHelloCommand, - ClientRefreshCommand, - isClientCommand, - isClientGoodbyeCommand, + ClientRefreshCommand, isClientGoodbyeCommand, + isClientHelloCommand, + isClientRefreshCommand, SendableAction, SentAction } from "../actions/ClientAction"; @@ -58,10 +58,11 @@ export function WebsocketReactAdapter({url, protocols = ["v1.hexmap.deliciousrey } }, [nextID, dispatch, pendingMessages, state]) useEffect(() => { - if (state === ServerConnectionState.CONNECTED && specialMessage !== null && specialMessage !== lastSpecialMessage) { - if (isClientCommand(specialMessage)) { + if (specialMessage !== null && specialMessage !== lastSpecialMessage) { + if ((state === ServerConnectionState.AWAITING_HELLO && isClientHelloCommand(specialMessage)) + || (state === ServerConnectionState.AWAITING_REFRESH && isClientRefreshCommand(specialMessage))) { connector.current.send(specialMessage); - } else if (isClientGoodbyeCommand(specialMessage)) { + } else if (state === ServerConnectionState.AWAITING_GOODBYE && isClientGoodbyeCommand(specialMessage)) { connector.current.close(specialMessage.code, specialMessage.reason) } setLastSpecialMessage(specialMessage); diff --git a/client/src/websocket/WebsocketTranslator.ts b/client/src/websocket/WebsocketTranslator.ts index 371e14f..961f84a 100644 --- a/client/src/websocket/WebsocketTranslator.ts +++ b/client/src/websocket/WebsocketTranslator.ts @@ -1,4 +1,4 @@ -/** Translates between websocket messages and Commands. */ +/** Translates between ws messages and Commands. */ import {ClientCommand} from "../actions/ClientAction"; import { SERVER_GOODBYE, @@ -71,6 +71,9 @@ export class WebsocketTranslator { } close(code: number, reason: string) { + if (this.socket === null || this.socket.readyState !== WebSocket.OPEN) { + return + } this.socket?.close(code, reason) } diff --git a/server/action/action.go b/server/action/action.go index 5ae920c..52d3bd8 100644 --- a/server/action/action.go +++ b/server/action/action.go @@ -110,3 +110,16 @@ func (c UserActiveColor) Apply(s *state.Synced) error { s.User.ActiveColor = c.Color return nil } + +// IDed contains a pair of ID and Action, as sent by the client. +type IDed struct { + // ID contains the arbitrary ID that was sent by the client, for identifying the action in future messages. + ID uint32 `json:"id"` + // Action contains the action that was actually being sent. + Action Client `json:"action"` +} + +func (i IDed) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddUint32("id", i.ID) + return encoder.AddObject("action", i.Action) +} diff --git a/server/host/HttpServer.go b/server/host/HttpServer.go new file mode 100644 index 0000000..8469e86 --- /dev/null +++ b/server/host/HttpServer.go @@ -0,0 +1,193 @@ +package main + +import ( + "context" + "git.reya.zone/reya/hexmap/server/room" + "git.reya.zone/reya/hexmap/server/state" + "git.reya.zone/reya/hexmap/server/ws" + "github.com/gorilla/websocket" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "strconv" + "time" +) + +const SaveDir = "/home/reya/hexmaps" + +func save(m state.HexMap, l *zap.Logger) error { + filename := filepath.Join(SaveDir, "map."+strconv.FormatInt(time.Now().Unix(), 16)) + l.Debug("Saving to file", zap.String("filename", filename)) + marshaled, err := proto.Marshal(m.ToPB()) + l.Debug("Marshaled proto") + if err != nil { + return err + } + l.Debug("Opening file") + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0x644) + if err != nil { + return err + } + l.Debug("Writing to file") + _, err = file.Write(marshaled) + if err != nil { + return err + } + l.Debug("Closing file") + err = file.Close() + if err != nil { + return err + } + l.Info("Saved to file", zap.String("filename", filename)) + return nil +} + +func load(l *zap.Logger) (*state.HexMap, error) { + filename := filepath.Join(SaveDir, "map.LOAD") + l.Debug("Loading from file", zap.String("filename", filename)) + file, err := os.Open(filename) + if err != nil { + return nil, err + } + l.Debug("Reading file") + marshaled, err := ioutil.ReadAll(file) + if err != nil { + return nil, err + } + pb := &state.HexMapPB{} + l.Debug("Extracting protobuf from file") + err = proto.Unmarshal(marshaled, pb) + if err != nil { + return nil, err + } + l.Debug("Closing file") + err = file.Close() + if err != nil { + return nil, err + } + m, err := pb.ToGo() + if err != nil { + return nil, err + } + l.Info("Loaded from file", zap.String("filename", filename)) + return &m, nil +} + +func BackupMap(client *room.Client, l *zap.Logger) { + var err error + myState := &state.Synced{} + l.Info("Starting backup system") + for { + msg := <-client.IncomingChannel() + switch typedMsg := msg.(type) { + case *room.JoinResponse: + myState = typedMsg.CurrentState() + err := save(myState.Map, l) + if err != nil { + l.Error("Failed saving during join response", zap.Error(err)) + } + case *room.ActionBroadcast: + err = typedMsg.Action().Apply(myState) + if err == nil { + err = save(myState.Map, l) + if err != nil { + l.Error("Failed saving during action broadcast", zap.Error(err)) + } + } + case *room.ShutdownRequest: + client.OutgoingChannel() <- client.AcknowledgeShutdown() + return + } + } +} + +func ServeWS(logger *zap.Logger) (err error) { + m := http.NewServeMux() + httpLogger := logger.Named("HTTP") + hexes, err := load(logger) + if err != nil { + hexes = state.NewHexMap(state.Layout{ + Orientation: state.PointyTop, + IndentedLines: state.EvenLines, + }, 25, 10) + } + rm := room.New(room.NewOptions{ + BaseLogger: logger.Named("Room"), + StartingState: &state.Synced{ + Map: *hexes, + User: state.UserState{ + ActiveColor: state.Color{ + R: 0, + G: 0, + B: 0, + A: 255, + }, + }, + }, + StartingClientOptions: room.NewClientOptions{ + IncomingChannel: nil, + AcceptBroadcasts: true, + RequestStartingState: true, + }, + }) + go BackupMap(rm, logger.Named("BackupMap")) + m.Handle("/map", &ws.HTTPHandler{ + Upgrader: websocket.Upgrader{ + Subprotocols: []string{"v1.hexmap.deliciousreya.net"}, + CheckOrigin: func(r *http.Request) bool { + return r.Header.Get("Origin") == "https://hexmap.deliciousreya.net" + }, + }, + Logger: logger.Named("WS"), + Room: rm, + }) + srv := http.Server{ + Addr: "127.0.0.1:5238", + Handler: m, + ErrorLog: zap.NewStdLog(httpLogger), + } + m.HandleFunc("/exit", func(writer http.ResponseWriter, request *http.Request) { + // Some light dissuasion of accidental probing. + // To keep good people out. + if request.FormValue("superSecretPassword") != "Gesture/Retrial5/Untrained/Countable/Extrude/Jeep/Cheese/Carbon" { + writer.WriteHeader(403) + _, err = writer.Write([]byte("... What are you trying to pull?")) + return + } + writer.WriteHeader(200) + _, err := writer.Write([]byte("OK, shutting down, bye!")) + if err != nil { + logger.Warn("Error while writing goodbye response", zap.Error(err)) + } + time.AfterFunc(500*time.Millisecond, func() { + err := srv.Shutdown(context.Background()) + if err != nil { + logger.Error("Error while shutting down the server", zap.Error(err)) + } + }) + }) + err = srv.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + return err + } + rm.OutgoingChannel() <- rm.Stop() + for { + msg := <-rm.IncomingChannel() + switch msg.(type) { + case *room.ShutdownRequest: + rm.OutgoingChannel() <- rm.AcknowledgeShutdown() + return nil + } + } +} + +func main() { + logger, err := zap.NewDevelopment() + err = ServeWS(logger) + if err != nil { + logger.Fatal("Error while serving HTTP", zap.Error(err)) + } +} diff --git a/server/room/actor.go b/server/room/actor.go index 7809484..828b58b 100644 --- a/server/room/actor.go +++ b/server/room/actor.go @@ -18,28 +18,28 @@ func (r *room) act() { msgLogger := r.logger.With(zap.Stringer("client", client)) msgLogger.Debug("Message received, handling", zap.Object("message", raw)) switch msg := raw.(type) { - case JoinRequest: + case *JoinRequest: r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) r.acknowledgeJoin(msg.id, msg.wantCurrentState) - case RefreshRequest: + case *RefreshRequest: r.sendRefresh(msg.id) - case ApplyRequest: + case *ApplyRequest: msgLogger.Debug("Received action to apply from client", zap.Uint32("actionId", msg.action.ID)) result := r.applyAction(msg.action.Action) - if result != nil { + if result == nil { r.broadcastAction(client, msg.action.ID, msg.action.Action) } r.acknowledgeAction(client, msg.action.ID, result) - case LeaveRequest: + case *LeaveRequest: // So long, then. We can close immediately here; they promised not to send any more messages after this // unless we were shutting down, which we're not. r.acknowledgeLeave(client) r.closeClient(client) - case StopRequest: + case *StopRequest: // As requested, we shut down. Our deferred gracefulShutdown will catch us as we fall. msgLogger.Info("Received StopRequest from client, shutting down") return - case ShutdownResponse: + case *ShutdownResponse: // Uh... thank... you. I'm not... Never mind. I guess this means you're leaving? msgLogger.Error("Received unexpected ShutdownResponse from client while not shutting down") r.closeClient(client) @@ -95,7 +95,7 @@ func (r *room) acknowledgeJoin(id xid.ID, includeState bool) { s = r.stateCopy() } logger.Debug("Sending JoinResponse to client") - client.outgoingChannel <- JoinResponse{ + client.outgoingChannel <- &JoinResponse{ id: r.id, currentState: s, } @@ -113,7 +113,7 @@ func (r *room) sendRefresh(id xid.ID) { logger.Debug("Preparing state copy for client") s = r.stateCopy() logger.Debug("Sending RefreshResponse to client") - client.outgoingChannel <- RefreshResponse{ + client.outgoingChannel <- &RefreshResponse{ id: r.id, currentState: s, } @@ -126,7 +126,7 @@ func (r *room) applyAction(action action.Action) error { } // broadcastAction sends an action to everyone other than the original client which requested it. -func (r *room) broadcastAction(originalClientID xid.ID, originalActionID uint32, action action.Action) { +func (r *room) broadcastAction(originalClientID xid.ID, originalActionID uint32, action action.Server) { logger := r.logger.With(zap.Stringer("originalClient", originalClientID), zap.Uint32("actionID", originalActionID), zap.Object("action", action)) broadcast := ActionBroadcast{ id: r.id, @@ -138,7 +138,7 @@ func (r *room) broadcastAction(originalClientID xid.ID, originalActionID uint32, for id, client := range r.clients { if id.Compare(originalClientID) != 0 { logger.Debug("Sending ActionBroadcast to client", zap.Stringer("client", id)) - client.outgoingChannel <- broadcast + client.outgoingChannel <- &broadcast } } } @@ -153,7 +153,7 @@ func (r *room) acknowledgeAction(id xid.ID, actionID uint32, error error) { return } logger.Debug("Sending ApplyResponse to client") - client.outgoingChannel <- ApplyResponse{ + client.outgoingChannel <- &ApplyResponse{ id: id, actionID: actionID, result: error, @@ -171,7 +171,7 @@ func (r *room) acknowledgeLeave(id xid.ID) { return } logger.Debug("Sending LeaveResponse to client") - client.outgoingChannel <- LeaveResponse{id: r.id} + client.outgoingChannel <- &LeaveResponse{id: r.id} } // closeClient causes the room to remove the client with the given id from its clients. @@ -206,25 +206,25 @@ func (r *room) gracefulShutdown() { msgLogger := r.logger.With(zap.Stringer("client", client)) msgLogger.Debug("Post-shutdown message received, handling", zap.Object("message", raw)) switch msg := raw.(type) { - case JoinRequest: + case *JoinRequest: // Don't you hate it when someone comes to the desk right as you're getting ready to pack up? // Can't ignore them - they have our channel, and they might be sending things to it. We have to add them // and then immediately send them a ShutdownRequest and wait for them to answer it. msgLogger.Debug("Received join request from client while shutting down") r.addClient(msg.id, msg.returnChannel, msg.broadcast, msg.privateChannel) r.requestShutdownFrom(client) - case RefreshRequest: + case *RefreshRequest: // Ugh, seriously, now? Fine. You can have this - you might be our friend the persistence actor. r.sendRefresh(client) - case LeaveRequest: + case *LeaveRequest: // We sent them a shutdown already, so unfortunately, we can't close them immediately. We have to wait for // them to tell us they've heard that we're shutting down. msgLogger.Debug("Received leave request from client while shutting down") r.acknowledgeLeave(client) - case StopRequest: + case *StopRequest: // Yes. We're doing that. Check your inbox, I already sent you the shutdown. msgLogger.Debug("Received stop request from client while shutting down") - case ShutdownResponse: + case *ShutdownResponse: // The only way we would be getting one of these is if the client knows it doesn't have to send us anything // else. Therefore, we can remove them now. // Similarly, we know that they'll receive the LeaveResponse they need and shut down. @@ -261,7 +261,7 @@ func (r *room) requestShutdownFrom(id xid.ID) { if !ok { r.logger.Error("No such client when requesting shutdown from client", clientField) } - client.outgoingChannel <- shutdown + client.outgoingChannel <- &shutdown } // finalShutdown causes the room to do any final cleanup not involving its clients before stopping. diff --git a/server/room/client.go b/server/room/client.go index faf9af1..2afeeb6 100644 --- a/server/room/client.go +++ b/server/room/client.go @@ -1,8 +1,8 @@ package room import ( + "git.reya.zone/reya/hexmap/server/action" "github.com/rs/xid" - "go.uber.org/zap" ) // internalClient is used by the room itself to track information about a client. @@ -28,8 +28,6 @@ type NewClientOptions struct { // If RequestStartingState is true, the room will send a copy of the current state as of when the JoinRequest was // received in the JoinResponse that will be the first message the Client receives. RequestStartingState bool - // If sets, - Logger *zap.Logger } // Client is the structure used by clients external to the Room package to communicate with the Room. @@ -91,7 +89,7 @@ func newClientForRoom(roomId xid.ID, outgoingChannel chan<- ClientMessage, opts outgoingChannel: outgoingChannel, shuttingDown: false, } - result.outgoingChannel <- JoinRequest{ + result.outgoingChannel <- &JoinRequest{ id: result.id, returnChannel: incomingChannel, privateChannel: privateChannel, @@ -111,26 +109,36 @@ func (c *Client) NewClient(opts NewClientOptions) *Client { } // Refresh creates a message which causes the client to request a fresh copy of the state. -func (c *Client) Refresh() RefreshRequest { +func (c *Client) Refresh() *RefreshRequest { if c.shuttingDown { panic("Already started shutting down; no new messages should be sent") } - return RefreshRequest{ + return &RefreshRequest{ id: c.id, } } +func (c *Client) Apply(a action.IDed) *ApplyRequest { + if c.shuttingDown { + panic("Already started shutting down; no new messages should be sent") + } + return &ApplyRequest{ + id: c.id, + action: a, + } +} + // Leave creates a message which causes the local client to signal that it is shutting down. // It is important to Leave to avoid dangling clients having messages sent to nothing. // After sending Leave, the client must confirm that it has been removed by waiting for a LeaveResponse, accompanied by // the closing of the Client's IncomingChannel if it was a private channel. // No further messages should be sent after Leave except AcknowledgeShutdown if Leave and requestShutdown crossed paths in midair. -func (c *Client) Leave() LeaveRequest { +func (c *Client) Leave() *LeaveRequest { if c.shuttingDown { panic("Already started shutting down; no new messages should be sent") } c.shuttingDown = true - return LeaveRequest{ + return &LeaveRequest{ id: c.id, } } @@ -140,12 +148,12 @@ func (c *Client) Leave() LeaveRequest { // After sending Stop, the client must confirm that it has been removed by waiting for a ShutdownRequest, which should // be handled normally. // No further messages should be sent after Stop except AcknowledgeShutdown. -func (c *Client) Stop() StopRequest { +func (c *Client) Stop() *StopRequest { if c.shuttingDown { panic("Already started shutting down; no new messages should be sent") } c.shuttingDown = true - return StopRequest{ + return &StopRequest{ id: c.id, } } @@ -153,13 +161,13 @@ func (c *Client) Stop() StopRequest { // AcknowledgeShutdown causes the local client to signal that it has acknowledged that the room is shutting down. // No further messages can be sent after AcknowledgeShutdown; attempting to do so will block forever, as the // OutgoingChannel has become nil. -func (c *Client) AcknowledgeShutdown() ShutdownResponse { +func (c *Client) AcknowledgeShutdown() *ShutdownResponse { if c.outgoingChannel == nil { panic("Already finished shutting down; no new messages should be sent") } c.shuttingDown = true c.outgoingChannel = nil - return ShutdownResponse{ + return &ShutdownResponse{ id: c.id, } } diff --git a/server/room/clientmessage.go b/server/room/clientmessage.go index 062dae9..e1e670b 100644 --- a/server/room/clientmessage.go +++ b/server/room/clientmessage.go @@ -1,7 +1,7 @@ package room import ( - "git.reya.zone/reya/hexmap/server/websocket" + action2 "git.reya.zone/reya/hexmap/server/action" "github.com/rs/xid" "go.uber.org/zap/zapcore" ) @@ -31,7 +31,7 @@ type JoinRequest struct { wantCurrentState bool } -func (j JoinRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (j *JoinRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "JoinRequest") encoder.AddString("id", j.id.String()) encoder.AddBool("broadcast", j.broadcast) @@ -39,7 +39,7 @@ func (j JoinRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } -func (j JoinRequest) ClientID() xid.ID { +func (j *JoinRequest) ClientID() xid.ID { return j.id } @@ -48,30 +48,30 @@ type RefreshRequest struct { id xid.ID } -func (r RefreshRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (r *RefreshRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "RefreshRequest") encoder.AddString("id", r.id.String()) return nil } -func (r RefreshRequest) ClientID() xid.ID { +func (r *RefreshRequest) ClientID() xid.ID { return r.id } // ApplyRequest is the message sent on the room's IncomingChannel by a client which has received an action from the -// websocket. +// ws. type ApplyRequest struct { id xid.ID - action websocket.IDed + action action2.IDed } -func (f ApplyRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (f *ApplyRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "ApplyRequest") encoder.AddString("id", f.id.String()) return encoder.AddObject("action", f.action) } -func (f ApplyRequest) ClientID() xid.ID { +func (f *ApplyRequest) ClientID() xid.ID { return f.id } @@ -82,13 +82,13 @@ type LeaveRequest struct { id xid.ID } -func (l LeaveRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (l *LeaveRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "LeaveRequest") encoder.AddString("id", l.id.String()) return nil } -func (l LeaveRequest) ClientID() xid.ID { +func (l *LeaveRequest) ClientID() xid.ID { return l.id } @@ -98,13 +98,13 @@ type StopRequest struct { id xid.ID } -func (s StopRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (s *StopRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "StopRequest") encoder.AddString("id", s.id.String()) return nil } -func (s StopRequest) ClientID() xid.ID { +func (s *StopRequest) ClientID() xid.ID { return s.id } @@ -113,12 +113,12 @@ type ShutdownResponse struct { id xid.ID } -func (s ShutdownResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (s *ShutdownResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "ShutdownResponse") encoder.AddString("id", s.id.String()) return nil } -func (s ShutdownResponse) ClientID() xid.ID { +func (s *ShutdownResponse) ClientID() xid.ID { return s.id } diff --git a/server/room/message.go b/server/room/message.go index 9595d64..0d1026c 100644 --- a/server/room/message.go +++ b/server/room/message.go @@ -22,18 +22,18 @@ type JoinResponse struct { currentState *state.Synced } -func (j JoinResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (j *JoinResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "JoinResponse") encoder.AddString("id", j.id.String()) return encoder.AddObject("currentState", j.currentState) } -func (j JoinResponse) RoomID() xid.ID { +func (j *JoinResponse) RoomID() xid.ID { return j.id } // CurrentState returns the state of the room as of when the JoinRequest was processed. -func (j JoinResponse) CurrentState() *state.Synced { +func (j *JoinResponse) CurrentState() *state.Synced { return j.currentState } @@ -44,18 +44,18 @@ type RefreshResponse struct { currentState *state.Synced } -func (r RefreshResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (r *RefreshResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "JoinResponse") encoder.AddString("id", r.id.String()) return encoder.AddObject("currentState", r.currentState) } -func (r RefreshResponse) RoomID() xid.ID { +func (r *RefreshResponse) RoomID() xid.ID { return r.id } // CurrentState returns the state of the room as of when the RefreshRequest was processed. -func (r RefreshResponse) CurrentState() *state.Synced { +func (r *RefreshResponse) CurrentState() *state.Synced { return r.currentState } @@ -68,7 +68,7 @@ type ApplyResponse struct { result error } -func (a ApplyResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (a *ApplyResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "ApplyResponse") encoder.AddString("id", a.id.String()) encoder.AddUint32("actionId", a.actionID) @@ -79,17 +79,21 @@ func (a ApplyResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } -func (a ApplyResponse) RoomID() xid.ID { +func (a *ApplyResponse) RoomID() xid.ID { return a.id } +func (a *ApplyResponse) ActionID() uint32 { + return a.actionID +} + // Success returns true if the action succeeded, false if it failed. -func (a ApplyResponse) Success() bool { +func (a *ApplyResponse) Success() bool { return a.result == nil } // Failure returns the error if the action failed, or nil if it succeeded. -func (a ApplyResponse) Failure() error { +func (a *ApplyResponse) Failure() error { return a.result } @@ -101,10 +105,10 @@ type ActionBroadcast struct { // originalActionID is the ID that the client that sent the action sent. originalActionID uint32 // action is the action that succeeded. - action action.Action + action action.Server } -func (a ActionBroadcast) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (a *ActionBroadcast) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "ActionBroadcast") encoder.AddString("id", a.id.String()) encoder.AddString("originalClientId", a.originalClientID.String()) @@ -112,22 +116,31 @@ func (a ActionBroadcast) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return encoder.AddObject("action", a.action) } -func (a ActionBroadcast) RoomID() xid.ID { +func (a *ActionBroadcast) RoomID() xid.ID { return a.id } +func (a *ActionBroadcast) OriginalClientID() xid.ID { + return a.originalClientID +} +func (a *ActionBroadcast) OriginalActionID() uint32 { + return a.originalActionID +} +func (a *ActionBroadcast) Action() action.Server { + return a.action +} // LeaveResponse is the message sent by the room when it has accepted that a client has left, and will send it no further messages. type LeaveResponse struct { id xid.ID } -func (l LeaveResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (l *LeaveResponse) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "LeaveResponse") encoder.AddString("id", l.id.String()) return nil } -func (l LeaveResponse) RoomID() xid.ID { +func (l *LeaveResponse) RoomID() xid.ID { return l.id } @@ -136,12 +149,12 @@ type ShutdownRequest struct { id xid.ID } -func (s ShutdownRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (s *ShutdownRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("type", "ShutdownRequest") encoder.AddString("id", s.id.String()) return nil } -func (s ShutdownRequest) RoomID() xid.ID { +func (s *ShutdownRequest) RoomID() xid.ID { return s.id } diff --git a/server/state/map.go b/server/state/map.go index fa1a2ba..33244c2 100644 --- a/server/state/map.go +++ b/server/state/map.go @@ -119,11 +119,11 @@ func (l HexLayer) MarshalLogArray(encoder zapcore.ArrayEncoder) error { // GetCellAt returns a reference to the cell at the given coordinates. // If the coordinates are out of bounds for this map, an error will be returned. func (l HexLayer) GetCellAt(c StorageCoordinates) (*HexCell, error) { - if int(c.Line) > len(l) { + if int(c.Line) >= len(l) { return nil, fmt.Errorf("line %d out of bounds (%d)", c.Line, len(l)) } line := l[c.Line] - if int(c.Cell) > len(line) { + if int(c.Cell) >= len(line) { return nil, fmt.Errorf("cell %d out of bounds (%d)", c.Cell, len(line)) } return &(line[c.Cell]), nil @@ -149,18 +149,43 @@ type HexMap struct { // This is the number of cells joined together, flat-edge to flat-edge, in each line. CellsPerLine uint8 `json:"cellsPerLine"` // Layout is the orientation and line parity used to display the map. - Layout Layout `json:"displayMode"` + Layout Layout `json:"layout"` // Layer contains the actual map data. // Layer itself is a slice with Lines elements, each of which is a line; // each of those lines is a slice of CellsPerLine cells. - Layer HexLayer `json:"lineCells"` + Layer HexLayer `json:"layer"` +} + +func NewHexMap(layout Layout, lines uint8, cellsPerLine uint8) *HexMap { + layer := make(HexLayer, lines) + for index := range layer { + line := make(HexLine, cellsPerLine) + for index := range line { + line[index] = HexCell{ + Color: Color{ + R: 255, + G: 255, + B: 255, + A: 255, + }, + } + } + layer[index] = line + } + return &HexMap{ + XID: xid.New(), + Layout: layout, + Lines: lines, + CellsPerLine: cellsPerLine, + Layer: layer, + } } func (m HexMap) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddString("id", m.XID.String()) encoder.AddUint8("lines", m.Lines) encoder.AddUint8("cellsPerLine", m.CellsPerLine) - displayModeErr := encoder.AddObject("displayMode", m.Layout) + displayModeErr := encoder.AddObject("layout", m.Layout) lineCellsErr := encoder.AddArray("lineCells", m.Layer) if displayModeErr != nil { return displayModeErr diff --git a/server/websocket/connection.go b/server/websocket/connection.go deleted file mode 100644 index 3bdc46f..0000000 --- a/server/websocket/connection.go +++ /dev/null @@ -1,37 +0,0 @@ -package websocket - -import ( - "github.com/gorilla/websocket" - "time" -) - -const ( - // ReadTimeLimit is the maximum time the server is willing to wait after receiving a message before receiving another one. - ReadTimeLimit = 60 * time.Second - // WriteTimeLimit is the maximum time the server is willing to wait to send a message. - WriteTimeLimit = 10 * time.Second - // ControlTimeLimit is the maximum time the server is willing to wait to send a control message like Ping or Close. - ControlTimeLimit = (WriteTimeLimit * 5) / 10 - // PingDelay is the time between pings. - // It must be less than ReadTimeLimit to account for latency and delays on either side. - PingDelay = (ReadTimeLimit * 7) / 10 -) - -// A Connection corresponds to a pair of actors. -type Connection struct { - conn *websocket.Conn - r reader - w writer -} - -// ReadChannel returns the channel that can be used to read client messages from the connection. -// After receiving SocketClosed, the reader will close its channel. -func (c *Connection) ReadChannel() <-chan ClientCommand { - return c.r.channel -} - -// WriteChannel returns the channel that can be used to send server messages on the connection. -// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel. -func (c *Connection) WriteChannel() chan<- ServerCommand { - return c.w.channel -} diff --git a/server/websocket/client.go b/server/ws/client.go similarity index 80% rename from server/websocket/client.go rename to server/ws/client.go index 653ce13..6fe8943 100644 --- a/server/websocket/client.go +++ b/server/ws/client.go @@ -1,4 +1,4 @@ -package websocket +package ws import ( "git.reya.zone/reya/hexmap/server/action" @@ -36,20 +36,7 @@ func (c ClientRefresh) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } -// IDed contains a pair of ID and Action, as sent by the client. -type IDed struct { - // ID contains the arbitrary ID that was sent by the client, for identifying the action in future messages. - ID uint32 `json:"id"` - // Action contains the action that was actually being sent. - Action action.Client `json:"action"` -} - -func (i IDed) MarshalLogObject(encoder zapcore.ObjectEncoder) error { - encoder.AddUint32("id", i.ID) - return encoder.AddObject("action", i.Action) -} - -type IDPairs []IDed +type IDPairs []action.IDed func (a IDPairs) MarshalLogArray(encoder zapcore.ArrayEncoder) error { var finalErr error = nil diff --git a/server/websocket/client.pbconv.go b/server/ws/client.pbconv.go similarity index 59% rename from server/websocket/client.pbconv.go rename to server/ws/client.pbconv.go index b8f14d3..3159de7 100644 --- a/server/websocket/client.pbconv.go +++ b/server/ws/client.pbconv.go @@ -1,6 +1,7 @@ -package websocket +package ws import ( + "git.reya.zone/reya/hexmap/server/action" "git.reya.zone/reya/hexmap/server/state" ) @@ -19,13 +20,13 @@ func (x *ClientCommandPB) ToGo() (ClientCommand, error) { } } -func (x *ClientHelloPB) ToGo() ClientHello { - return ClientHello{ +func (x *ClientHelloPB) ToGo() *ClientHello { + return &ClientHello{ Version: x.Version, } } -func (c ClientHello) ToClientPB() *ClientCommandPB { +func (c *ClientHello) ToClientPB() *ClientCommandPB { return &ClientCommandPB{ Command: &ClientCommandPB_Hello{ Hello: &ClientHelloPB{ @@ -35,11 +36,11 @@ func (c ClientHello) ToClientPB() *ClientCommandPB { } } -func (*ClientRefreshPB) ToGo() ClientRefresh { - return ClientRefresh{} +func (*ClientRefreshPB) ToGo() *ClientRefresh { + return &ClientRefresh{} } -func (c ClientRefresh) ToClientPB() *ClientCommandPB { +func (c *ClientRefresh) ToClientPB() *ClientCommandPB { return &ClientCommandPB{ Command: &ClientCommandPB_Refresh{ Refresh: &ClientRefreshPB{}, @@ -47,42 +48,38 @@ func (c ClientRefresh) ToClientPB() *ClientCommandPB { } } -func (x *ClientActPB_IDed) ToGo() (IDed, error) { - action, err := x.Action.ToGo() +func (x *ClientActPB_IDed) ToGo() (action.IDed, error) { + act, err := x.Action.ToGo() if err != nil { - return IDed{}, nil + return action.IDed{}, nil } - return IDed{ + return action.IDed{ ID: x.Id, - Action: action, + Action: act, }, nil } -func (i IDed) ToPB() *ClientActPB_IDed { - return &ClientActPB_IDed{ - Id: i.ID, - Action: i.Action.ToClientPB(), - } -} - -func (x *ClientActPB) ToGo() (ClientAct, error) { +func (x *ClientActPB) ToGo() (*ClientAct, error) { actions := make(IDPairs, len(x.Actions)) for index, ided := range x.Actions { action, err := ided.ToGo() if err != nil { - return ClientAct{}, err + return nil, err } actions[index] = action } - return ClientAct{ + return &ClientAct{ Actions: actions, }, nil } -func (c ClientAct) ToClientPB() *ClientCommandPB { +func (c *ClientAct) ToClientPB() *ClientCommandPB { actions := make([]*ClientActPB_IDed, len(c.Actions)) for index, ided := range c.Actions { - actions[index] = ided.ToPB() + actions[index] = &ClientActPB_IDed{ + Id: ided.ID, + Action: ided.Action.ToClientPB(), + } } return &ClientCommandPB{ Command: &ClientCommandPB_Act{ @@ -93,10 +90,10 @@ func (c ClientAct) ToClientPB() *ClientCommandPB { } } -func (ClientMalformed) ToClientPB() *ClientCommandPB { +func (*ClientMalformed) ToClientPB() *ClientCommandPB { return nil } -func (SocketClosed) ToClientPB() *ClientCommandPB { +func (*SocketClosed) ToClientPB() *ClientCommandPB { return nil } diff --git a/server/ws/connection.go b/server/ws/connection.go new file mode 100644 index 0000000..4d9da6b --- /dev/null +++ b/server/ws/connection.go @@ -0,0 +1,276 @@ +package ws + +import ( + "git.reya.zone/reya/hexmap/server/action" + "git.reya.zone/reya/hexmap/server/room" + "github.com/gorilla/websocket" + "go.uber.org/zap" + "net/http" + "time" +) + +const ( + // ReadTimeLimit is the maximum time the server is willing to wait after receiving a message before receiving another one. + ReadTimeLimit = 60 * time.Second + // WriteTimeLimit is the maximum time the server is willing to wait to send a message. + WriteTimeLimit = 10 * time.Second + // ControlTimeLimit is the maximum time the server is willing to wait to send a control message like Ping or Close. + ControlTimeLimit = (WriteTimeLimit * 5) / 10 + // PingDelay is the time between pings. + // It must be less than ReadTimeLimit to account for latency and delays on either side. + PingDelay = (ReadTimeLimit * 7) / 10 +) + +type HTTPHandler struct { + Upgrader websocket.Upgrader + Logger *zap.Logger + Room *room.Client +} + +func destroyBadProtocolSocket(c *websocket.Conn, logger *zap.Logger) { + err := c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseProtocolError, "Invalid subprotocols"), time.Now().Add(ControlTimeLimit)) + if err != nil { + logger.Error("Failed to write close message") + } + err = c.SetReadDeadline(time.Now().Add(ControlTimeLimit)) + if err != nil { + logger.Error("Failed to set read deadline") + } + for { + _, _, err := c.ReadMessage() + if err != nil { + if !websocket.IsCloseError(err, websocket.CloseProtocolError) { + logger.Error("Websocket connection shut down ignominiously", zap.Error(err)) + } + return + } + } +} + +func (h *HTTPHandler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) { + c, err := h.Upgrader.Upgrade(responseWriter, request, http.Header{}) + if err != nil { + h.Logger.Error("Failed to upgrade ws connection", zap.Error(err)) + return + } + if c.Subprotocol() == "" { + h.Logger.Error("No matching subprotocol", zap.String("clientProtocols", request.Header.Get("Sec-Websocket-Protocol"))) + go destroyBadProtocolSocket(c, h.Logger) + return + } + result := NewConnection(c, h.Logger.Named("Connection")) + exchange(result, h.Logger.Named("Link"), func(o room.NewClientOptions) *room.Client { + return h.Room.NewClient(o) + }) +} + +func exchange(c *Connection, l *zap.Logger, clientMaker func(options room.NewClientOptions) *room.Client) { + wsr := c.ReadChannel() + wsw := c.WriteChannel() + + l.Info("Connection established") + closeWith := &SocketClosed{ + Code: websocket.CloseAbnormalClosure, + Text: "I don't know what happened. But goodbye!", + } + defer func() { + l.Info("Shutting down") + // Wait for the websocket connection to shut down. + wsw <- closeWith + if wsr != nil { + for { + msg := <-wsr + switch msg.(type) { + case *SocketClosed: + return + } + } + } + }() + // State 1: Waiting for a hello. + // Anything else is death. + cmd := <-wsr + switch typedCmd := cmd.(type) { + case *ClientHello: + if typedCmd.Version != ProtocolVersion { + l.Warn("Bad Hello version") + // Disgusting. I can't even look at you. + closeWith = &SocketClosed{ + Code: websocket.CloseProtocolError, + Text: "Wrong protocol version", + } + return + } + l.Info("Got Hello") + default: + l.Warn("Got NON-hello") + closeWith = &SocketClosed{ + Code: websocket.CloseProtocolError, + Text: "You don't even say hello?", + } + return + } + l.Info("Waiting for room.") + // State 2: Waiting for the room to notice us. + rm := clientMaker(room.NewClientOptions{ + AcceptBroadcasts: true, + RequestStartingState: true, + }) + rmr := rm.IncomingChannel() + rmw := rm.OutgoingChannel() + var leaveWith room.ClientMessage = nil + defer func() { + l.Info("Leaving room") + if leaveWith == nil { + leaveWith = rm.Leave() + } + rmw <- leaveWith + if _, ok := leaveWith.(*room.ShutdownResponse); ok { + // The room was already shutting down. + return + } + for { + msg := <-rmr + switch msg.(type) { + case *room.LeaveResponse: + return + case *room.ShutdownRequest: + rmw <- rm.AcknowledgeShutdown() + return + } + } + }() + l.Info("Waiting for JoinResponse") + msg := <-rmr + switch typedMsg := msg.(type) { + case *room.JoinResponse: + l.Info("Got JoinResponse") + wsw <- &ServerHello{ + Version: ProtocolVersion, + State: typedMsg.CurrentState(), + } + case *room.ShutdownRequest: + l.Info("Got ShutdownRequest") + // Room was shutting down when we joined, oops! + closeWith = &SocketClosed{ + Code: websocket.CloseGoingAway, + Text: "Shutting down right as you joined. Sorry!", + } + return + default: + l.Info("Got non-JoinResponse/ShutdownRequest") + // Uh. That's concerning. We don't have anything to send our client. + // Let's just give up. + return + } + l.Info("Waiting for messages") + for { + select { + case cmd := <-wsr: + switch typedCmd := cmd.(type) { + case *ClientHello: + l.Info("Got unnecessary ClientHello") + // Huh??? + closeWith = &SocketClosed{ + Code: websocket.CloseProtocolError, + Text: "Enough hellos. Goodbye.", + } + return + case *ClientRefresh: + l.Info("Got ClientRefresh") + rmw <- rm.Refresh() + case *ClientAct: + l.Info("Got ClientAct") + for _, act := range typedCmd.Actions { + rmw <- rm.Apply(act) + } + case *SocketClosed: + l.Info("Got SocketClosed", zap.Object("close", typedCmd)) + closeWith = typedCmd + return + case *ClientMalformed: + l.Warn("Got ClientMalformed") + return + } + case msg := <-rmr: + switch typedMsg := msg.(type) { + case *room.JoinResponse: + // Huh???? + l.Info("Got unnecesary JoinResponse") + return + case *room.RefreshResponse: + l.Info("Got RefreshResponse") + wsw <- &ServerRefresh{ + State: typedMsg.CurrentState(), + } + case *room.ApplyResponse: + l.Info("Got ApplyResponse") + if typedMsg.Success() { + wsw <- &ServerOK{ + IDs: []uint32{typedMsg.ActionID()}, + } + } else { + wsw <- &ServerFailed{ + IDs: []uint32{typedMsg.ActionID()}, + Error: typedMsg.Failure().Error(), + } + } + case *room.ActionBroadcast: + l.Info("Got ActionBroadcast") + wsw <- &ServerAct{ + Actions: action.ServerSlice{typedMsg.Action()}, + } + case *room.LeaveResponse: + l.Info("Got odd LeaveResponse") + // Oh. u_u I wasn't- okay. + return + case *room.ShutdownRequest: + // Oh. Oh! Okay! Sorry! + l.Info("Got ShutdownRequest") + leaveWith = rm.AcknowledgeShutdown() + return + } + } + } +} + +// A Connection corresponds to a pair of actors. +type Connection struct { + r reader + w writer +} + +func NewConnection(conn *websocket.Conn, logger *zap.Logger) *Connection { + readChan := make(chan time.Time) + out := &Connection{ + r: reader{ + conn: conn, + channel: make(chan ClientCommand), + readNotifications: readChan, + logger: logger.Named("reader"), + }, + w: writer{ + conn: conn, + channel: make(chan ServerCommand), + readNotifications: readChan, + timer: nil, + nextPingAt: time.Time{}, + logger: logger.Named("writer"), + }, + } + go out.r.act() + go out.w.act() + return out +} + +// ReadChannel returns the channel that can be used to read client messages from the connection. +// After receiving SocketClosed, the reader will close its channel. +func (c *Connection) ReadChannel() <-chan ClientCommand { + return c.r.channel +} + +// WriteChannel returns the channel that can be used to send server messages on the connection. +// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel. +func (c *Connection) WriteChannel() chan<- ServerCommand { + return c.w.channel +} diff --git a/server/websocket/reader.go b/server/ws/reader.go similarity index 87% rename from server/websocket/reader.go rename to server/ws/reader.go index fc08ea5..b557489 100644 --- a/server/websocket/reader.go +++ b/server/ws/reader.go @@ -1,4 +1,4 @@ -package websocket +package ws import ( "fmt" @@ -33,6 +33,11 @@ func (r *reader) act() { defer r.shutdown() // Our first deadline starts immediately, so let the writer know. r.updateDeadlines() + // The reader should not automatically respond with close messages. + // It should simply let the close message be returned as an error. + r.conn.SetCloseHandler(func(code int, text string) error { + return nil + }) r.conn.SetPongHandler(func(appData string) error { r.logger.Debug("Received pong, extending read deadline") r.updateDeadlines() @@ -44,18 +49,18 @@ func (r *reader) act() { for { messageType, messageData, err := r.conn.ReadMessage() if err != nil { - var closure SocketClosed + var closure *SocketClosed if websocket.IsCloseError(err, StandardClientCloseTypes...) { typedErr := err.(*websocket.CloseError) r.logger.Debug("Received normal close message, shutting down", zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text)) - closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text} + closure = &SocketClosed{Code: typedErr.Code, Text: typedErr.Text} } else if websocket.IsUnexpectedCloseError(err, StandardClientCloseTypes...) { typedErr := err.(*websocket.CloseError) r.logger.Warn("Received unexpected close message, shutting down", zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text)) - closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text} + closure = &SocketClosed{Code: typedErr.Code, Text: typedErr.Text} } else { r.logger.Error("Error while reading message, shutting down", zap.Error(err)) - closure = SocketClosed{Error: err} + closure = &SocketClosed{Error: err} } r.logger.Debug("Sending close message to reader", zap.Object("closeMessage", closure)) r.channel <- closure @@ -75,7 +80,7 @@ func (r *reader) parseCommand(socketType int, data []byte) ClientCommand { MessageType: socketType, } r.logger.Error("Received command with unknown WebSocket message type", zap.Error(err)) - return ClientMalformed{ + return &ClientMalformed{ Error: err, } } @@ -83,13 +88,13 @@ func (r *reader) parseCommand(socketType int, data []byte) ClientCommand { var cmdPb ClientCommandPB err := proto.Unmarshal(data, &cmdPb) if err != nil { - return ClientMalformed{ + return &ClientMalformed{ Error: err, } } cmd, err := (&cmdPb).ToGo() if err != nil { - return ClientMalformed{Error: err} + return &ClientMalformed{Error: err} } return cmd } diff --git a/server/websocket/server.go b/server/ws/server.go similarity index 99% rename from server/websocket/server.go rename to server/ws/server.go index 1a34d38..5207288 100644 --- a/server/websocket/server.go +++ b/server/ws/server.go @@ -1,4 +1,4 @@ -package websocket +package ws import ( "git.reya.zone/reya/hexmap/server/action" diff --git a/server/websocket/server.pbconv.go b/server/ws/server.pbconv.go similarity index 71% rename from server/websocket/server.pbconv.go rename to server/ws/server.pbconv.go index 0d7ee15..f0116ec 100644 --- a/server/websocket/server.pbconv.go +++ b/server/ws/server.pbconv.go @@ -1,4 +1,4 @@ -package websocket +package ws import "git.reya.zone/reya/hexmap/server/action" @@ -19,18 +19,18 @@ func (x *ServerCommandPB) ToGo() (ServerCommand, error) { } } -func (x *ServerHelloPB) ToGo() (ServerHello, error) { +func (x *ServerHelloPB) ToGo() (*ServerHello, error) { state, err := x.State.ToGo() if err != nil { - return ServerHello{}, err + return nil, err } - return ServerHello{ + return &ServerHello{ Version: x.Version, State: &state, }, nil } -func (s ServerHello) ToServerPB() *ServerCommandPB { +func (s *ServerHello) ToServerPB() *ServerCommandPB { return &ServerCommandPB{ Command: &ServerCommandPB_Hello{ Hello: &ServerHelloPB{ @@ -41,17 +41,17 @@ func (s ServerHello) ToServerPB() *ServerCommandPB { } } -func (x *ServerRefreshPB) ToGo() (ServerRefresh, error) { +func (x *ServerRefreshPB) ToGo() (*ServerRefresh, error) { state, err := x.State.ToGo() if err != nil { - return ServerRefresh{}, err + return nil, err } - return ServerRefresh{ + return &ServerRefresh{ State: &state, }, nil } -func (s ServerRefresh) ToServerPB() *ServerCommandPB { +func (s *ServerRefresh) ToServerPB() *ServerCommandPB { return &ServerCommandPB{ Command: &ServerCommandPB_Refresh{ Refresh: &ServerRefreshPB{ @@ -61,15 +61,15 @@ func (s ServerRefresh) ToServerPB() *ServerCommandPB { } } -func (x *ServerOKPB) ToGo() ServerOK { +func (x *ServerOKPB) ToGo() *ServerOK { ids := make(IDSlice, len(x.Ids)) copy(ids, x.Ids) - return ServerOK{ + return &ServerOK{ IDs: ids, } } -func (s ServerOK) ToServerPB() *ServerCommandPB { +func (s *ServerOK) ToServerPB() *ServerCommandPB { ids := make([]uint32, len(s.IDs)) copy(ids, s.IDs) return &ServerCommandPB{ @@ -81,16 +81,16 @@ func (s ServerOK) ToServerPB() *ServerCommandPB { } } -func (x *ServerFailedPB) ToGo() ServerFailed { +func (x *ServerFailedPB) ToGo() *ServerFailed { ids := make(IDSlice, len(x.Ids)) copy(ids, x.Ids) - return ServerFailed{ + return &ServerFailed{ IDs: ids, Error: x.Error, } } -func (s ServerFailed) ToServerPB() *ServerCommandPB { +func (s *ServerFailed) ToServerPB() *ServerCommandPB { ids := make([]uint32, len(s.IDs)) copy(ids, s.IDs) return &ServerCommandPB{ @@ -103,21 +103,21 @@ func (s ServerFailed) ToServerPB() *ServerCommandPB { } } -func (x *ServerActPB) ToGo() (ServerAct, error) { +func (x *ServerActPB) ToGo() (*ServerAct, error) { actions := make(action.ServerSlice, len(x.Actions)) for index, act := range x.Actions { convertedAct, err := act.ToGo() if err != nil { - return ServerAct{}, err + return nil, err } actions[index] = convertedAct } - return ServerAct{ + return &ServerAct{ Actions: actions, }, nil } -func (s ServerAct) ToServerPB() *ServerCommandPB { +func (s *ServerAct) ToServerPB() *ServerCommandPB { actions := make([]*action.ServerActionPB, len(s.Actions)) for index, act := range s.Actions { actions[index] = act.ToServerPB() @@ -131,6 +131,6 @@ func (s ServerAct) ToServerPB() *ServerCommandPB { } } -func (SocketClosed) ToServerPB() *ServerCommandPB { +func (*SocketClosed) ToServerPB() *ServerCommandPB { return nil } diff --git a/server/websocket/shared.go b/server/ws/shared.go similarity index 83% rename from server/websocket/shared.go rename to server/ws/shared.go index 984935c..c0acd50 100644 --- a/server/websocket/shared.go +++ b/server/ws/shared.go @@ -1,4 +1,4 @@ -package websocket +package ws import ( "github.com/gorilla/websocket" @@ -6,9 +6,8 @@ import ( ) const ( - PingData string = "are you still there?" - - GoodbyeType = "GOODBYE" + PingData string = "are you still there?" + ProtocolVersion uint32 = 1 ) var StandardClientCloseTypes = []int{websocket.CloseNormalClosure, websocket.CloseGoingAway, websocket.CloseAbnormalClosure} @@ -28,15 +27,11 @@ type SocketClosed struct { } func (c SocketClosed) MarshalLogObject(encoder zapcore.ObjectEncoder) error { - encoder.AddString("type", GoodbyeType) - encoder.AddInt16("code", int16(c.Code)) + encoder.AddString("type", "GOODBYE") + encoder.AddInt("code", c.Code) encoder.AddString("text", c.Text) if c.Error != nil { encoder.AddString("error", c.Error.Error()) } return nil } - -func (c SocketClosed) ServerType() ServerMessageType { - return GoodbyeType -} diff --git a/server/websocket/writer.go b/server/ws/writer.go similarity index 94% rename from server/websocket/writer.go rename to server/ws/writer.go index 874893a..417e961 100644 --- a/server/websocket/writer.go +++ b/server/ws/writer.go @@ -1,4 +1,4 @@ -package websocket +package ws import ( "github.com/gorilla/websocket" @@ -9,7 +9,7 @@ import ( ) type writer struct { - // conn is the websocket connection that this writer is responsible for writing on. + // conn is the ws connection that this writer is responsible for writing on. conn *websocket.Conn // channel is the channel used to receive server messages to be sent to the client. // When it receives a SocketClosed, the process on the sending end promises not to send any further messages, as the writer will close it right after. @@ -66,7 +66,7 @@ func (w *writer) act() { } case raw := <-w.channel: switch msg := raw.(type) { - case SocketClosed: + case *SocketClosed: w.logger.Debug("Received close message, forwarding and shutting down", zap.Object("msg", msg)) w.sendClose(msg) // bye bye, we'll graceful shutdown because we deferred it @@ -106,7 +106,7 @@ func (w *writer) send(msg ServerCommand) { w.logger.Error("Error while setting write deadline", zap.Time("writeDeadline", writeDeadline), zap.Object("msg", msg), zap.Error(err)) } w.logger.Debug("Opening message writer to send command", zap.Object("msg", msg)) - writer, err := w.conn.NextWriter(websocket.TextMessage) + writer, err := w.conn.NextWriter(websocket.BinaryMessage) if err != nil { w.logger.Error("Error while getting writer from connection", zap.Error(err)) return @@ -127,9 +127,9 @@ func (w *writer) send(msg ServerCommand) { // Deferred close happens now } -// sendClose sends a close message on the websocket connection, but does not actually close the connection. +// sendClose sends a close message on the ws connection, but does not actually close the connection. // It does, however, close the incoming message channel to the writer. -func (w *writer) sendClose(msg SocketClosed) { +func (w *writer) sendClose(msg *SocketClosed) { w.logger.Debug("Shutting down the writer channel") close(w.channel) w.channel = nil @@ -140,7 +140,7 @@ func (w *writer) sendClose(msg SocketClosed) { } } -// sendPing sends a ping message on the websocket connection. The content is arbitrary. +// sendPing sends a ping message on the ws connection. The content is arbitrary. func (w *writer) sendPing() { w.logger.Debug("Sending ping") err := w.conn.WriteControl(websocket.PingMessage, []byte(PingData), time.Now().Add(ControlTimeLimit)) @@ -176,7 +176,7 @@ func (w *writer) gracefulShutdown() { } case raw := <-w.channel: switch msg := raw.(type) { - case SocketClosed: + case *SocketClosed: w.logger.Debug("Received close message from channel while shutting down, forwarding", zap.Object("msg", msg)) w.sendClose(msg) default: