diff --git a/message/message.go b/message/message.go index 0ab19ab..8f1379b 100644 --- a/message/message.go +++ b/message/message.go @@ -16,7 +16,7 @@ type Message struct { Ext interface{} `json:"ext,omitempty"` Id string `json:"id,omitempty"` ClientId string `json:"clientId,omitempty"` - Advice Advise `json:"advice,omitempty"` + Advice *Advise `json:"advice,omitempty"` Data Data `json:"data,omitempty"` Timestamp uint64 `json:"timestamp,omitempty"` AuthSuccessful bool `json:"authSuccessful,omitempty"` diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 6b06023..52b623e 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -23,6 +23,7 @@ type Websocket struct { clientID string msgID *uint64 once sync.Once + advice atomic.Value //type message.Advise subsMu sync.Mutex //todo sync.Map subs map[string]chan *message.Message @@ -55,6 +56,10 @@ func (w *Websocket) readWorker() error { //dispatch msg := &payload[0] + if msg.Advice != nil { + w.handleAdvise(msg.Advice) + } + if transport.IsControlMsg(msg.Channel) { //handle it switch msg.Channel { @@ -201,7 +206,13 @@ func (w *Websocket) Unsubscribe(subscription string) error { } func (w *Websocket) Publish(subscription string, data message.Data) error { - panic("not implemented") + m := &message.Message{ + Channel: subscription, + Data: data, + ClientId: w.clientID, + Id: w.nextMsgID(), + } + return w.sendMessage(m) } func (w *Websocket) applyOutExtensions(m *message.Message) { @@ -209,8 +220,14 @@ func (w *Websocket) applyOutExtensions(m *message.Message) { w.TransportOpts.OutExt[i](m) } } + func (w *Websocket) applyInExtensions(m *message.Message) { for i := range w.TransportOpts.InExt { w.TransportOpts.InExt[i](m) } } + +func (w *Websocket) handleAdvise(m *message.Advise) { + //todo actually handle the advice + w.advice.Store(m) +}