implement subscribe

This commit is contained in:
Marcelo Pires 2018-09-05 01:56:25 +02:00
parent fc09ac255e
commit 3c187e364b

View File

@ -19,6 +19,7 @@ type Websocket struct {
conn *websocket.Conn conn *websocket.Conn
clientID string clientID string
msgID *uint64 msgID *uint64
subs map[string]chan struct{}
} }
var _ transport.Transport = (*Websocket)(nil) var _ transport.Transport = (*Websocket)(nil)
@ -28,9 +29,9 @@ func (w *Websocket) Init(options *transport.Options) error {
err error err error
msgID uint64 msgID uint64
) )
w.TransportOpts = options w.TransportOpts = options
w.msgID = &msgID w.msgID = &msgID
w.subs = map[string]chan struct{}{}
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
if err != nil { if err != nil {
return err return err
@ -48,7 +49,7 @@ func (w *Websocket) Options() *transport.Options {
return w.TransportOpts return w.TransportOpts
} }
func (w *Websocket) Handshake() (err error) { func (w *Websocket) Handshake() (err error) {
if err = w.conn.WriteJSON(append(nil, message.Message{ if err = w.conn.WriteJSON(append([]message.Message{}, message.Message{
Channel: string(transport.Handshake), Channel: string(transport.Handshake),
Version: "1.0", //todo const Version: "1.0", //todo const
SupportedConnectionTypes: []string{transportName}, SupportedConnectionTypes: []string{transportName},
@ -57,7 +58,7 @@ func (w *Websocket) Handshake() (err error) {
} }
var hsResps []message.Message var hsResps []message.Message
if err = w.conn.ReadJSON(hsResps); err != nil { if err = w.conn.ReadJSON(&hsResps); err != nil {
return err return err
} }
@ -71,7 +72,7 @@ func (w *Websocket) Handshake() (err error) {
func (w *Websocket) Connect() error { func (w *Websocket) Connect() error {
//todo verify if extensions are applied on connect,verify if hs is complete //todo verify if extensions are applied on connect,verify if hs is complete
return w.conn.WriteJSON(append(nil, message.Message{ return w.conn.WriteJSON(append([]message.Message{}, message.Message{
Channel: string(transport.Connect), Channel: string(transport.Connect),
ClientId: w.clientID, ClientId: w.clientID,
ConnectionType: transportName, ConnectionType: transportName,
@ -80,7 +81,53 @@ func (w *Websocket) Connect() error {
} }
func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error { func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error {
panic("not implemented") m := &message.Message{
Channel: string(transport.Subscribe),
ClientId: w.clientID,
Subscription: "/" + subscription,
Id: w.nextMsgID(),
}
if w.TransportOpts.OutExt != nil {
w.TransportOpts.OutExt(m)
}
err := w.conn.WriteJSON(append([]message.Message{}, *m))
if err != nil {
return err
}
var hsResps []message.Message
if err = w.conn.ReadJSON(&hsResps); err != nil {
return err
}
subResp := hsResps[0]
if subResp.GetError() != nil {
return err
}
if !subResp.Successful {
//report err just for sanity
}
unsubsCh := make(chan struct{}, 0)
//todo multiple subs
w.subs[subscription] = unsubsCh
for {
select {
case <-unsubsCh:
return nil
default:
}
//todo guard unsusribe
var hsResps []message.Message
err := w.conn.ReadJSON(&hsResps)
if err != nil {
return err
}
msg := hsResps[0]
onMessage(&msg)
}
return nil
} }
func (w *Websocket) Unsubscribe(subscription string) error { func (w *Websocket) Unsubscribe(subscription string) error {