diff --git a/client.go b/client.go index 34e61ce..1568ac6 100644 --- a/client.go +++ b/client.go @@ -18,8 +18,10 @@ var defaultOpts = options{ //https://faye.jcoglan.com/architecture.html type client interface { - Subscribe(subscription string, onMsg func(data message.Data)) error - Publish(subscription string, data message.Data) error + Disconnect() error + Subscribe(subscription string, onMessage func(message message.Data)) error + Unsubscribe(subscription string) error + Publish(subscription string, message message.Data) error //todo unsubscribe,etc } @@ -58,11 +60,28 @@ func NewClient(url string, opts ...Option) (*Client, error) { return &c, nil } +func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error { + return c.opts.transport.Subscribe(subscription, onMsg) +} + +func (c *Client) Unsubscribe(subscription string) error { + return c.opts.transport.Unsubscribe(subscription) +} + +func (c *Client) Publish(subscription string, data message.Data) error { + return c.opts.transport.Publish(subscription, data) +} + +func (c *Client) Disconnect() error { + return c.opts.transport.Disconnect() +} + func WithOutExtension(extension message.Extension) Option { return func(o *options) { o.outExt = append(o.outExt, extension) } } + func WithExtension(inExt message.Extension, outExt message.Extension) Option { return func(o *options) { o.inExt = append(o.inExt, inExt) @@ -81,11 +100,3 @@ func WithTransport(t transport.Transport) Option { o.transport = t } } - -func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error { - return c.opts.transport.Subscribe(subscription, onMsg) -} - -func (c *Client) Publish(subscription string, data message.Data) error { - return c.opts.transport.Publish(subscription, data) -} diff --git a/extensions/getstream.go b/extensions/getstream.go index 8cb8171..a729664 100644 --- a/extensions/getstream.go +++ b/extensions/getstream.go @@ -19,7 +19,7 @@ func NewGetStream(apiKey string, signature string) GetStream { } func (gt GetStream) OutExtension(message *message.Message) { - if message.Channel == string(transport.Subscribe) { + if message.Channel == string(transport.MetaSubscribe) { //get useriID gt.UserID = message.Subscription[1:] message.Ext = gt diff --git a/transport/transport.go b/transport/transport.go index 5c4cfa9..d843b11 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -1,11 +1,16 @@ package transport -import "github.com/thesyncim/faye/message" +import ( + "github.com/thesyncim/faye/message" + "time" +) // handshake, connect, disconnect, subscribe, unsubscribe and publish type Options struct { - Url string + Url string + RetryInterval time.Duration + InExt []message.Extension OutExt []message.Extension //todo dial timeout @@ -18,26 +23,44 @@ type Transport interface { Options() *Options Handshake() error Connect() error + Disconnect() error Subscribe(subscription string, onMessage func(message message.Data)) error Unsubscribe(subscription string) error Publish(subscription string, message message.Data) error } -type Event = string +type Meta = string const ( - Subscribe Event = "/meta/subscribe" - Connect Event = "/meta/connect" - Unsubscribe Event = "/meta/unsubscribe" - Handshake Event = "/meta/handshake" - Disconnect Event = "/meta/disconnect" + MetaSubscribe Meta = "/meta/subscribe" + MetaConnect Meta = "/meta/connect" + MetaDisconnect Meta = "/meta/disconnect" + MetaUnsubscribe Meta = "/meta/unsubscribe" + MetaHandshake Meta = "/meta/handshake" ) -var ControlEvents = []Event{Subscribe, Connect, Unsubscribe, Handshake, Disconnect} +type Reconnect = string -func IsControlMsg(channel string) bool { - for i := range ControlEvents { - if channel == ControlEvents[i] { +const ( + //ReconnectRetry indicates that a client MAY attempt to reconnect with a /meta/connect message, + //after the interval (as defined by interval advice field or client-default backoff), and with the same credentials. + ReconnectRetry Reconnect = "retry" + + //ReconnectHandshake indicates that the server has terminated any prior connection status and the client MUST reconnect + // with a /meta/handshake message. + //A client MUST NOT automatically retry when a reconnect advice handshake has been received. + ReconnectHandshake Reconnect = "handshake" + + //ReconnectNone indicates a hard failure for the connect attempt. + //A client MUST respect reconnect advice none and MUST NOT automatically retry or handshake. + ReconnectNone Reconnect = "none" +) + +var MetaEvents = []Meta{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} + +func IsMetaEvent(channel string) bool { + for i := range MetaEvents { + if channel == MetaEvents[i] { return true } } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 52b623e..fa937bb 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -25,6 +25,8 @@ type Websocket struct { once sync.Once advice atomic.Value //type message.Advise + stopCh chan struct{} + subsMu sync.Mutex //todo sync.Map subs map[string]chan *message.Message } @@ -39,6 +41,7 @@ func (w *Websocket) Init(options *transport.Options) error { w.TransportOpts = options w.msgID = &msgID w.subs = map[string]chan *message.Message{} + w.stopCh = make(chan struct{}) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) if err != nil { return err @@ -48,6 +51,11 @@ func (w *Websocket) Init(options *transport.Options) error { func (w *Websocket) readWorker() error { for { + select { + case <-w.stopCh: + return nil + default: + } var payload []message.Message err := w.conn.ReadJSON(&payload) if err != nil { @@ -60,11 +68,11 @@ func (w *Websocket) readWorker() error { w.handleAdvise(msg.Advice) } - if transport.IsControlMsg(msg.Channel) { + if transport.IsMetaEvent(msg.Channel) { //handle it switch msg.Channel { - case transport.Subscribe: - //handle Subscribe resp + case transport.MetaSubscribe: + //handle MetaSubscribe resp if !msg.Successful { w.subsMu.Lock() subscription, ok := w.subs[msg.Subscription] @@ -82,16 +90,16 @@ func (w *Websocket) readWorker() error { delete(w.subs, msg.Channel) w.subsMu.Unlock() } - case transport.Unsubscribe: - //handle Unsubscribe resp - case transport.Connect: - //handle Connect resp + case transport.MetaUnsubscribe: + //handle MetaUnsubscribe resp + case transport.MetaConnect: + //handle MetaConnect resp - case transport.Disconnect: - //handle Disconnect resp + case transport.MetaDisconnect: + //handle MetaDisconnect resp - case transport.Handshake: - //handle Handshake resp + case transport.MetaHandshake: + //handle MetaHandshake resp } continue @@ -130,7 +138,7 @@ func (w *Websocket) Options() *transport.Options { func (w *Websocket) Handshake() (err error) { m := message.Message{ - Channel: transport.Handshake, + Channel: transport.MetaHandshake, Version: "1.0", //todo const SupportedConnectionTypes: []string{transportName}, } @@ -155,7 +163,7 @@ func (w *Websocket) Handshake() (err error) { func (w *Websocket) Connect() error { m := message.Message{ - Channel: transport.Connect, + Channel: transport.MetaConnect, ClientId: w.clientID, ConnectionType: transportName, Id: w.nextMsgID(), @@ -165,9 +173,22 @@ func (w *Websocket) Connect() error { return w.sendMessage(&m) } +func (w *Websocket) Disconnect() error { + m := message.Message{ + Channel: transport.MetaDisconnect, + ClientId: w.clientID, + Id: w.nextMsgID(), + } + + w.stopCh <- struct{}{} + close(w.stopCh) + + return w.sendMessage(&m) +} + func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error { m := &message.Message{ - Channel: transport.Subscribe, + Channel: transport.MetaSubscribe, ClientId: w.clientID, Subscription: subscription, Id: w.nextMsgID(), @@ -197,7 +218,7 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D func (w *Websocket) Unsubscribe(subscription string) error { //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe m := &message.Message{ - Channel: transport.Unsubscribe, + Channel: transport.MetaUnsubscribe, Subscription: subscription, ClientId: w.clientID, Id: w.nextMsgID(),