From ffa426bf695a1156a1c0f70de415247ca92f78d8 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Wed, 5 Sep 2018 15:17:11 +0200 Subject: [PATCH] identify control messages --- transport/transport.go | 13 ++++++++++++- transport/websocket/websocket.go | 13 +++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/transport/transport.go b/transport/transport.go index 434929e..480894a 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -23,7 +23,7 @@ type Transport interface { Publish(subscription string, message *message.Message) error } -type Event string +type Event = string const ( Subscribe Event = "/meta/subscribe" @@ -33,6 +33,17 @@ const ( Disconnect Event = "/meta/disconnect" ) +var ControlEvents = []Event{Subscribe, Connect, Unsubscribe, Handshake, Disconnect} + +func IsControlMsg(channel string) bool { + for i := range ControlEvents { + if channel == ControlEvents[i] { + return true + } + } + return false +} + var registeredTransports = map[string]Transport{} func RegisterTransport(t Transport) { diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 64b2a97..30ecd24 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -7,7 +7,6 @@ import ( "github.com/thesyncim/faye/transport" "log" "strconv" - "strings" "sync" "sync/atomic" ) @@ -62,15 +61,21 @@ func (w *Websocket) readWorker() error { } //dispatch msg := payload[0] - if strings.HasPrefix(msg.Channel, "/meta") { - continue //todo update introspect message and update state + + if transport.IsControlMsg(msg.Channel) { + //handle it + log.Println("recv control message", debugJson(msg)) + + continue } w.subsMu.Lock() subscription := w.subs[msg.Channel] w.subsMu.Unlock() - subscription <- &msg + if subscription != nil { + subscription <- &msg + } } }