diff --git a/subscription/subscription.go b/subscription/subscription.go index 78cccba..f56bbf1 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -32,11 +32,18 @@ func NewSubscription(chanel string, unsub Unsubscriber, authToken string, msgCh func (s *Subscription) OnMessage(onMessage func(channel string, msg message.Data)) error { var inMsg *message.Message + go s.StartMessageLoop(inMsg, onMessage) + + return nil +} + +func (s *Subscription) StartMessageLoop(inMsg *message.Message, callback func(channel string, msg message.Data)) error { for inMsg = range s.msgCh { if inMsg.GetError() != nil { + return inMsg.GetError() } - onMessage(inMsg.Channel, inMsg.Data) + callback(inMsg.Channel, inMsg.Data) } return nil } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 071eec7..2087338 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -138,7 +138,8 @@ func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err // a connection is established by sending a message to the /meta/connect channel func (w *Websocket) Connect(msg *message.Message) error { go func() { - log.Fatal(w.readWorker()) + err := w.readWorker() + log.Fatal(err) }() return w.SendMessage(msg) }