From 3c187e364bedd440096284a388ebfbe0813af6e6 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Wed, 5 Sep 2018 01:56:25 +0200 Subject: [PATCH] implement subscribe --- transport/websocket/websocket.go | 57 +++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 5 deletions(-) diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 786ae79..192d259 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -19,6 +19,7 @@ type Websocket struct { conn *websocket.Conn clientID string msgID *uint64 + subs map[string]chan struct{} } var _ transport.Transport = (*Websocket)(nil) @@ -28,9 +29,9 @@ func (w *Websocket) Init(options *transport.Options) error { err error msgID uint64 ) - w.TransportOpts = options w.msgID = &msgID + w.subs = map[string]chan struct{}{} w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) if err != nil { return err @@ -48,7 +49,7 @@ func (w *Websocket) Options() *transport.Options { return w.TransportOpts } func (w *Websocket) Handshake() (err error) { - if err = w.conn.WriteJSON(append(nil, message.Message{ + if err = w.conn.WriteJSON(append([]message.Message{}, message.Message{ Channel: string(transport.Handshake), Version: "1.0", //todo const SupportedConnectionTypes: []string{transportName}, @@ -57,7 +58,7 @@ func (w *Websocket) Handshake() (err error) { } var hsResps []message.Message - if err = w.conn.ReadJSON(hsResps); err != nil { + if err = w.conn.ReadJSON(&hsResps); err != nil { return err } @@ -71,7 +72,7 @@ func (w *Websocket) Handshake() (err error) { func (w *Websocket) Connect() error { //todo verify if extensions are applied on connect,verify if hs is complete - return w.conn.WriteJSON(append(nil, message.Message{ + return w.conn.WriteJSON(append([]message.Message{}, message.Message{ Channel: string(transport.Connect), ClientId: w.clientID, ConnectionType: transportName, @@ -80,7 +81,53 @@ func (w *Websocket) Connect() error { } func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error { - panic("not implemented") + m := &message.Message{ + Channel: string(transport.Subscribe), + ClientId: w.clientID, + Subscription: "/" + subscription, + Id: w.nextMsgID(), + } + if w.TransportOpts.OutExt != nil { + w.TransportOpts.OutExt(m) + } + err := w.conn.WriteJSON(append([]message.Message{}, *m)) + if err != nil { + return err + } + + var hsResps []message.Message + if err = w.conn.ReadJSON(&hsResps); err != nil { + return err + } + + subResp := hsResps[0] + if subResp.GetError() != nil { + return err + } + if !subResp.Successful { + //report err just for sanity + } + unsubsCh := make(chan struct{}, 0) + //todo multiple subs + w.subs[subscription] = unsubsCh + + for { + select { + case <-unsubsCh: + return nil + default: + } + //todo guard unsusribe + var hsResps []message.Message + err := w.conn.ReadJSON(&hsResps) + if err != nil { + return err + } + + msg := hsResps[0] + onMessage(&msg) + } + return nil } func (w *Websocket) Unsubscribe(subscription string) error {