diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 788099c..708d168 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -53,8 +53,8 @@ func (w *Websocket) Init(options *transport.Options) error { } func (w *Websocket) readWorker() error { - var payload []message.Message for { + var payload []message.Message err := w.conn.ReadJSON(&payload) if err != nil { return err @@ -63,26 +63,23 @@ func (w *Websocket) readWorker() error { msg := payload[0] if transport.IsControlMsg(msg.Channel) { - - log.Println("recv control message", debugJson(msg)) - //handle it switch msg.Channel { case transport.Subscribe: //handle Subscribe resp if !msg.Successful { w.subsMu.Lock() - subscription, ok := w.subs[msg.Channel] + subscription, ok := w.subs[msg.Subscription] w.subsMu.Unlock() + log.Println(debugJson(msg)) if !ok { - panic("BUG: subscription not registered") + panic("BUG: subscription not registered `" + msg.Subscription + "`") } subscription <- &msg close(subscription) w.subsMu.Lock() delete(w.subs, msg.Channel) w.subsMu.Unlock() - } case transport.Unsubscribe: //handle Unsubscribe resp @@ -128,8 +125,8 @@ func (w *Websocket) nextMsgID() string { func (w *Websocket) Options() *transport.Options { return w.TransportOpts } -func (w *Websocket) Handshake() (err error) { +func (w *Websocket) Handshake() (err error) { m := message.Message{ Channel: transport.Handshake, Version: "1.0", //todo const @@ -152,7 +149,6 @@ func (w *Websocket) Handshake() (err error) { if resp.GetError() != nil { return err } - log.Println(debugJson(resp)) w.clientID = resp.ClientId return nil } @@ -165,7 +161,6 @@ func (w *Websocket) Connect() error { Id: w.nextMsgID(), } //todo verify if extensions are applied on connect,verify if hs is complete - go w.readWorker() return w.sendMessage(&m) } @@ -186,10 +181,11 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(message *messa } //todo validate - inMsgCh := make(chan *message.Message, 0) + w.subsMu.Lock() w.subs[subscription] = inMsgCh + w.subsMu.Unlock() var inMsg *message.Message for inMsg = range inMsgCh {