dispatch errors in subscriptions

This commit is contained in:
Marcelo Pires 2018-09-05 15:45:53 +02:00
parent 3cf76b96df
commit f11de6960b

View File

@ -70,6 +70,20 @@ func (w *Websocket) readWorker() error {
switch msg.Channel {
case transport.Subscribe:
//handle Subscribe resp
if !msg.Successful {
w.subsMu.Lock()
subscription, ok := w.subs[msg.Channel]
w.subsMu.Unlock()
if !ok {
panic("BUG: subscription not registered")
}
subscription <- &msg
close(subscription)
w.subsMu.Lock()
delete(w.subs, msg.Channel)
w.subsMu.Unlock()
}
case transport.Unsubscribe:
//handle Unsubscribe resp
case transport.Connect:
@ -117,7 +131,7 @@ func (w *Websocket) Options() *transport.Options {
func (w *Websocket) Handshake() (err error) {
m := message.Message{
Channel: string(transport.Handshake),
Channel: transport.Handshake,
Version: "1.0", //todo const
SupportedConnectionTypes: []string{transportName},
}
@ -145,7 +159,7 @@ func (w *Websocket) Handshake() (err error) {
func (w *Websocket) Connect() error {
m := message.Message{
Channel: string(transport.Connect),
Channel: transport.Connect,
ClientId: w.clientID,
ConnectionType: transportName,
Id: w.nextMsgID(),
@ -158,7 +172,7 @@ func (w *Websocket) Connect() error {
func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error {
m := &message.Message{
Channel: string(transport.Subscribe),
Channel: transport.Subscribe,
ClientId: w.clientID,
Subscription: subscription,
Id: w.nextMsgID(),
@ -179,6 +193,9 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(message *messa
var inMsg *message.Message
for inMsg = range inMsgCh {
if inMsg.GetError() != nil {
return inMsg.GetError()
}
onMessage(inMsg)
}
return nil