package websocket

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	"github.com/gorilla/websocket"
	"go.uber.org/zap"
)

// reader owns the receiving half of a client connection: it reads WebSocket messages, parses them into
// ClientCommand values, and forwards them on channel.
type reader struct {
	// conn is the connection to the client read from by the reader.
	conn *websocket.Conn
	// channel is the channel that the reader sends messages it has received on.
	channel chan ClientCommand
	// readNotifications is the channel that alerts are sent to the writer on, to let it know to hold off on
	// sending a ping. The value sent is the time at which the reader observed activity.
	readNotifications chan<- time.Time
	// logger is the logger used to record the state of the reader, primarily in Debug level.
	logger *zap.Logger
}

// InvalidMessageType is placed in ClientMalformed when the type of the WebSocket message was incorrect.
type InvalidMessageType struct {
	MessageType int
}

// Error describes the offending WebSocket message type.
func (i InvalidMessageType) Error() string {
	return fmt.Sprintf("invalid message type %d", i.MessageType)
}

// InvalidCommandType is placed in ClientMalformed when the type of the command was unknown.
type InvalidCommandType struct {
	CommandType ClientCommandType
}

// Error describes the unrecognised command type.
func (i InvalidCommandType) Error() string {
	return fmt.Sprintf("invalid command type %s", i.CommandType)
}

// InvalidPayload is placed in ClientMalformed when the payload could not be parsed.
type InvalidPayload struct {
	CommandType ClientCommandType
	Cause       error
}

// Error describes which command had a bad payload and why.
func (i InvalidPayload) Error() string {
	return fmt.Sprintf("command type %s had invalid payload: %s", i.CommandType, i.Cause)
}

// Unwrap exposes the underlying parse error so that errors.Is / errors.As can see through InvalidPayload.
func (i InvalidPayload) Unwrap() error {
	return i.Cause
}

// act contains the main read loop of the reader.
//
// It loops on conn.ReadMessage, forwarding each parsed command on channel, until ReadMessage returns an error.
// At that point a SocketClosed describing the failure is sent on channel and the method returns, which runs
// the deferred shutdown.
func (r *reader) act() {
	defer r.shutdown()

	// Our first deadline starts immediately, so let the writer know.
	r.updateDeadlines()

	r.conn.SetPongHandler(func(appData string) error {
		r.logger.Debug("Received pong, extending read deadline")
		r.updateDeadlines()
		if appData != PingData {
			r.logger.Warn("Got unexpected data in the pong", zap.String("appData", appData))
		}
		return nil
	})

	for {
		messageType, messageData, err := r.conn.ReadMessage()
		if err != nil {
			var closure SocketClosed
			switch {
			case websocket.IsCloseError(err, StandardClientCloseTypes...):
				// IsCloseError only reports true for a *websocket.CloseError, so the assertion is safe.
				typedErr := err.(*websocket.CloseError)
				r.logger.Debug("Received normal close message, shutting down",
					zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text))
				closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text}
			case websocket.IsUnexpectedCloseError(err, StandardClientCloseTypes...):
				// Likewise, IsUnexpectedCloseError only reports true for a *websocket.CloseError.
				typedErr := err.(*websocket.CloseError)
				r.logger.Warn("Received unexpected close message, shutting down",
					zap.Int("code", typedErr.Code), zap.String("text", typedErr.Text))
				closure = SocketClosed{Code: typedErr.Code, Text: typedErr.Text}
			default:
				r.logger.Error("Error while reading message, shutting down", zap.Error(err))
				closure = SocketClosed{Error: err}
			}

			r.logger.Debug("Sending close message to reader", zap.Object("closeMessage", closure))
			r.channel <- closure
			// We must exit now - errors from this method are permanent, after all.
			// We'll do the shutdown we deferred.
			return
		}

		r.updateDeadlines()
		r.channel <- r.parseCommand(messageType, messageData)
	}
}

// parseCommand attempts to parse the incoming message into a ClientCommand.
//
// socketType is the WebSocket message type reported by the connection and data is the raw message body.
// The body is split at the first space into a command type and a JSON payload. A message without a space
// is treated as having an empty payload; in that form the command type is expected to end in "!", which is
// stripped. Anything that cannot be understood is returned as a ClientMalformed carrying the reason.
func (r *reader) parseCommand(socketType int, data []byte) ClientCommand {
	if socketType != websocket.TextMessage {
		err := &InvalidMessageType{
			MessageType: socketType,
		}
		r.logger.Error("Received command with unknown WebSocket message type", zap.Error(err))
		return ClientMalformed{
			Error: err,
		}
	}

	r.logger.Debug("Received command, parse")

	// SplitN yields a single element when data contains no space, so the payload must only be read when it
	// is actually present; indexing parts[1] unconditionally would panic on payloadless commands.
	parts := bytes.SplitN(data, []byte(" "), 2)
	commandBytes := parts[0]
	var payloadJSON []byte
	if len(parts) > 1 {
		payloadJSON = parts[1]
	}

	var command ClientCommandType
	if len(payloadJSON) == 0 {
		// Since there's no payload, we expect the command to end with an exclamation point.
		if bytes.HasSuffix(commandBytes, []byte("!")) {
			command = ClientCommandType(bytes.TrimSuffix(commandBytes, []byte("!")))
		} else {
			r.logger.Warn("Received command not fitting the protocol: has no payload but no ! after command type")
			command = ClientCommandType(commandBytes)
		}
	} else {
		command = ClientCommandType(commandBytes)
	}

	switch command {
	case ClientHelloType:
		hello := &ClientHello{}
		if err := json.Unmarshal(payloadJSON, hello); err != nil {
			return ClientMalformed{
				Error: &InvalidPayload{
					CommandType: command,
					Cause:       err,
				},
			}
		}
		return hello
	case ClientRefreshType:
		// Refresh carries no payload; a stray one is tolerated but worth flagging.
		if len(payloadJSON) != 0 {
			r.logger.Warn("Received command not fitting the protocol: has payload for payloadless Refresh command")
		}
		return &ClientRefresh{}
	case ClientActType:
		act := &ClientAct{}
		if err := json.Unmarshal(payloadJSON, act); err != nil {
			return ClientMalformed{
				Error: &InvalidPayload{
					CommandType: command,
					Cause:       err,
				},
			}
		}
		return act
	default:
		return ClientMalformed{
			Error: InvalidCommandType{CommandType: command},
		}
	}
}

// updateDeadlines extends the time limit for pongs, and instructs the writer to hold off on sending a ping for the next PingDelay.
func (r *reader) updateDeadlines() { receivedAt := time.Now() r.logger.Debug("Alerting writer to extend ping timer", zap.Time("receivedAt", receivedAt)) r.readNotifications <- receivedAt newDeadline := receivedAt.Add(ReadTimeLimit) r.logger.Debug("Extending read deadline", zap.Time("newDeadline", newDeadline)) err := r.conn.SetReadDeadline(newDeadline) if err != nil { r.logger.Error("Error while extending read deadline", zap.Error(err)) } r.logger.Debug("Read deadline extended") } // shutdown closes all resources associated with the reader (the channel and readNotifications) but leaves its conn running. func (r *reader) shutdown() { close(r.channel) r.channel = nil close(r.readNotifications) r.readNotifications = nil r.conn = nil }