You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
hexmap/server/websocket/reader.go

181 lines
5.7 KiB

package websocket
import (
"bytes"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"go.uber.org/zap"
"time"
)
// reader is the receiving side of a client's WebSocket connection. Its act method reads messages from
// conn, parses them into ClientCommand values and sends them on channel.
type reader struct {
// conn is the connection to the client read from by the reader.
conn *websocket.Conn
// channel is the channel that the reader sends messages it has received on.
// It is closed by shutdown when the read loop exits.
channel chan ClientCommand
// readNotifications is the channel that alerts are sent to the writer on, to let it know to
// hold off on sending a ping. Each value sent is the time at which updateDeadlines was called.
// It is closed by shutdown when the read loop exits.
readNotifications chan<- time.Time
// logger is the logger used to record the state of the reader, primarily in Debug level.
logger *zap.Logger
}
// InvalidMessageType is the error stored in ClientMalformed when a WebSocket message arrived with a
// message type the reader does not accept.
type InvalidMessageType struct {
	// MessageType is the WebSocket message type that was received.
	MessageType int
}

// Error implements the error interface, reporting the rejected message type.
func (e InvalidMessageType) Error() string {
	const format = "invalid message type %d"
	return fmt.Sprintf(format, e.MessageType)
}
// InvalidCommandType is the error stored in ClientMalformed when the command type of a message was
// not one the reader recognizes.
type InvalidCommandType struct {
	// CommandType is the command type that was not recognized.
	CommandType ClientCommandType
}

// Error implements the error interface, reporting the unrecognized command type.
func (e InvalidCommandType) Error() string {
	const format = "invalid command type %s"
	return fmt.Sprintf(format, e.CommandType)
}
// InvalidPayload is the error stored in ClientMalformed when the payload of a command could not be
// parsed.
type InvalidPayload struct {
	// CommandType is the type of the command whose payload was rejected.
	CommandType ClientCommandType
	// Cause is the underlying parse error.
	Cause error
}

// Error implements the error interface, naming the command type and the underlying cause.
func (e InvalidPayload) Error() string {
	const format = "command type %s had invalid payload: %s"
	return fmt.Sprintf(format, e.CommandType, e.Cause)
}

