From 8db68101fdf75795da9ad4b088d03deb7e440b34 Mon Sep 17 00:00:00 2001 From: Mari Date: Mon, 12 Jul 2021 16:10:50 -0400 Subject: [PATCH] Implement a protocol writer for the WebSockets. --- server/go.mod | 1 + server/go.sum | 2 + server/websocket/connection.go | 37 +++++++++ server/websocket/reader.go | 14 ++++ server/websocket/shared.go | 12 +-- server/websocket/writer.go | 144 +++++++++++++++++++++++++++++++++ 6 files changed, 201 insertions(+), 9 deletions(-) create mode 100644 server/websocket/connection.go create mode 100644 server/websocket/reader.go create mode 100644 server/websocket/writer.go diff --git a/server/go.mod b/server/go.mod index fbf60c8..7af6de1 100644 --- a/server/go.mod +++ b/server/go.mod @@ -3,6 +3,7 @@ module hexmap-server go 1.16 require ( + github.com/gorilla/websocket v1.4.2 github.com/rs/xid v1.3.0 go.uber.org/zap v1.18.1 ) diff --git a/server/go.sum b/server/go.sum index 717064d..bb39d18 100644 --- a/server/go.sum +++ b/server/go.sum @@ -3,6 +3,8 @@ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= diff --git a/server/websocket/connection.go b/server/websocket/connection.go new file mode 100644 index 0000000..de8165e --- /dev/null +++ b/server/websocket/connection.go @@ -0,0 +1,37 @@ +package websocket + +import ( + "github.com/gorilla/websocket" + "time" +) + +const ( + // ReadTimeLimit is the maximum time the server is willing to wait after receiving a message before receiving another one. + ReadTimeLimit = 60 * time.Second + // WriteTimeLimit is the maximum time the server is willing to wait to send a message. + WriteTimeLimit = 10 * time.Second + // ControlTimeLimit is the maximum time the server is willing to wait to send a control message like Ping or Close. + ControlTimeLimit = (WriteTimeLimit * 5) / 10 + // PingDelay is the time between pings. + // It must be less than ReadTimeLimit to account for latency and delays on either side. + PingDelay = (ReadTimeLimit * 7) / 10 +) + +// A Connection corresponds to a pair of actors. +type Connection struct { + conn *websocket.Conn + r reader + w writer +} + +// ReadChannel returns the channel that can be used to read client messages from the connection. +// After receiving SocketClosed, the reader will close its channel. +func (c *Connection) ReadChannel() <-chan ClientMessage { + return c.r.channel +} + +// WriteChannel returns the channel that can be used to send server messages on the connection. +// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel. +func (c *Connection) WriteChannel() chan<- ServerMessage { + return c.w.channel +} diff --git a/server/websocket/reader.go b/server/websocket/reader.go new file mode 100644 index 0000000..066ec73 --- /dev/null +++ b/server/websocket/reader.go @@ -0,0 +1,14 @@ +package websocket + +import ( + "github.com/gorilla/websocket" + "time" +) + +// Todo: Listen for pongs and extend the read deadline every time you get one + +type reader struct { + conn *websocket.Conn + channel chan ClientMessage + readNotifications chan<- time.Duration +} diff --git a/server/websocket/shared.go b/server/websocket/shared.go index e9e0335..0417a4b 100644 --- a/server/websocket/shared.go +++ b/server/websocket/shared.go @@ -15,23 +15,17 @@ const ( GoodbyeType = "GOODBYE" ) -// TODO: Noting that there should be three channels in play: -// 1) Reader to client: to receive messages from the connection -// 2) Client to writer: to send messages on the connection -// 3) Writer to reader: indicating that it is about to send a close message, and the reader should wait for one and -// time out the connection if it takes too long. - // SocketClosed is synthesized when a client closes the WebSocket connection, or sent to the write process to write a // WebSocket close message. // Sending a SocketClosed on a channel causes that channel to be closed right after. type SocketClosed struct { // Code is the StatusCode given (or which should be given) in the close message. - Code StatusCode + Code StatusCode `json:"code"` // Text is the reason text given (or which should be given) in the close message. Max 123 characters. - Text string + Text string `json:"text"` // Error may be an error that resulted in the closure of the socket. // Will not be written by the writer; only useful when it's returned from the reader. - Error error + Error error `json:"error"` } func (c SocketClosed) MarshalLogObject(encoder zapcore.ObjectEncoder) error { diff --git a/server/websocket/writer.go b/server/websocket/writer.go new file mode 100644 index 0000000..4fac8be --- /dev/null +++ b/server/websocket/writer.go @@ -0,0 +1,144 @@ +package websocket + +import ( + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "go.uber.org/zap" + "io" + "time" +) + +type writer struct { + conn *websocket.Conn + channel chan ServerMessage + readNotifications <-chan time.Duration + timer *time.Ticker + logger *zap.Logger +} + +func (w *writer) act() { + defer w.gracefulShutdown() + w.logger.Debug("Starting up") + w.timer = time.NewTicker(PingDelay) + for { + select { + case _, open := <-w.readNotifications: + if open { + w.logger.Debug("Received reader read, extending ping") + w.timer.Reset(PingDelay) + } else { + w.logger.Debug("Received reader close, shutting down") + w.readNotifications = nil + // bye bye, we'll graceful shutdown because we deferred it + return + } + case raw := <-w.channel: + switch msg := raw.(type) { + case SocketClosed: + w.logger.Debug("Received close message, forwarding and shutting down", zap.Object("msg", msg)) + w.sendClose(msg) + // bye bye, we'll graceful shutdown because we deferred it + return + default: + w.logger.Debug("Received message, forwarding", zap.Object("msg", msg)) + w.send(msg) + } + case <-w.timer.C: + w.sendPing() + } + w.logger.Debug("Awakening handled, resuming listening") + } +} + +func (w *writer) send(msg ServerMessage) { + writer, err := w.conn.NextWriter(websocket.TextMessage) + if err != nil { + w.logger.Error("error while getting writer from connection", zap.Error(err)) + return + } + defer func(writer io.WriteCloser) { + err := writer.Close() + if err != nil { + w.logger.Error("error while closing writer to send message", zap.Error(err)) + } + }(writer) + payload, err := json.Marshal(msg) + if err != nil { + w.logger.Error("error while rendering message payload to JSON", zap.Error(err)) + return + } + if len(payload) == 2 { + // This is an empty JSON message. We can leave it out. + _, err = fmt.Fprintf(writer, "%s!", msg.ServerType()) + if err != nil { + w.logger.Error("error while writing command-only message", zap.Error(err)) + } + } else { + // Because we need to send this, we put in a space instead of an exclamation mark. + _, err = fmt.Fprintf(writer, "%s %s", msg.ServerType(), payload) + if err != nil { + w.logger.Error("error while writing command-only message", zap.Error(err)) + } + } +} + +func (w *writer) sendClose(msg SocketClosed) { + w.logger.Debug("Shutting down the writer channel") + close(w.channel) + w.channel = nil + w.logger.Debug("Writing close message") + err := w.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(int(msg.Code), msg.Text), time.Now().Add(ControlTimeLimit)) + if err != nil { + w.logger.Error("Error while sending close", zap.Error(err)) + } +} + +func (w *writer) sendPing() { + w.logger.Debug("Sending ping") + err := w.conn.WriteControl(websocket.PingMessage, []byte("are you still there?"), time.Now().Add(ControlTimeLimit)) + if err != nil { + w.logger.Error("Error while sending ping", zap.Error(err)) + } +} + +// gracefulShutdown causes the writer to wait for the close handshake to finish and then shut down. +// It waits for the reader's readNotifications to close, indicating that it has also shut down, and for the channel to +// receive a SocketClosed message indicating that the client has shut down. +// During this time, the writer ignores all other messages from the channel and sends no pings. +func (w *writer) gracefulShutdown() { + defer w.finalShutdown() + w.logger.Debug("Waiting for all channels to shut down") + w.timer.Stop() + for { + if w.channel == nil && w.readNotifications == nil { + w.logger.Debug("All channels closed, beginning final shutdown") + // all done, we outta here, let the defer pick up the final shutdown + return + } + select { + case _, open := <-w.readNotifications: + if !open { + w.logger.Debug("Received reader close while shutting down") + w.readNotifications = nil + } + case raw := <-w.channel: + switch msg := raw.(type) { + case SocketClosed: + w.logger.Debug("Received close message from channel while shutting down, forwarding", zap.Object("msg", msg)) + w.sendClose(msg) + default: + w.logger.Debug("Ignoring non-close message while shutting down", zap.Object("msg", msg)) + } + } + } +} + +func (w *writer) finalShutdown() { + w.logger.Debug("Closing WebSocket connection") + err := w.conn.Close() + if err != nil { + w.logger.Error("Received an error while closing", zap.Error(err)) + } + w.logger.Debug("Shut down") +}