diff --git a/client.go b/client.go index 594682e..75457ea 100644 --- a/client.go +++ b/client.go @@ -8,9 +8,8 @@ import ( ) type options struct { - inExt []message.Extension - outExt []message.Extension transport transport.Transport + tOpts transport.Options } var defaultOpts = options{ @@ -43,13 +42,9 @@ func NewClient(url string, opts ...Option) (*Client, error) { for _, opt := range opts { opt(&c.opts) } - tops := &transport.Options{ - Url: url, - InExt: c.opts.inExt, - OutExt: c.opts.outExt, - } + var err error - if err = c.opts.transport.Init(tops); err != nil { + if err = c.opts.transport.Init(url, &c.opts.tOpts); err != nil { return nil, err } @@ -89,11 +84,11 @@ func (c *Client) Disconnect() error { return c.opts.transport.Disconnect() } -//WithOutExtension append the provided outgoing extension to the list of outgoing extensions. +//WithOutExtension append the provided outgoing extension to the the default transport options //extensions run in the order that they are provided func WithOutExtension(extension message.Extension) Option { return func(o *options) { - o.outExt = append(o.outExt, extension) + o.tOpts.Extensions.Out = append(o.tOpts.Extensions.Out, extension) } } @@ -101,8 +96,8 @@ func WithOutExtension(extension message.Extension) Option { //extensions run in the order that they are provided func WithExtension(inExt message.Extension, outExt message.Extension) Option { return func(o *options) { - o.inExt = append(o.inExt, inExt) - o.outExt = append(o.outExt, outExt) + o.tOpts.Extensions.In = append(o.tOpts.Extensions.In, inExt) + o.tOpts.Extensions.Out = append(o.tOpts.Extensions.Out, outExt) } } @@ -110,7 +105,7 @@ func WithExtension(inExt message.Extension, outExt message.Extension) Option { //extensions run in the order that they are provided func WithInExtension(extension message.Extension) Option { return func(o *options) { - o.inExt = append(o.inExt, extension) + o.tOpts.Extensions.In = append(o.tOpts.Extensions.In, extension) } } diff --git a/message/message.go b/message/message.go index 953e8c7..c5bb677 100644 --- a/message/message.go +++ b/message/message.go @@ -9,6 +9,23 @@ import ( type Extension func(message *Message) +type Extensions struct { + In []Extension + Out []Extension +} + +func (e *Extensions) ApplyOutExtensions(m *Message) { + for i := range e.Out { + e.Out[i](m) + } +} + +func (e *Extensions) ApplyInExtensions(m *Message) { + for i := range e.In { + e.In[i](m) + } +} + type Data = interface{} type Message struct { diff --git a/transport/transport.go b/transport/transport.go index c62bf8c..23b7914 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -8,11 +8,9 @@ import ( //Options represents the connection options to be used by a transport type Options struct { - Url string RetryInterval time.Duration - InExt []message.Extension - OutExt []message.Extension + Extensions message.Extensions //todo dial timeout //todo read/write deadline } @@ -22,7 +20,7 @@ type Transport interface { //Name returns the transport name Name() string //Init initializes the transport with the provided options - Init(options *Options) error + Init(endpoint string, options *Options) error //Options return the transport Options Options() *Options //Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 7597158..8fa33c5 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -21,7 +21,7 @@ func init() { //Websocket represents an websocket transport for the faye protocol type Websocket struct { - TransportOpts *transport.Options + topts *transport.Options connMu sync.Mutex conn *websocket.Conn @@ -45,18 +45,18 @@ type Websocket struct { var _ transport.Transport = (*Websocket)(nil) //Init initializes the transport with the provided options -func (w *Websocket) Init(options *transport.Options) error { +func (w *Websocket) Init(endpoint string, options *transport.Options) error { var ( err error msgID uint64 ) - w.TransportOpts = options + w.topts = options w.msgID = &msgID //w.subs = map[string]chan *message.Message{} w.subs2 = map[string][]*subscription.Subscription{} w.onPublishResponse = map[string]func(message *message.Message){} w.stopCh = make(chan error) - w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) + w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, nil) if err != nil { return err } @@ -77,7 +77,7 @@ func (w *Websocket) readWorker() error { } //dispatch msg := &payload[0] - w.applyInExtensions(msg) + w.topts.Extensions.ApplyInExtensions(msg) if msg.Advice != nil { w.handleAdvise(msg.Advice) @@ -180,7 +180,7 @@ func (w *Websocket) Name() string { func (w *Websocket) sendMessage(m *message.Message) error { w.connMu.Lock() defer w.connMu.Unlock() - w.applyOutExtensions(m) + w.topts.Extensions.ApplyOutExtensions(m) var payload []message.Message payload = append(payload, *m) @@ -203,7 +203,7 @@ func (w *Websocket) nextMsgID() string { //Options return the transport Options func (w *Websocket) Options() *transport.Options { - return w.TransportOpts + return w.topts } //Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. @@ -224,7 +224,7 @@ func (w *Websocket) Handshake() (err error) { } resp := &hsResps[0] - w.applyInExtensions(resp) + w.topts.Extensions.ApplyInExtensions(resp) if resp.GetError() != nil { return err } @@ -371,18 +371,6 @@ func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *m w.onPubResponseMu.Unlock() } -func (w *Websocket) applyOutExtensions(m *message.Message) { - for i := range w.TransportOpts.OutExt { - w.TransportOpts.OutExt[i](m) - } -} - -func (w *Websocket) applyInExtensions(m *message.Message) { - for i := range w.TransportOpts.InExt { - w.TransportOpts.InExt[i](m) - } -} - func (w *Websocket) handleAdvise(m *message.Advise) { //todo actually handle the advice w.advice.Store(m)