From 896bafe70b15f944a9266f1b8582c3ebb4e8ab08 Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Fri, 22 Sep 2023 16:06:58 -0500 Subject: [PATCH] Attempting to subscribe in a go routine --- subscription/subscription.go | 9 ++++++++- transport/websocket/websocket.go | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/subscription/subscription.go b/subscription/subscription.go index 78cccba..f56bbf1 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -32,11 +32,18 @@ func NewSubscription(chanel string, unsub Unsubscriber, authToken string, msgCh func (s *Subscription) OnMessage(onMessage func(channel string, msg message.Data)) error { var inMsg *message.Message + go s.StartMessageLoop(inMsg, onMessage) + + return nil +} + +func (s *Subscription) StartMessageLoop(inMsg *message.Message, callback func(channel string, msg message.Data)) error { for inMsg = range s.msgCh { if inMsg.GetError() != nil { + return inMsg.GetError() } - onMessage(inMsg.Channel, inMsg.Data) + callback(inMsg.Channel, inMsg.Data) } return nil } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 071eec7..2087338 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -138,7 +138,8 @@ func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err // a connection is established by sending a message to the /meta/connect channel func (w *Websocket) Connect(msg *message.Message) error { go func() { - log.Fatal(w.readWorker()) + err := w.readWorker() + log.Fatal(err) }() return w.SendMessage(msg) }