package websocket import ( "fmt" "github.com/gorilla/websocket" "go.uber.org/zap" "google.golang.org/protobuf/proto" "time" ) type reader struct { // conn is the connection to the client read from by the reader. conn *websocket.Conn // channel is the channel that the reader sends messages it has received on. channel chan ClientCommand // readNotifications is the channel that alerts are sent to the writer on, to let it know to readNotifications chan<- time.Time // logger is the logger used to record the state of the reader, primarily in Debug level. logger *zap.Logger } // InvalidMessageType is placed in ClientMalformed when the type of the WebSocket message was incorrect. type InvalidMessageType struct { MessageType int } func (i InvalidMessageType) Error() string { return fmt.Sprintf("invalid message type %d", i.MessageType) } // act contains the main read loop of the reader. func (r *reader) act() { defer r.shutdown() // Our first deadline starts immediately, so let the writer know. r.updateDeadlines() r.conn.SetPongHandler(func(appData string) error { r.logger.Debug("Received pong, extending read deadline") r.updateDeadlines() if appData != PingData { r.logger.Warn("Got unexpected data in the pong", zap.String("appData", appData)) } return nil }) for { messageType, messageData, err := r.conn.ReadMessage() if err != nil { var closure SocketClosed if websocket.IsCloseError(err, StandardClientCloseTypes...) { typedErr := err.(*websocket.CloseError) r.logger.Debug("Received normal close message, shutting down", zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text)) closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text} } else if websocket.IsUnexpectedCloseError(err, StandardClientCloseTypes...) { typedErr := err.(*websocket.CloseError) r.logger.Warn("Received unexpected close message, shutting down", zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text)) closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text} } else { r.logger.Error("Error while reading message, shutting down", zap.Error(err)) closure = SocketClosed{Error: err} } r.logger.Debug("Sending close message to reader", zap.Object("closeMessage", closure)) r.channel <- closure // We must exit now - errors from this method are permanent, after all. // We'll do the shutdown we deferred. return } r.updateDeadlines() r.channel <- r.parseCommand(messageType, messageData) } } // parseCommand attempts to parse the incoming message func (r *reader) parseCommand(socketType int, data []byte) ClientCommand { if socketType != websocket.BinaryMessage { err := &InvalidMessageType{ MessageType: socketType, } r.logger.Error("Received command with unknown WebSocket message type", zap.Error(err)) return ClientMalformed{ Error: err, } } r.logger.Debug("Received command, parsing") var cmdPb ClientCommandPB err := proto.Unmarshal(data, &cmdPb) if err != nil { return ClientMalformed{ Error: err, } } cmd, err := (&cmdPb).ToGo() if err != nil { return ClientMalformed{Error: err} } return cmd } // updateDeadlines extends the time limit for pongs, and instructs the writer to hold off on sending a ping for the next PingDelay. func (r *reader) updateDeadlines() { receivedAt := time.Now() r.logger.Debug("Alerting writer to extend ping timer", zap.Time("receivedAt", receivedAt)) r.readNotifications <- receivedAt newDeadline := receivedAt.Add(ReadTimeLimit) r.logger.Debug("Extending read deadline", zap.Time("newDeadline", newDeadline)) err := r.conn.SetReadDeadline(newDeadline) if err != nil { r.logger.Error("Error while extending read deadline", zap.Error(err)) } r.logger.Debug("Read deadline extended") } // shutdown closes all resources associated with the reader (the channel and readNotifications) but leaves its conn running. func (r *reader) shutdown() { close(r.channel) r.channel = nil close(r.readNotifications) r.readNotifications = nil r.conn = nil }