implement missing client methods

stop gracefully on disconnect
This commit is contained in:
Marcelo Pires 2018-09-06 14:28:54 +02:00
parent ee24023387
commit e70f0f0b52
4 changed files with 93 additions and 38 deletions

View File

@ -18,8 +18,10 @@ var defaultOpts = options{
//https://faye.jcoglan.com/architecture.html //https://faye.jcoglan.com/architecture.html
type client interface { type client interface {
Subscribe(subscription string, onMsg func(data message.Data)) error Disconnect() error
Publish(subscription string, data message.Data) error Subscribe(subscription string, onMessage func(message message.Data)) error
Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) error
//todo unsubscribe,etc //todo unsubscribe,etc
} }
@ -58,11 +60,28 @@ func NewClient(url string, opts ...Option) (*Client, error) {
return &c, nil return &c, nil
} }
func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error {
return c.opts.transport.Subscribe(subscription, onMsg)
}
func (c *Client) Unsubscribe(subscription string) error {
return c.opts.transport.Unsubscribe(subscription)
}
func (c *Client) Publish(subscription string, data message.Data) error {
return c.opts.transport.Publish(subscription, data)
}
func (c *Client) Disconnect() error {
return c.opts.transport.Disconnect()
}
func WithOutExtension(extension message.Extension) Option { func WithOutExtension(extension message.Extension) Option {
return func(o *options) { return func(o *options) {
o.outExt = append(o.outExt, extension) o.outExt = append(o.outExt, extension)
} }
} }
func WithExtension(inExt message.Extension, outExt message.Extension) Option { func WithExtension(inExt message.Extension, outExt message.Extension) Option {
return func(o *options) { return func(o *options) {
o.inExt = append(o.inExt, inExt) o.inExt = append(o.inExt, inExt)
@ -81,11 +100,3 @@ func WithTransport(t transport.Transport) Option {
o.transport = t o.transport = t
} }
} }
func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error {
return c.opts.transport.Subscribe(subscription, onMsg)
}
func (c *Client) Publish(subscription string, data message.Data) error {
return c.opts.transport.Publish(subscription, data)
}

View File

@ -19,7 +19,7 @@ func NewGetStream(apiKey string, signature string) GetStream {
} }
func (gt GetStream) OutExtension(message *message.Message) { func (gt GetStream) OutExtension(message *message.Message) {
if message.Channel == string(transport.Subscribe) { if message.Channel == string(transport.MetaSubscribe) {
//get useriID //get useriID
gt.UserID = message.Subscription[1:] gt.UserID = message.Subscription[1:]
message.Ext = gt message.Ext = gt

View File

@ -1,11 +1,16 @@
package transport package transport
import "github.com/thesyncim/faye/message" import (
"github.com/thesyncim/faye/message"
"time"
)
// handshake, connect, disconnect, subscribe, unsubscribe and publish // handshake, connect, disconnect, subscribe, unsubscribe and publish
type Options struct { type Options struct {
Url string Url string
RetryInterval time.Duration
InExt []message.Extension InExt []message.Extension
OutExt []message.Extension OutExt []message.Extension
//todo dial timeout //todo dial timeout
@ -18,26 +23,44 @@ type Transport interface {
Options() *Options Options() *Options
Handshake() error Handshake() error
Connect() error Connect() error
Disconnect() error
Subscribe(subscription string, onMessage func(message message.Data)) error Subscribe(subscription string, onMessage func(message message.Data)) error
Unsubscribe(subscription string) error Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) error Publish(subscription string, message message.Data) error
} }
type Event = string type Meta = string
const ( const (
Subscribe Event = "/meta/subscribe" MetaSubscribe Meta = "/meta/subscribe"
Connect Event = "/meta/connect" MetaConnect Meta = "/meta/connect"
Unsubscribe Event = "/meta/unsubscribe" MetaDisconnect Meta = "/meta/disconnect"
Handshake Event = "/meta/handshake" MetaUnsubscribe Meta = "/meta/unsubscribe"
Disconnect Event = "/meta/disconnect" MetaHandshake Meta = "/meta/handshake"
) )
var ControlEvents = []Event{Subscribe, Connect, Unsubscribe, Handshake, Disconnect} type Reconnect = string
func IsControlMsg(channel string) bool { const (
for i := range ControlEvents { //ReconnectRetry indicates that a client MAY attempt to reconnect with a /meta/connect message,
if channel == ControlEvents[i] { //after the interval (as defined by interval advice field or client-default backoff), and with the same credentials.
ReconnectRetry Reconnect = "retry"
//ReconnectHandshake indicates that the server has terminated any prior connection status and the client MUST reconnect
// with a /meta/handshake message.
//A client MUST NOT automatically retry when a reconnect advice handshake has been received.
ReconnectHandshake Reconnect = "handshake"
//ReconnectNone indicates a hard failure for the connect attempt.
//A client MUST respect reconnect advice none and MUST NOT automatically retry or handshake.
ReconnectNone Reconnect = "none"
)
var MetaEvents = []Meta{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect}
func IsMetaEvent(channel string) bool {
for i := range MetaEvents {
if channel == MetaEvents[i] {
return true return true
} }
} }

View File

@ -25,6 +25,8 @@ type Websocket struct {
once sync.Once once sync.Once
advice atomic.Value //type message.Advise advice atomic.Value //type message.Advise
stopCh chan struct{}
subsMu sync.Mutex //todo sync.Map subsMu sync.Mutex //todo sync.Map
subs map[string]chan *message.Message subs map[string]chan *message.Message
} }
@ -39,6 +41,7 @@ func (w *Websocket) Init(options *transport.Options) error {
w.TransportOpts = options w.TransportOpts = options
w.msgID = &msgID w.msgID = &msgID
w.subs = map[string]chan *message.Message{} w.subs = map[string]chan *message.Message{}
w.stopCh = make(chan struct{})
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
if err != nil { if err != nil {
return err return err
@ -48,6 +51,11 @@ func (w *Websocket) Init(options *transport.Options) error {
func (w *Websocket) readWorker() error { func (w *Websocket) readWorker() error {
for { for {
select {
case <-w.stopCh:
return nil
default:
}
var payload []message.Message var payload []message.Message
err := w.conn.ReadJSON(&payload) err := w.conn.ReadJSON(&payload)
if err != nil { if err != nil {
@ -60,11 +68,11 @@ func (w *Websocket) readWorker() error {
w.handleAdvise(msg.Advice) w.handleAdvise(msg.Advice)
} }
if transport.IsControlMsg(msg.Channel) { if transport.IsMetaEvent(msg.Channel) {
//handle it //handle it
switch msg.Channel { switch msg.Channel {
case transport.Subscribe: case transport.MetaSubscribe:
//handle Subscribe resp //handle MetaSubscribe resp
if !msg.Successful { if !msg.Successful {
w.subsMu.Lock() w.subsMu.Lock()
subscription, ok := w.subs[msg.Subscription] subscription, ok := w.subs[msg.Subscription]
@ -82,16 +90,16 @@ func (w *Websocket) readWorker() error {
delete(w.subs, msg.Channel) delete(w.subs, msg.Channel)
w.subsMu.Unlock() w.subsMu.Unlock()
} }
case transport.Unsubscribe: case transport.MetaUnsubscribe:
//handle Unsubscribe resp //handle MetaUnsubscribe resp
case transport.Connect: case transport.MetaConnect:
//handle Connect resp //handle MetaConnect resp
case transport.Disconnect: case transport.MetaDisconnect:
//handle Disconnect resp //handle MetaDisconnect resp
case transport.Handshake: case transport.MetaHandshake:
//handle Handshake resp //handle MetaHandshake resp
} }
continue continue
@ -130,7 +138,7 @@ func (w *Websocket) Options() *transport.Options {
func (w *Websocket) Handshake() (err error) { func (w *Websocket) Handshake() (err error) {
m := message.Message{ m := message.Message{
Channel: transport.Handshake, Channel: transport.MetaHandshake,
Version: "1.0", //todo const Version: "1.0", //todo const
SupportedConnectionTypes: []string{transportName}, SupportedConnectionTypes: []string{transportName},
} }
@ -155,7 +163,7 @@ func (w *Websocket) Handshake() (err error) {
func (w *Websocket) Connect() error { func (w *Websocket) Connect() error {
m := message.Message{ m := message.Message{
Channel: transport.Connect, Channel: transport.MetaConnect,
ClientId: w.clientID, ClientId: w.clientID,
ConnectionType: transportName, ConnectionType: transportName,
Id: w.nextMsgID(), Id: w.nextMsgID(),
@ -165,9 +173,22 @@ func (w *Websocket) Connect() error {
return w.sendMessage(&m) return w.sendMessage(&m)
} }
func (w *Websocket) Disconnect() error {
m := message.Message{
Channel: transport.MetaDisconnect,
ClientId: w.clientID,
Id: w.nextMsgID(),
}
w.stopCh <- struct{}{}
close(w.stopCh)
return w.sendMessage(&m)
}
func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error { func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error {
m := &message.Message{ m := &message.Message{
Channel: transport.Subscribe, Channel: transport.MetaSubscribe,
ClientId: w.clientID, ClientId: w.clientID,
Subscription: subscription, Subscription: subscription,
Id: w.nextMsgID(), Id: w.nextMsgID(),
@ -197,7 +218,7 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D
func (w *Websocket) Unsubscribe(subscription string) error { func (w *Websocket) Unsubscribe(subscription string) error {
//https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe
m := &message.Message{ m := &message.Message{
Channel: transport.Unsubscribe, Channel: transport.MetaUnsubscribe,
Subscription: subscription, Subscription: subscription,
ClientId: w.clientID, ClientId: w.clientID,
Id: w.nextMsgID(), Id: w.nextMsgID(),