parent
34e64c7130
commit
ab29ec20be
@ -0,0 +1,193 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"git.reya.zone/reya/hexmap/server/room" |
||||
"git.reya.zone/reya/hexmap/server/state" |
||||
"git.reya.zone/reya/hexmap/server/ws" |
||||
"github.com/gorilla/websocket" |
||||
"go.uber.org/zap" |
||||
"google.golang.org/protobuf/proto" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"os" |
||||
"path/filepath" |
||||
"strconv" |
||||
"time" |
||||
) |
||||
|
||||
const SaveDir = "/home/reya/hexmaps" |
||||
|
||||
func save(m state.HexMap, l *zap.Logger) error { |
||||
filename := filepath.Join(SaveDir, "map."+strconv.FormatInt(time.Now().Unix(), 16)) |
||||
l.Debug("Saving to file", zap.String("filename", filename)) |
||||
marshaled, err := proto.Marshal(m.ToPB()) |
||||
l.Debug("Marshaled proto") |
||||
if err != nil { |
||||
return err |
||||
} |
||||
l.Debug("Opening file") |
||||
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0x644) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
l.Debug("Writing to file") |
||||
_, err = file.Write(marshaled) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
l.Debug("Closing file") |
||||
err = file.Close() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
l.Info("Saved to file", zap.String("filename", filename)) |
||||
return nil |
||||
} |
||||
|
||||
func load(l *zap.Logger) (*state.HexMap, error) { |
||||
filename := filepath.Join(SaveDir, "map.LOAD") |
||||
l.Debug("Loading from file", zap.String("filename", filename)) |
||||
file, err := os.Open(filename) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
l.Debug("Reading file") |
||||
marshaled, err := ioutil.ReadAll(file) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
pb := &state.HexMapPB{} |
||||
l.Debug("Extracting protobuf from file") |
||||
err = proto.Unmarshal(marshaled, pb) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
l.Debug("Closing file") |
||||
err = file.Close() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
m, err := pb.ToGo() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
l.Info("Loaded from file", zap.String("filename", filename)) |
||||
return &m, nil |
||||
} |
||||
|
||||
func BackupMap(client *room.Client, l *zap.Logger) { |
||||
var err error |
||||
myState := &state.Synced{} |
||||
l.Info("Starting backup system") |
||||
for { |
||||
msg := <-client.IncomingChannel() |
||||
switch typedMsg := msg.(type) { |
||||
case *room.JoinResponse: |
||||
myState = typedMsg.CurrentState() |
||||
err := save(myState.Map, l) |
||||
if err != nil { |
||||
l.Error("Failed saving during join response", zap.Error(err)) |
||||
} |
||||
case *room.ActionBroadcast: |
||||
err = typedMsg.Action().Apply(myState) |
||||
if err == nil { |
||||
err = save(myState.Map, l) |
||||
if err != nil { |
||||
l.Error("Failed saving during action broadcast", zap.Error(err)) |
||||
} |
||||
} |
||||
case *room.ShutdownRequest: |
||||
client.OutgoingChannel() <- client.AcknowledgeShutdown() |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
func ServeWS(logger *zap.Logger) (err error) { |
||||
m := http.NewServeMux() |
||||
httpLogger := logger.Named("HTTP") |
||||
hexes, err := load(logger) |
||||
if err != nil { |
||||
hexes = state.NewHexMap(state.Layout{ |
||||
Orientation: state.PointyTop, |
||||
IndentedLines: state.EvenLines, |
||||
}, 25, 10) |
||||
} |
||||
rm := room.New(room.NewOptions{ |
||||
BaseLogger: logger.Named("Room"), |
||||
StartingState: &state.Synced{ |
||||
Map: *hexes, |
||||
User: state.UserState{ |
||||
ActiveColor: state.Color{ |
||||
R: 0, |
||||
G: 0, |
||||
B: 0, |
||||
A: 255, |
||||
}, |
||||
}, |
||||
}, |
||||
StartingClientOptions: room.NewClientOptions{ |
||||
IncomingChannel: nil, |
||||
AcceptBroadcasts: true, |
||||
RequestStartingState: true, |
||||
}, |
||||
}) |
||||
go BackupMap(rm, logger.Named("BackupMap")) |
||||
m.Handle("/map", &ws.HTTPHandler{ |
||||
Upgrader: websocket.Upgrader{ |
||||
Subprotocols: []string{"v1.hexmap.deliciousreya.net"}, |
||||
CheckOrigin: func(r *http.Request) bool { |
||||
return r.Header.Get("Origin") == "https://hexmap.deliciousreya.net" |
||||
}, |
||||
}, |
||||
Logger: logger.Named("WS"), |
||||
Room: rm, |
||||
}) |
||||
srv := http.Server{ |
||||
Addr: "127.0.0.1:5238", |
||||
Handler: m, |
||||
ErrorLog: zap.NewStdLog(httpLogger), |
||||
} |
||||
m.HandleFunc("/exit", func(writer http.ResponseWriter, request *http.Request) { |
||||
// Some light dissuasion of accidental probing.
|
||||
// To keep good people out.
|
||||
if request.FormValue("superSecretPassword") != "Gesture/Retrial5/Untrained/Countable/Extrude/Jeep/Cheese/Carbon" { |
||||
writer.WriteHeader(403) |
||||
_, err = writer.Write([]byte("... What are you trying to pull?")) |
||||
return |
||||
} |
||||
writer.WriteHeader(200) |
||||
_, err := writer.Write([]byte("OK, shutting down, bye!")) |
||||
if err != nil { |
||||
logger.Warn("Error while writing goodbye response", zap.Error(err)) |
||||
} |
||||
time.AfterFunc(500*time.Millisecond, func() { |
||||
err := srv.Shutdown(context.Background()) |
||||
if err != nil { |
||||
logger.Error("Error while shutting down the server", zap.Error(err)) |
||||
} |
||||
}) |
||||
}) |
||||
err = srv.ListenAndServe() |
||||
if err != nil && err != http.ErrServerClosed { |
||||
return err |
||||
} |
||||
rm.OutgoingChannel() <- rm.Stop() |
||||
for { |
||||
msg := <-rm.IncomingChannel() |
||||
switch msg.(type) { |
||||
case *room.ShutdownRequest: |
||||
rm.OutgoingChannel() <- rm.AcknowledgeShutdown() |
||||
return nil |
||||
} |
||||
} |
||||
} |
||||
|
||||
func main() { |
||||
logger, err := zap.NewDevelopment() |
||||
err = ServeWS(logger) |
||||
if err != nil { |
||||
logger.Fatal("Error while serving HTTP", zap.Error(err)) |
||||
} |
||||
} |
@ -1,37 +0,0 @@ |
||||
package websocket |
||||
|
||||
import ( |
||||
"github.com/gorilla/websocket" |
||||
"time" |
||||
) |
||||
|
||||
const ( |
||||
// ReadTimeLimit is the maximum time the server is willing to wait after receiving a message before receiving another one.
|
||||
ReadTimeLimit = 60 * time.Second |
||||
// WriteTimeLimit is the maximum time the server is willing to wait to send a message.
|
||||
WriteTimeLimit = 10 * time.Second |
||||
// ControlTimeLimit is the maximum time the server is willing to wait to send a control message like Ping or Close.
|
||||
ControlTimeLimit = (WriteTimeLimit * 5) / 10 |
||||
// PingDelay is the time between pings.
|
||||
// It must be less than ReadTimeLimit to account for latency and delays on either side.
|
||||
PingDelay = (ReadTimeLimit * 7) / 10 |
||||
) |
||||
|
||||
// A Connection corresponds to a pair of actors.
|
||||
type Connection struct { |
||||
conn *websocket.Conn |
||||
r reader |
||||
w writer |
||||
} |
||||
|
||||
// ReadChannel returns the channel that can be used to read client messages from the connection.
|
||||
// After receiving SocketClosed, the reader will close its channel.
|
||||
func (c *Connection) ReadChannel() <-chan ClientCommand { |
||||
return c.r.channel |
||||
} |
||||
|
||||
// WriteChannel returns the channel that can be used to send server messages on the connection.
|
||||
// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel.
|
||||
func (c *Connection) WriteChannel() chan<- ServerCommand { |
||||
return c.w.channel |
||||
} |
@ -0,0 +1,276 @@ |
||||
package ws |
||||
|
||||
import ( |
||||
"git.reya.zone/reya/hexmap/server/action" |
||||
"git.reya.zone/reya/hexmap/server/room" |
||||
"github.com/gorilla/websocket" |
||||
"go.uber.org/zap" |
||||
"net/http" |
||||
"time" |
||||
) |
||||
|
||||
const ( |
||||
// ReadTimeLimit is the maximum time the server is willing to wait after receiving a message before receiving another one.
|
||||
ReadTimeLimit = 60 * time.Second |
||||
// WriteTimeLimit is the maximum time the server is willing to wait to send a message.
|
||||
WriteTimeLimit = 10 * time.Second |
||||
// ControlTimeLimit is the maximum time the server is willing to wait to send a control message like Ping or Close.
|
||||
ControlTimeLimit = (WriteTimeLimit * 5) / 10 |
||||
// PingDelay is the time between pings.
|
||||
// It must be less than ReadTimeLimit to account for latency and delays on either side.
|
||||
PingDelay = (ReadTimeLimit * 7) / 10 |
||||
) |
||||
|
||||
type HTTPHandler struct { |
||||
Upgrader websocket.Upgrader |
||||
Logger *zap.Logger |
||||
Room *room.Client |
||||
} |
||||
|
||||
func destroyBadProtocolSocket(c *websocket.Conn, logger *zap.Logger) { |
||||
err := c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseProtocolError, "Invalid subprotocols"), time.Now().Add(ControlTimeLimit)) |
||||
if err != nil { |
||||
logger.Error("Failed to write close message") |
||||
} |
||||
err = c.SetReadDeadline(time.Now().Add(ControlTimeLimit)) |
||||
if err != nil { |
||||
logger.Error("Failed to set read deadline") |
||||
} |
||||
for { |
||||
_, _, err := c.ReadMessage() |
||||
if err != nil { |
||||
if !websocket.IsCloseError(err, websocket.CloseProtocolError) { |
||||
logger.Error("Websocket connection shut down ignominiously", zap.Error(err)) |
||||
} |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (h *HTTPHandler) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) { |
||||
c, err := h.Upgrader.Upgrade(responseWriter, request, http.Header{}) |
||||
if err != nil { |
||||
h.Logger.Error("Failed to upgrade ws connection", zap.Error(err)) |
||||
return |
||||
} |
||||
if c.Subprotocol() == "" { |
||||
h.Logger.Error("No matching subprotocol", zap.String("clientProtocols", request.Header.Get("Sec-Websocket-Protocol"))) |
||||
go destroyBadProtocolSocket(c, h.Logger) |
||||
return |
||||
} |
||||
result := NewConnection(c, h.Logger.Named("Connection")) |
||||
exchange(result, h.Logger.Named("Link"), func(o room.NewClientOptions) *room.Client { |
||||
return h.Room.NewClient(o) |
||||
}) |
||||
} |
||||
|
||||
func exchange(c *Connection, l *zap.Logger, clientMaker func(options room.NewClientOptions) *room.Client) { |
||||
wsr := c.ReadChannel() |
||||
wsw := c.WriteChannel() |
||||
|
||||
l.Info("Connection established") |
||||
closeWith := &SocketClosed{ |
||||
Code: websocket.CloseAbnormalClosure, |
||||
Text: "I don't know what happened. But goodbye!", |
||||
} |
||||
defer func() { |
||||
l.Info("Shutting down") |
||||
// Wait for the websocket connection to shut down.
|
||||
wsw <- closeWith |
||||
if wsr != nil { |
||||
for { |
||||
msg := <-wsr |
||||
switch msg.(type) { |
||||
case *SocketClosed: |
||||
return |
||||
} |
||||
} |
||||
} |
||||
}() |
||||
// State 1: Waiting for a hello.
|
||||
// Anything else is death.
|
||||
cmd := <-wsr |
||||
switch typedCmd := cmd.(type) { |
||||
case *ClientHello: |
||||
if typedCmd.Version != ProtocolVersion { |
||||
l.Warn("Bad Hello version") |
||||
// Disgusting. I can't even look at you.
|
||||
closeWith = &SocketClosed{ |
||||
Code: websocket.CloseProtocolError, |
||||
Text: "Wrong protocol version", |
||||
} |
||||
return |
||||
} |
||||
l.Info("Got Hello") |
||||
default: |
||||
l.Warn("Got NON-hello") |
||||
closeWith = &SocketClosed{ |
||||
Code: websocket.CloseProtocolError, |
||||
Text: "You don't even say hello?", |
||||
} |
||||
return |
||||
} |
||||
l.Info("Waiting for room.") |
||||
// State 2: Waiting for the room to notice us.
|
||||
rm := clientMaker(room.NewClientOptions{ |
||||
AcceptBroadcasts: true, |
||||
RequestStartingState: true, |
||||
}) |
||||
rmr := rm.IncomingChannel() |
||||
rmw := rm.OutgoingChannel() |
||||
var leaveWith room.ClientMessage = nil |
||||
defer func() { |
||||
l.Info("Leaving room") |
||||
if leaveWith == nil { |
||||
leaveWith = rm.Leave() |
||||
} |
||||
rmw <- leaveWith |
||||
if _, ok := leaveWith.(*room.ShutdownResponse); ok { |
||||
// The room was already shutting down.
|
||||
return |
||||
} |
||||
for { |
||||
msg := <-rmr |
||||
switch msg.(type) { |
||||
case *room.LeaveResponse: |
||||
return |
||||
case *room.ShutdownRequest: |
||||
rmw <- rm.AcknowledgeShutdown() |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
l.Info("Waiting for JoinResponse") |
||||
msg := <-rmr |
||||
switch typedMsg := msg.(type) { |
||||
case *room.JoinResponse: |
||||
l.Info("Got JoinResponse") |
||||
wsw <- &ServerHello{ |
||||
Version: ProtocolVersion, |
||||
State: typedMsg.CurrentState(), |
||||
} |
||||
case *room.ShutdownRequest: |
||||
l.Info("Got ShutdownRequest") |
||||
// Room was shutting down when we joined, oops!
|
||||
closeWith = &SocketClosed{ |
||||
Code: websocket.CloseGoingAway, |
||||
Text: "Shutting down right as you joined. Sorry!", |
||||
} |
||||
return |
||||
default: |
||||
l.Info("Got non-JoinResponse/ShutdownRequest") |
||||
// Uh. That's concerning. We don't have anything to send our client.
|
||||
// Let's just give up.
|
||||
return |
||||
} |
||||
l.Info("Waiting for messages") |
||||
for { |
||||
select { |
||||
case cmd := <-wsr: |
||||
switch typedCmd := cmd.(type) { |
||||
case *ClientHello: |
||||
l.Info("Got unnecessary ClientHello") |
||||
// Huh???
|
||||
closeWith = &SocketClosed{ |
||||
Code: websocket.CloseProtocolError, |
||||
Text: "Enough hellos. Goodbye.", |
||||
} |
||||
return |
||||
case *ClientRefresh: |
||||
l.Info("Got ClientRefresh") |
||||
rmw <- rm.Refresh() |
||||
case *ClientAct: |
||||
l.Info("Got ClientAct") |
||||
for _, act := range typedCmd.Actions { |
||||
rmw <- rm.Apply(act) |
||||
} |
||||
case *SocketClosed: |
||||
l.Info("Got SocketClosed", zap.Object("close", typedCmd)) |
||||
closeWith = typedCmd |
||||
return |
||||
case *ClientMalformed: |
||||
l.Warn("Got ClientMalformed") |
||||
return |
||||
} |
||||
case msg := <-rmr: |
||||
switch typedMsg := msg.(type) { |
||||
case *room.JoinResponse: |
||||
// Huh????
|
||||
l.Info("Got unnecesary JoinResponse") |
||||
return |
||||
case *room.RefreshResponse: |
||||
l.Info("Got RefreshResponse") |
||||
wsw <- &ServerRefresh{ |
||||
State: typedMsg.CurrentState(), |
||||
} |
||||
case *room.ApplyResponse: |
||||
l.Info("Got ApplyResponse") |
||||
if typedMsg.Success() { |
||||
wsw <- &ServerOK{ |
||||
IDs: []uint32{typedMsg.ActionID()}, |
||||
} |
||||
} else { |
||||
wsw <- &ServerFailed{ |
||||
IDs: []uint32{typedMsg.ActionID()}, |
||||
Error: typedMsg.Failure().Error(), |
||||
} |
||||
} |
||||
case *room.ActionBroadcast: |
||||
l.Info("Got ActionBroadcast") |
||||
wsw <- &ServerAct{ |
||||
Actions: action.ServerSlice{typedMsg.Action()}, |
||||
} |
||||
case *room.LeaveResponse: |
||||
l.Info("Got odd LeaveResponse") |
||||
// Oh. u_u I wasn't- okay.
|
||||
return |
||||
case *room.ShutdownRequest: |
||||
// Oh. Oh! Okay! Sorry!
|
||||
l.Info("Got ShutdownRequest") |
||||
leaveWith = rm.AcknowledgeShutdown() |
||||
return |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// A Connection corresponds to a pair of actors.
|
||||
type Connection struct { |
||||
r reader |
||||
w writer |
||||
} |
||||
|
||||
func NewConnection(conn *websocket.Conn, logger *zap.Logger) *Connection { |
||||
readChan := make(chan time.Time) |
||||
out := &Connection{ |
||||
r: reader{ |
||||
conn: conn, |
||||
channel: make(chan ClientCommand), |
||||
readNotifications: readChan, |
||||
logger: logger.Named("reader"), |
||||
}, |
||||
w: writer{ |
||||
conn: conn, |
||||
channel: make(chan ServerCommand), |
||||
readNotifications: readChan, |
||||
timer: nil, |
||||
nextPingAt: time.Time{}, |
||||
logger: logger.Named("writer"), |
||||
}, |
||||
} |
||||
go out.r.act() |
||||
go out.w.act() |
||||
return out |
||||
} |
||||
|
||||
// ReadChannel returns the channel that can be used to read client messages from the connection.
|
||||
// After receiving SocketClosed, the reader will close its channel.
|
||||
func (c *Connection) ReadChannel() <-chan ClientCommand { |
||||
return c.r.channel |
||||
} |
||||
|
||||
// WriteChannel returns the channel that can be used to send server messages on the connection.
|
||||
// After sending SocketClosed, the writer will close its channel; do not send any further messages on the channel.
|
||||
func (c *Connection) WriteChannel() chan<- ServerCommand { |
||||
return c.w.channel |
||||
} |
@ -1,4 +1,4 @@ |
||||
package websocket |
||||
package ws |
||||
|
||||
import ( |
||||
"git.reya.zone/reya/hexmap/server/action" |
Loading…
Reference in new issue