// Unwrap exposes Cause so that errors.Is and errors.As can inspect the underlying error.
func (e InvalidPayload) Unwrap() error {
	cause := e.Cause
	return cause
}
// act is the reader's main loop. It reads messages from conn until ReadMessage fails, forwarding
// each parsed command on channel. On failure it sends a SocketClosed describing the reason on
// channel and returns; the deferred shutdown then releases the reader's channels.
func (r *reader) act() {
	defer r.shutdown()

	// The first read deadline begins right away, so the writer has to be told about it.
	r.updateDeadlines()

	// Every pong counts as activity: push the deadlines out, then sanity-check the payload.
	onPong := func(appData string) error {
		r.logger.Debug("Received pong, extending read deadline")
		r.updateDeadlines()
		if appData != PingData {
			r.logger.Warn("Got unexpected data in the pong", zap.String("appData", appData))
		}
		return nil
	}
	r.conn.SetPongHandler(onPong)

	for {
		kind, payload, err := r.conn.ReadMessage()
		if err == nil {
			// Happy path: note the activity and hand the parsed command on.
			r.updateDeadlines()
			r.channel <- r.parseCommand(kind, payload)
			continue
		}

		// Classify the failure so the receiver of channel learns why the socket went away.
		var closure SocketClosed
		switch {
		case websocket.IsCloseError(err, StandardClientCloseTypes...):
			closeErr := err.(*websocket.CloseError)
			r.logger.Debug("Received normal close message, shutting down", zap.Int("code", closeErr.Code), zap.String("text", closeErr.Text))
			closure = SocketClosed{Code: closeErr.Code, Text: closeErr.Text}
		case websocket.IsUnexpectedCloseError(err, StandardClientCloseTypes...):
			closeErr := err.(*websocket.CloseError)
			r.logger.Warn("Received unexpected close message, shutting down", zap.Int("code", closeErr.Code), zap.String("text", closeErr.Text))
			closure = SocketClosed{Code: closeErr.Code, Text: closeErr.Text}
		default:
			r.logger.Error("Error while reading message, shutting down", zap.Error(err))
			closure = SocketClosed{Error: err}
		}

		r.logger.Debug("Sending close message to reader", zap.Object("closeMessage", closure))
		r.channel <- closure
		// Read errors are permanent, so there is nothing left to do but leave the loop;
		// the deferred shutdown takes care of the cleanup.
		return
	}
}
// parseCommand attempts to parse the incoming message into a ClientCommand.
//
// The message is split on its first space into a command type and a JSON payload. A message with
// no payload is expected to carry a trailing "!" on the command type, which is stripped; if the "!"
// is missing a warning is logged and the bytes are used as the command type unchanged.
//
// socketType is the WebSocket message type as reported by the connection; anything other than
// websocket.TextMessage is rejected. data is the raw message body.
//
// It never returns an error directly. Any problem is reported as a ClientMalformed whose Error field
// is an *InvalidMessageType, an *InvalidPayload or an InvalidCommandType.
func (r *reader) parseCommand(socketType int, data []byte) ClientCommand {
	if socketType != websocket.TextMessage {
		err := &InvalidMessageType{
			MessageType: socketType,
		}
		r.logger.Error("Received command with unknown WebSocket message type", zap.Error(err))
		return ClientMalformed{
			Error: err,
		}
	}
	r.logger.Debug("Received command, parse")

	// SplitN yields a single element when the message contains no space (e.g. a payloadless
	// "COMMAND!" or an empty message), so the payload is only taken when a second part exists.
	// Indexing parts[1] unconditionally would panic on exactly those messages.
	parts := bytes.SplitN(data, []byte(" "), 2)
	commandBytes := parts[0]
	var payloadJSON []byte
	if len(parts) > 1 {
		payloadJSON = parts[1]
	}

	var command ClientCommandType
	if len(payloadJSON) == 0 {
		// Since there's no payload, we expect the command to end with an exclamation point.
		if bytes.HasSuffix(commandBytes, []byte("!")) {
			command = ClientCommandType(bytes.TrimSuffix(commandBytes, []byte("!")))
		} else {
			r.logger.Warn("Received command not fitting the protocol: has no payload but no ! after command type")
			command = ClientCommandType(commandBytes)
		}
	} else {
		command = ClientCommandType(commandBytes)
	}

	// invalidPayload wraps a payload parse failure for the command being handled.
	invalidPayload := func(cause error) ClientCommand {
		return ClientMalformed{
			Error: &InvalidPayload{
				CommandType: command,
				Cause:       cause,
			},
		}
	}

	switch command {
	case ClientHelloType:
		hello := &ClientHello{}
		if err := json.Unmarshal(payloadJSON, hello); err != nil {
			return invalidPayload(err)
		}
		return hello
	case ClientRefreshType:
		// Refresh carries no payload; a stray one is tolerated but reported.
		if len(payloadJSON) != 0 {
			r.logger.Warn("Received command not fitting the protocol: has payload for payloadless Refresh command")
		}
		return &ClientRefresh{}
	case ClientActType:
		act := &ClientAct{}
		if err := json.Unmarshal(payloadJSON, act); err != nil {
			return invalidPayload(err)
		}
		return act
	default:
		// NOTE(review): this error is a value while the other two are pointers; kept as-is
		// because callers may match on the concrete type - confirm before unifying.
		return ClientMalformed{
			Error: InvalidCommandType{CommandType: command},
		}
	}
}
// updateDeadlines records activity on the connection. It sends the current time on
// readNotifications, which instructs the writer to hold off on sending a ping for the next
// PingDelay, and then moves the connection's read deadline to ReadTimeLimit after that time.
//
// The send on readNotifications happens first and is a plain channel send, so this call waits
// until that send completes. A failure to set the read deadline is logged and not returned.
func (r *reader) updateDeadlines() {
	receivedAt := time.Now()
	r.logger.Debug("Alerting writer to extend ping timer", zap.Time("receivedAt", receivedAt))
	r.readNotifications <- receivedAt

	newDeadline := receivedAt.Add(ReadTimeLimit)
	r.logger.Debug("Extending read deadline", zap.Time("newDeadline", newDeadline))
	if err := r.conn.SetReadDeadline(newDeadline); err != nil {
		r.logger.Error("Error while extending read deadline", zap.Error(err))
		// Return here so the success message below is not logged for a failed update.
		return
	}
	r.logger.Debug("Read deadline extended")
}
// shutdown releases what the reader owns: it closes channel and readNotifications, in that order,
// and then clears the reader's references to both channels and to conn. The connection itself is
// not closed here; only the reader's reference to it is dropped.
func (r *reader) shutdown() {
	// Signal both receivers that nothing more will be sent.
	close(r.channel)
	close(r.readNotifications)

	// Drop every reference held by the reader.
	r.channel, r.readNotifications, r.conn = nil, nil, nil
}