From f11de6960bf2521938efdf25f51548d7d2152062 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Wed, 5 Sep 2018 15:45:53 +0200 Subject: [PATCH] dispatch errors in subscriptions --- transport/websocket/websocket.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 0718bad..788099c 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -70,6 +70,20 @@ func (w *Websocket) readWorker() error { switch msg.Channel { case transport.Subscribe: //handle Subscribe resp + if !msg.Successful { + w.subsMu.Lock() + subscription, ok := w.subs[msg.Channel] + w.subsMu.Unlock() + if !ok { + panic("BUG: subscription not registered") + } + subscription <- &msg + close(subscription) + w.subsMu.Lock() + delete(w.subs, msg.Channel) + w.subsMu.Unlock() + + } case transport.Unsubscribe: //handle Unsubscribe resp case transport.Connect: @@ -117,7 +131,7 @@ func (w *Websocket) Options() *transport.Options { func (w *Websocket) Handshake() (err error) { m := message.Message{ - Channel: string(transport.Handshake), + Channel: transport.Handshake, Version: "1.0", //todo const SupportedConnectionTypes: []string{transportName}, } @@ -145,7 +159,7 @@ func (w *Websocket) Handshake() (err error) { func (w *Websocket) Connect() error { m := message.Message{ - Channel: string(transport.Connect), + Channel: transport.Connect, ClientId: w.clientID, ConnectionType: transportName, Id: w.nextMsgID(), @@ -158,7 +172,7 @@ func (w *Websocket) Connect() error { func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error { m := &message.Message{ - Channel: string(transport.Subscribe), + Channel: transport.Subscribe, ClientId: w.clientID, Subscription: subscription, Id: w.nextMsgID(), @@ -179,6 +193,9 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(message *messa var inMsg *message.Message for inMsg = range inMsgCh { + if inMsg.GetError() != nil { + return inMsg.GetError() + } onMessage(inMsg) } return nil