You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
276 lines
7.4 KiB
276 lines
7.4 KiB
package ws
|
|
|
|
import (
|
|
"git.reya.zone/reya/hexmap/server/action"
|
|
"git.reya.zone/reya/hexmap/server/room"
|
|
"github.com/gorilla/websocket"
|
|
"go.uber.org/zap"
|
|
"net/http"
|
|
"time"
|
|
)
|
|
|
|
// Timing limits applied to websocket connections.
const (
	// ReadTimeLimit bounds how long the server will wait, after receiving one message,
	// for the next message to arrive.
	ReadTimeLimit = 60 * time.Second

	// WriteTimeLimit bounds how long the server will wait for an outgoing message to be sent.
	WriteTimeLimit = 10 * time.Second

	// ControlTimeLimit bounds how long the server will wait to send a control message
	// such as Ping or Close. It is half of WriteTimeLimit.
	ControlTimeLimit = WriteTimeLimit / 2

	// PingDelay is the interval between pings, set to 70% of ReadTimeLimit.
	// It has to stay below ReadTimeLimit so that latency and delays on either side
	// do not cause the read limit to be exceeded.
	PingDelay = ReadTimeLimit * 7 / 10
)
|
|
|
|
type HTTPHandler struct {
|
|
Upgrader websocket.Upgrader
|
|
Logger *zap.Logger
|
|
Room *room.Client
|
|
}
|
|
|
|
func destroyBadProtocolSocket(c *websocket.Conn, logger *zap.Logger) {
|
|
err := c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseProtocolError, "Invalid subprotocols"), time.Now().Add(ControlTimeLimit))
|
|
if err != nil {
|
|
logger.Error("Failed to write close message")
|
|
}
|
|
err = c.SetReadDeadline(time.Now().Add(ControlTimeLimit))
|
|
if err != nil {
|
|
logger.Error("Failed to set read deadline")
|
|
}
|
|
for {
|
|
_, _, err := c.ReadMessage()
|
|
if err != nil {
|
|
if !websocket.IsCloseError(err, websocket.CloseProtocolError) {
|
|
logger.Error("Websocket connection shut down ignominiously", zap.Error(err))
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *HTTPHandler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
|
|
c, err := h.Upgrader.Upgrade(responseWriter, request, http.Header{})
|
|
if err != nil {
|
|
h.Logger.Error("Failed to upgrade ws connection", zap.Error(err))
|
|
return
|
|
}
|
|
if c.Subprotocol() == "" {
|
|
h.Logger.Error("No matching subprotocol", zap.String("clientProtocols", request.Header.Get("Sec-Websocket-Protocol")))
|
|
go destroyBadProtocolSocket(c, h.Logger)
|
|
return
|
|
}
|
|
result := NewConnection(c, h.Logger.Named("Connection"))
|
|
exchange(result, h.Logger.Named("Link"), func(o room.NewClientOptions) *room.Client {
|
|
return h.Room.NewClient(o)
|
|
})
|
|
}
|
|
|
|
func exchange(c *Connection, l *zap.Logger, clientMaker func(options room.NewClientOptions) *room.Client) {
|
|
wsr := c.ReadChannel()
|
|
wsw := c.WriteChannel()
|
|
|
|
l.Info("Connection established")
|
|
closeWith := &SocketClosed{
|
|
Code: websocket.CloseAbnormalClosure,
|
|
Text: "I don't know what happened. But goodbye!",
|
|
}
|
|
defer func() {
|
|
l.Info("Shutting down")
|
|
// Wait for the websocket connection to shut down.
|
|
wsw <- closeWith
|
|
if wsr != nil {
|
|
for {
|
|
msg := <-wsr
|
|
switch msg.(type) {
|
|
case *SocketClosed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
// State 1: Waiting for a hello.
|
|
// Anything else is death.
|
|
cmd := <-wsr
|
|
switch typedCmd := cmd.(type) {
|
|
case *ClientHello:
|
|
if typedCmd.Version != ProtocolVersion {
|
|
l.Warn("Bad Hello version")
|
|
// Disgusting. I can't even look at you.
|
|
closeWith = &SocketClosed{
|
|
Code: websocket.CloseProtocolError,
|
|
Text: "Wrong protocol version",
|
|
}
|
|
return
|
|
}
|
|
l.Info("Got Hello")
|
|
default:
|
|
l.Warn("Got NON-hello")
|
|
closeWith = &SocketClosed{
|
|
Code: websocket.CloseProtocolError,
|
|
Text: "You don't even say hello?",
|
|
}
|
|
return
|
|
}
|
|
l.Info("Waiting for room.")
|
|
// State 2: Waiting for the room to notice us.
|
|
rm := clientMaker(room.NewClientOptions{
|
|
AcceptBroadcasts: true,
|
|
RequestStartingState: true,
|
|
})
|
|
rmr := rm.IncomingChannel()
|
|
rmw := rm.OutgoingChannel()
|
|
var leaveWith room.ClientMessage = nil
|
|
defer func() {
|
|
l.Info("Leaving room")
|
|
if leaveWith == nil {
|
|
leaveWith = rm.Leave()
|
|
}
|
|
rmw <- leaveWith
|
|
if _, ok := leaveWith.(*room.ShutdownResponse); ok {
|
|
// The room was already shutting down.
|
|
return
|
|
}
|
|
for {
|
|
msg := <-rmr
|
|
switch msg.(type) {
|
|
case *room.LeaveResponse:
|
|
return
|
|
case *room.ShutdownRequest:
|
|
rmw <- rm.AcknowledgeShutdown()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
l.Info("Waiting for JoinResponse")
|
|
msg := <-rmr
|
|
switch typedMsg := msg.(type) {
|
|
case *room.JoinResponse:
|
|
l.Info("Got JoinResponse")
|
|
wsw <- &ServerHello{
|
|
Version: ProtocolVersion,
|
|
State: typedMsg.CurrentState(),
|
|
}
|
|
case *room.ShutdownRequest:
|
|
l.Info("Got ShutdownRequest")
|
|
// Room was shutting down when we joined, oops!
|
|
closeWith = &SocketClosed{
|
|
Code: websocket.CloseGoingAway,
|
|
Text: "Shutting down right as you joined. Sorry!",
|
|
}
|
|
return
|
|
default:
|
|
l.Info("Got non-JoinResponse/ShutdownRequest")
|
|
// Uh. That's concerning. We don't have anything to send our client.
|
|
// Let's just give up.
|
|
return
|
|
}
|
|
l.Info("Waiting for messages")
|
|
for {
|
|
select {
|
|
case cmd := <-wsr:
|
|
switch typedCmd := cmd.(type) {
|
|
case *ClientHello:
|
|
l.Info("Got unnecessary ClientHello")
|
|
// Huh???
|
|
closeWith = &SocketClosed{
|
|
Code: websocket.CloseProtocolError,
|
|
Text: "Enough hellos. Goodbye.",
|
|
}
|
|
return
|
|
case *ClientRefresh:
|
|
l.Info("Got ClientRefresh")
|
|
rmw <- rm.Refresh()
|
|
case *ClientAct:
|
|
l.Info("Got ClientAct")
|
|
for _, act := range typedCmd.Actions {
|
|
rmw <- rm.Apply(act)
|
|
}
|
|
case *SocketClosed:
|
|
l.Info("Got SocketClosed", zap.Object("close", typedCmd))
|
|
closeWith = typedCmd
|
|
return
|
|
case *ClientMalformed:
|
|
l.Warn("Got ClientMalformed")
|
|
return
|
|
}
|
|
case msg := <-rmr:
|
|
switch typedMsg := msg.(type) {
|
|
case *room.JoinResponse:
|
|
// Huh????
|
|
l.Info("Got unnecesary JoinResponse")
|
|
return
|
|
case *room.RefreshResponse:
|
|
l.Info("Got RefreshResponse")
|
|
wsw <- &ServerRefresh{
|
|
State: typedMsg.CurrentState(),
|
|
}
|
|
case *room.ApplyResponse:
|
|
l.Info("Got ApplyResponse")
|
|
if typedMsg.Success() {
|
|
wsw <- &ServerOK{
|
|
IDs: []uint32{typedMsg.ActionID()},
|
|
}
|
|
} else {
|
|
wsw <- &ServerFailed{
|
|
IDs: []uint32{typedMsg.ActionID()},
|
|
Error: typedMsg.Failure().Error(),
|
|
}
|
|
}
|
|
case *room.ActionBroadcast:
|
|
l.Info("Got ActionBroadcast")
|
|
wsw <- &ServerAct{
|
|
Actions: action.ServerSlice{typedMsg.Action()},
|
|
}
|
|
case *room.LeaveResponse:
|
|
l.Info("Got odd LeaveResponse")
|
|
// Oh. u_u I wasn't- okay.
|
|
return
|
|
case *room.ShutdownRequest:
|
|
// Oh. Oh! Okay! Sorry!
|
|
l.Info("Got ShutdownRequest")
|
|
leaveWith = rm.AcknowledgeShutdown()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// A Connection corresponds to a pair of actors.
|
|
type Connection struct {
|
|
r reader
|
|
w writer
|
|
}
|
|
|
|
func NewConnection(conn *websocket.Conn, logger *zap.Logger) *Connection {
|
|
readChan := make(chan time.Time)
|
|
out := &Connection{
|
|
r: reader{
|
|
conn: conn,
|
|
channel: make(chan ClientCommand),
|
|
readNotifications: readChan,
|
|
logger: logger.Named("reader"),
|
|
},
|
|
w: writer{
|
|
conn: conn,
|
|
channel: make(chan ServerCommand),
|
|
readNotifications: readChan,
|
|
timer: nil,
|
|
nextPingAt: time.Time{},
|
|
logger: logger.Named("writer"),
|
|
},
|
|
}
|
|
go out.r.act()
|
|
go out.w.act()
|
|
return out
|
|
}
|
|
|
|
// ReadChannel returns the channel that can be used to read client messages from the connection.
|
|
// After receiving SocketClosed, the reader will close its channel.
|
|
func (c *Connection) ReadChannel() <-chan ClientCommand {
|
|
return c.r.channel
|
|
}
|
|
|
|
// WriteChannel returns the channel that can be used to send server messages on the connection.
|
|
// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel.
|
|
func (c *Connection) WriteChannel() chan<- ServerCommand {
|
|
return c.w.channel
|
|
}
|
|
|