handle advise

This commit is contained in:
Marcelo Pires 2018-09-06 13:23:53 +02:00
parent eed039f351
commit ee24023387
2 changed files with 19 additions and 2 deletions

View File

@ -16,7 +16,7 @@ type Message struct {
Ext interface{} `json:"ext,omitempty"` Ext interface{} `json:"ext,omitempty"`
Id string `json:"id,omitempty"` Id string `json:"id,omitempty"`
ClientId string `json:"clientId,omitempty"` ClientId string `json:"clientId,omitempty"`
Advice Advise `json:"advice,omitempty"` Advice *Advise `json:"advice,omitempty"`
Data Data `json:"data,omitempty"` Data Data `json:"data,omitempty"`
Timestamp uint64 `json:"timestamp,omitempty"` Timestamp uint64 `json:"timestamp,omitempty"`
AuthSuccessful bool `json:"authSuccessful,omitempty"` AuthSuccessful bool `json:"authSuccessful,omitempty"`

View File

@ -23,6 +23,7 @@ type Websocket struct {
clientID string clientID string
msgID *uint64 msgID *uint64
once sync.Once once sync.Once
advice atomic.Value //type message.Advise
subsMu sync.Mutex //todo sync.Map subsMu sync.Mutex //todo sync.Map
subs map[string]chan *message.Message subs map[string]chan *message.Message
@ -55,6 +56,10 @@ func (w *Websocket) readWorker() error {
//dispatch //dispatch
msg := &payload[0] msg := &payload[0]
if msg.Advice != nil {
w.handleAdvise(msg.Advice)
}
if transport.IsControlMsg(msg.Channel) { if transport.IsControlMsg(msg.Channel) {
//handle it //handle it
switch msg.Channel { switch msg.Channel {
@ -201,7 +206,13 @@ func (w *Websocket) Unsubscribe(subscription string) error {
} }
func (w *Websocket) Publish(subscription string, data message.Data) error { func (w *Websocket) Publish(subscription string, data message.Data) error {
panic("not implemented") m := &message.Message{
Channel: subscription,
Data: data,
ClientId: w.clientID,
Id: w.nextMsgID(),
}
return w.sendMessage(m)
} }
func (w *Websocket) applyOutExtensions(m *message.Message) { func (w *Websocket) applyOutExtensions(m *message.Message) {
@ -209,8 +220,14 @@ func (w *Websocket) applyOutExtensions(m *message.Message) {
w.TransportOpts.OutExt[i](m) w.TransportOpts.OutExt[i](m)
} }
} }
func (w *Websocket) applyInExtensions(m *message.Message) { func (w *Websocket) applyInExtensions(m *message.Message) {
for i := range w.TransportOpts.InExt { for i := range w.TransportOpts.InExt {
w.TransportOpts.InExt[i](m) w.TransportOpts.InExt[i](m)
} }
} }
func (w *Websocket) handleAdvise(m *message.Advise) {
//todo actually handle the advice
w.advice.Store(m)
}