diff --git a/client.go b/client.go index 839a60e..a783a7c 100644 --- a/client.go +++ b/client.go @@ -31,7 +31,7 @@ type client interface { // Option set the Client options, such as Transport, message extensions,etc. type Option func(*options) -var _ client = (*Client)(nil) +//var _ client = (*Client)(nil) // Client represents a client connection to an faye server. type Client struct { @@ -58,14 +58,14 @@ func NewClient(url string, opts ...Option) (*Client, error) { } // Subscribe informs the server that messages published to that channel are delivered to itself. -func (c *Client) Subscribe(channel string) (*subscription.Subscription, error) { - return c.dispatcher.Subscribe(channel) +func (c *Client) Subscribe(channel string, authToken string) (*subscription.Subscription, error) { + return c.dispatcher.Subscribe(channel, authToken) } // Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event // if this feature is supported by the server use the OnPublishResponse to get the publish status. -func (c *Client) Publish(channel string, data message.Data) (err error) { - return c.dispatcher.Publish(channel, data) +func (c *Client) Publish(channel string, authToken string, data message.Data) (err error) { + return c.dispatcher.Publish(channel, authToken, data) } // Disconnect closes all subscriptions and inform the server to remove any client-related state. diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 0db734e..c90768d 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -10,6 +10,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" ) type Dispatcher struct { @@ -116,7 +117,7 @@ func (d *Dispatcher) dispatchMessage(msg *message.Message) { subsList := d.store.GetAll() for i := range subsList { sub := subsList[i] - d.Subscribe(sub.Name()) + d.Subscribe(sub.Name(), sub.AuthToken()) } case message.MetaSubscribe: //handle MetaSubscribe resp @@ -187,13 +188,14 @@ func (d *Dispatcher) sendMessage(m *message.Message) error { return d.transport.SendMessage(m) } -func (d *Dispatcher) Subscribe(channel string) (*subscription.Subscription, error) { +func (d *Dispatcher) Subscribe(channel string, authToken string) (*subscription.Subscription, error) { id := d.nextMsgID() m := &message.Message{ Channel: message.MetaSubscribe, ClientId: d.clientID, Subscription: channel, Id: id, + Ext: map[string]string{"access_token": authToken, "timestamp": string(time.Now().Unix())}, } d.extensions.ApplyOutExtensions(m) @@ -208,7 +210,7 @@ func (d *Dispatcher) Subscribe(channel string) (*subscription.Subscription, erro d.pendingSubs[id] = subscriptionConfirmation d.pendingSubsMu.Unlock() - sub, err := subscription.NewSubscription(channel, d.Unsubscribe, inMsgCh) + sub, err := subscription.NewSubscription(channel, d.Unsubscribe, authToken, inMsgCh) if err != nil { return nil, err } @@ -245,7 +247,7 @@ func (d *Dispatcher) Unsubscribe(sub *subscription.Subscription) error { return nil } -func (d *Dispatcher) Publish(subscription string, data message.Data) (err error) { +func (d *Dispatcher) Publish(subscription string, authToken string, data message.Data) (err error) { id := d.nextMsgID() m := &message.Message{ @@ -253,6 +255,7 @@ func (d *Dispatcher) Publish(subscription string, data message.Data) (err error) Data: data, ClientId: d.clientID, Id: id, + Ext: map[string]string{"access_token": authToken, "timestamp": string(time.Now().Unix())}, } //ack from server diff --git a/subscription/subscription.go b/subscription/subscription.go index a41052e..78cccba 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -11,35 +11,33 @@ var ErrInvalidChannelName = errors.New("invalid channel channel") type Unsubscriber func(subscription *Subscription) error type Subscription struct { - channel string - unsub Unsubscriber - msgCh chan *message.Message + channel string + authToken string + unsub Unsubscriber + msgCh chan *message.Message } // todo error -func NewSubscription(chanel string, unsub Unsubscriber, msgCh chan *message.Message) (*Subscription, error) { +func NewSubscription(chanel string, unsub Unsubscriber, authToken string, msgCh chan *message.Message) (*Subscription, error) { if !IsValidSubscriptionName(chanel) { return nil, ErrInvalidChannelName } return &Subscription{ - channel: chanel, - unsub: unsub, - msgCh: msgCh, + channel: chanel, + authToken: authToken, + unsub: unsub, + msgCh: msgCh, }, nil } func (s *Subscription) OnMessage(onMessage func(channel string, msg message.Data)) error { var inMsg *message.Message - go func() error { - for inMsg = range s.msgCh { - if inMsg.GetError() != nil { - return inMsg.GetError() - } - onMessage(inMsg.Channel, inMsg.Data) + for inMsg = range s.msgCh { + if inMsg.GetError() != nil { + return inMsg.GetError() } - return nil - }() - + onMessage(inMsg.Channel, inMsg.Data) + } return nil } @@ -56,6 +54,10 @@ func (s *Subscription) Unsubscribe() error { return s.unsub(s) } +func (s *Subscription) AuthToken() string { + return s.authToken +} + // validChannelName channel specifies is the channel is in the format /foo/432/bar var validChannelName = regexp.MustCompile(`^\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+(\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+)*$`)