From 33a61d6d27d11c2847a8f7a8c6bf24a6db7e17f2 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Mon, 24 Sep 2018 16:13:57 +0200 Subject: [PATCH] add dispatcher simplify transport implementation support wildcard subscriptions --- client.go | 40 ++-- extensions/debug.go | 4 +- extensions/getstream.go | 9 +- internal/dispatcher/dispatcher.go | 269 +++++++++++++++++++++++ internal/store/name.go | 40 ++++ internal/store/subscription.go | 90 ++++++++ internal/store/subscription_test.go | 140 ++++++++++++ message/message.go | 55 ++++- subscription/subscription.go | 48 ++--- subscription/subscription_test.go | 22 +- test/client_test.go | 77 +++++-- transport/transport.go | 81 ++----- transport/websocket/websocket.go | 321 +++++----------------------- 13 files changed, 786 insertions(+), 410 deletions(-) create mode 100644 internal/dispatcher/dispatcher.go create mode 100644 internal/store/name.go create mode 100644 internal/store/subscription.go create mode 100644 internal/store/subscription_test.go diff --git a/client.go b/client.go index 75457ea..5659477 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package fayec import ( + "github.com/thesyncim/faye/internal/dispatcher" "github.com/thesyncim/faye/message" "github.com/thesyncim/faye/subscription" "github.com/thesyncim/faye/transport" @@ -8,8 +9,9 @@ import ( ) type options struct { - transport transport.Transport - tOpts transport.Options + transport transport.Transport + transportOpts transport.Options + extensions message.Extensions } var defaultOpts = options{ @@ -20,7 +22,6 @@ var defaultOpts = options{ type client interface { Disconnect() error Subscribe(subscription string) (*subscription.Subscription, error) - //Unsubscribe(subscription string) error Publish(subscription string, message message.Data) (string, error) OnPublishResponse(subscription string, onMsg func(message *message.Message)) } @@ -32,7 +33,8 @@ var _ client = (*Client)(nil) // Client represents a client connection to an faye server. type Client struct { - opts options + opts options + dispatcher *dispatcher.Dispatcher } //NewClient creates a new faye client with the provided options and connect to the specified url. @@ -43,16 +45,10 @@ func NewClient(url string, opts ...Option) (*Client, error) { opt(&c.opts) } - var err error - if err = c.opts.transport.Init(url, &c.opts.tOpts); err != nil { - return nil, err - } - - if err = c.opts.transport.Handshake(); err != nil { - return nil, err - } - - if err = c.opts.transport.Connect(); err != nil { + c.dispatcher = dispatcher.NewDispatcher(url, c.opts.transportOpts, c.opts.extensions) + c.dispatcher.SetTransport(c.opts.transport) + err := c.dispatcher.Connect() + if err != nil { return nil, err } @@ -61,13 +57,13 @@ func NewClient(url string, opts ...Option) (*Client, error) { //Subscribe informs the server that messages published to that channel are delivered to itself. func (c *Client) Subscribe(subscription string) (*subscription.Subscription, error) { - return c.opts.transport.Subscribe(subscription) + return c.dispatcher.Subscribe(subscription) } //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //if this feature is supported by the server use the OnPublishResponse to get the publish status. func (c *Client) Publish(subscription string, data message.Data) (id string, err error) { - return c.opts.transport.Publish(subscription, data) + return c.dispatcher.Publish(subscription, data) } //OnPublishResponse sets the handler to be triggered if the server replies to the publish request. @@ -75,20 +71,20 @@ func (c *Client) Publish(subscription string, data message.Data) (id string, err //ever be triggered. //can be used to identify the status of the published request and for example retry failed published requests. func (c *Client) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { - c.opts.transport.OnPublishResponse(subscription, onMsg) + c.dispatcher.OnPublishResponse(subscription, onMsg) } //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the client object will result in undefined behaviour. func (c *Client) Disconnect() error { - return c.opts.transport.Disconnect() + return c.dispatcher.Disconnect() } //WithOutExtension append the provided outgoing extension to the the default transport options //extensions run in the order that they are provided func WithOutExtension(extension message.Extension) Option { return func(o *options) { - o.tOpts.Extensions.Out = append(o.tOpts.Extensions.Out, extension) + o.extensions.Out = append(o.extensions.Out, extension) } } @@ -96,8 +92,8 @@ func WithOutExtension(extension message.Extension) Option { //extensions run in the order that they are provided func WithExtension(inExt message.Extension, outExt message.Extension) Option { return func(o *options) { - o.tOpts.Extensions.In = append(o.tOpts.Extensions.In, inExt) - o.tOpts.Extensions.Out = append(o.tOpts.Extensions.Out, outExt) + o.extensions.In = append(o.extensions.In, inExt) + o.extensions.Out = append(o.extensions.Out, outExt) } } @@ -105,7 +101,7 @@ func WithExtension(inExt message.Extension, outExt message.Extension) Option { //extensions run in the order that they are provided func WithInExtension(extension message.Extension) Option { return func(o *options) { - o.tOpts.Extensions.In = append(o.tOpts.Extensions.In, extension) + o.extensions.In = append(o.extensions.In, extension) } } diff --git a/extensions/debug.go b/extensions/debug.go index 5041eb1..7dde847 100644 --- a/extensions/debug.go +++ b/extensions/debug.go @@ -8,7 +8,7 @@ import ( ) func debugJson(v interface{}) string { - b, _ := json.MarshalIndent(v, "", " ") + b, _ := json.MarshalIndent(v, "", " ") return string(b) } @@ -20,7 +20,7 @@ type DebugExtension struct { func NewDebugExtension(out io.Writer) *DebugExtension { li := log.New(out, "InMsg", 0) - lo := log.New(out, "outMsg", 0) + lo := log.New(out, "OutMsg", 0) return &DebugExtension{in: li, out: lo} } diff --git a/extensions/getstream.go b/extensions/getstream.go index a729664..84f435e 100644 --- a/extensions/getstream.go +++ b/extensions/getstream.go @@ -2,7 +2,6 @@ package extensions import ( "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/transport" ) type GetStream struct { @@ -18,10 +17,10 @@ func NewGetStream(apiKey string, signature string) GetStream { } } -func (gt GetStream) OutExtension(message *message.Message) { - if message.Channel == string(transport.MetaSubscribe) { +func (gt GetStream) OutExtension(msg *message.Message) { + if msg.Channel == string(message.MetaSubscribe) { //get useriID - gt.UserID = message.Subscription[1:] - message.Ext = gt + gt.UserID = msg.Subscription[1:] + msg.Ext = gt } } diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go new file mode 100644 index 0000000..d4f53d0 --- /dev/null +++ b/internal/dispatcher/dispatcher.go @@ -0,0 +1,269 @@ +package dispatcher + +import ( + "fmt" + "github.com/thesyncim/faye/internal/store" + "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/subscription" + "github.com/thesyncim/faye/transport" + "log" + "strconv" + "sync" + "sync/atomic" +) + +type dispatcher interface { + SetTransport(t transport.Transport) + //Subscribe informs the server that messages published to that channel are delivered to itself. + Subscribe(channel string) (*subscription.Subscription, error) + //Unsubscribe informs the server that the client will no longer listen to incoming event messages on + //the specified channel/subscription + Unsubscribe(sub *subscription.Subscription) error + //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event + //if this feature is supported by the server use the OnPublishResponse to get the publish status. + Publish(subscription string, message message.Data) (id string, err error) + //OnPublishResponse sets the handler to be triggered if the server replies to the publish request + //according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will + //ever be triggered + //can be used to identify the status of the published request and for example retry failed published requests + OnPublishResponse(subscription string, onMsg func(message *message.Message)) +} + +var _ dispatcher = (*Dispatcher)(nil) + +type Dispatcher struct { + endpoint string + //transports map[string]transport.Transport + transport transport.Transport + transportOpts transport.Options + + msgID *uint64 + + extensions message.Extensions + + //map requestID + pendingSubs map[string]chan error //todo wrap in structure + pendingSubsMu sync.Mutex + store *store.SubscriptionsStore + + onPublishResponseMu sync.Mutex //todo sync.Map + onPublishResponse map[string]func(message *message.Message) + + clientID string +} + +func NewDispatcher(endpoint string, tOpts transport.Options, ext message.Extensions) *Dispatcher { + var msgID uint64 + return &Dispatcher{ + endpoint: endpoint, + msgID: &msgID, + store: store.NewStore(100), + transportOpts: tOpts, + extensions: ext, + onPublishResponse: map[string]func(message *message.Message){}, + pendingSubs: map[string]chan error{}, + } +} + +//todo allow multiple transports +func (d *Dispatcher) Connect() error { + var err error + if err = d.transport.Init(d.endpoint, &d.transportOpts); err != nil { + return err + } + + if err = d.metaHandshake(); err != nil { + return err + } + return d.metaConnect() +} + +func (d *Dispatcher) metaHandshake() error { + m := &message.Message{ + Channel: message.MetaHandshake, + Version: "1.0", //todo const + SupportedConnectionTypes: []string{d.transport.Name()}, //todo list all tranports + } + d.extensions.ApplyOutExtensions(m) + handshakeResp, err := d.transport.Handshake(m) + if err != nil { + return err + } + d.extensions.ApplyInExtensions(handshakeResp) + if handshakeResp.GetError() != nil { + return err + } + d.clientID = handshakeResp.ClientId + return nil +} + +func (d *Dispatcher) metaConnect() error { + m := &message.Message{ + Channel: message.MetaConnect, + ClientId: d.clientID, + ConnectionType: d.transport.Name(), + Id: d.nextMsgID(), + } + return d.transport.Connect(m) +} + +func (d *Dispatcher) Disconnect() error { + m := &message.Message{ + Channel: message.MetaDisconnect, + ClientId: d.clientID, + Id: d.nextMsgID(), + } + return d.transport.Disconnect(m) +} + +func (d *Dispatcher) dispatchMessage(msg *message.Message) { + d.extensions.ApplyInExtensions(msg) + + if message.IsMetaMessage(msg) { + //handle it + switch msg.Channel { + case message.MetaSubscribe: + //handle MetaSubscribe resp + d.pendingSubsMu.Lock() + confirmCh, ok := d.pendingSubs[msg.Id] + d.pendingSubsMu.Unlock() + if !ok { + panic("BUG: subscription not registered `" + msg.Subscription + "`") + } + + if !msg.Successful { + if msg.GetError() == nil { + //inject the error if the server returns unsuccessful without error + msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) + } + confirmCh <- msg.GetError() + //v2 + } else { + confirmCh <- nil + } + return + } + } + //is Event Message + //there are 2 types of Event Message + // 1. Publish + // 2. Delivery + if message.IsEventDelivery(msg) { + subscriptions := d.store.Match(msg.Channel) + //send to all listeners + log.Println(subscriptions, msg) + for i := range subscriptions { + if subscriptions[i].MsgChannel() != nil { + select { + case subscriptions[i].MsgChannel() <- msg: + default: + log.Println("subscription has no listeners") //todo remove + } + } + } + return + } + + if message.IsEventPublish(msg) { + d.onPublishResponseMu.Lock() + onPublish, ok := d.onPublishResponse[msg.Channel] + d.onPublishResponseMu.Unlock() + if ok { + onPublish(msg) + } + } + +} + +func (d *Dispatcher) SetTransport(t transport.Transport) { + t.SetOnMessageReceivedHandler(d.dispatchMessage) + d.transport = t +} + +func (d *Dispatcher) nextMsgID() string { + return strconv.Itoa(int(atomic.AddUint64(d.msgID, 1))) +} + +//sendMessage send applies the out extensions and sends a message throught the transport +func (d *Dispatcher) sendMessage(m *message.Message) error { + d.extensions.ApplyOutExtensions(m) + return d.transport.SendMessage(m) +} + +func (d *Dispatcher) Subscribe(channel string) (*subscription.Subscription, error) { + id := d.nextMsgID() + m := &message.Message{ + Channel: message.MetaSubscribe, + ClientId: d.clientID, + Subscription: channel, + Id: id, + } + + if err := d.transport.SendMessage(m); err != nil { + return nil, err + } + + inMsgCh := make(chan *message.Message, 0) + subscriptionConfirmation := make(chan error) + + d.pendingSubsMu.Lock() + d.pendingSubs[id] = subscriptionConfirmation + d.pendingSubsMu.Unlock() + + sub, err := subscription.NewSubscription(channel, d.Unsubscribe, inMsgCh) + if err != nil { + return nil, err + } + + //todo timeout here + err = <-subscriptionConfirmation + if err != nil { + //log.Println(err) + return nil, err + } + d.store.Add(sub) + return sub, nil + +} + +func (d *Dispatcher) Unsubscribe(sub *subscription.Subscription) error { + //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe + d.store.Remove(sub) + //if this is last subscription we will send meta unsubscribe to the server + if d.store.Count(sub.Name()) == 0 { + d.onPublishResponseMu.Lock() + delete(d.onPublishResponse, sub.Name()) + d.onPublishResponseMu.Unlock() + + m := &message.Message{ + Channel: message.MetaUnsubscribe, + Subscription: sub.Name(), + ClientId: d.clientID, + Id: d.nextMsgID(), + } + return d.transport.SendMessage(m) + } + + return nil +} + +func (d *Dispatcher) Publish(subscription string, data message.Data) (id string, err error) { + id = d.nextMsgID() + + m := &message.Message{ + Channel: subscription, + Data: data, + ClientId: d.clientID, + Id: id, + } + if err = d.sendMessage(m); err != nil { + return "", err + } + return id, nil +} + +func (d *Dispatcher) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { + d.onPublishResponseMu.Lock() + d.onPublishResponse[subscription] = onMsg + d.onPublishResponseMu.Unlock() +} diff --git a/internal/store/name.go b/internal/store/name.go new file mode 100644 index 0000000..b7849f6 --- /dev/null +++ b/internal/store/name.go @@ -0,0 +1,40 @@ +package store + +import ( + "strings" +) + +type Name struct { + n string + patterns []string +} + +func NewName(name string) *Name { + var n Name + n.n = name + //expand once + n.patterns = n.expand() + return &n +} + +func (n *Name) Match(channel string) bool { + for i := range n.patterns { + if n.patterns[i] == channel { + return true + } + } + return false +} + +func (n *Name) expand() []string { + segments := strings.Split(n.n, "/") + num_segments := len(segments) + patterns := make([]string, num_segments+1) + patterns[0] = "/**" + for i := 1; i < len(segments); i = i + 2 { + patterns[i] = strings.Join(segments[:i+1], "/") + "/**" + } + patterns[len(patterns)-2] = strings.Join(segments[:num_segments-1], "/") + "/*" + patterns[len(patterns)-1] = n.n + return patterns +} diff --git a/internal/store/subscription.go b/internal/store/subscription.go new file mode 100644 index 0000000..3758851 --- /dev/null +++ b/internal/store/subscription.go @@ -0,0 +1,90 @@ +package store + +import ( + "github.com/thesyncim/faye/subscription" + "sync" +) + +type SubscriptionsStore struct { + mutex sync.Mutex + subs map[string][]*subscription.Subscription + + //cache for expanded channel names + cache map[string]*Name +} + +func NewStore(size int) *SubscriptionsStore { + return &SubscriptionsStore{ + subs: make(map[string][]*subscription.Subscription, size), + cache: map[string]*Name{}, + } +} + +func (s *SubscriptionsStore) Add(sub *subscription.Subscription) { + s.mutex.Lock() + s.subs[sub.Name()] = append(s.subs[sub.Name()], sub) + s.mutex.Unlock() +} + +//Match returns the subscriptions that match with the specified channel name +//Wildcard subscriptions are matched +func (s *SubscriptionsStore) Match(channel string) []*subscription.Subscription { + var ( + matches []*subscription.Subscription + name *Name + ok bool + ) + s.mutex.Lock() + if name, ok = s.cache[channel]; !ok { + name = NewName(channel) + s.cache[channel] = name + } + for _, subs := range s.subs { + for i := range subs { + if name.Match(subs[i].Name()) { + matches = append(matches, subs[i]) + } + } + } + s.mutex.Unlock() + return matches +} + +func (s *SubscriptionsStore) Remove(sub *subscription.Subscription) { + close(sub.MsgChannel()) + s.mutex.Lock() + for channel, subs := range s.subs { + for i := range subs { + if subs[i] == sub { + //delete the subscription + subs = subs[:i+copy(subs[i:], subs[i+1:])] + if len(subs) == 0 { + delete(s.subs, channel) + } + goto end + } + } + } + +end: + s.mutex.Unlock() +} + +//RemoveAll removel all subscriptions and close all channels +func (s *SubscriptionsStore) RemoveAll() { + s.mutex.Lock() + for i := range s.subs { + //close all listeners + for j := range s.subs[i] { + s.subs[i][j].Unsubscribe() + close(s.subs[i][j].MsgChannel()) + } + delete(s.subs, i) + } + s.mutex.Unlock() +} + +//Count return the number of subscriptions associated with the specified channel +func (s *SubscriptionsStore) Count(channel string) int { + return len(s.Match(channel)) +} diff --git a/internal/store/subscription_test.go b/internal/store/subscription_test.go new file mode 100644 index 0000000..8a5ab77 --- /dev/null +++ b/internal/store/subscription_test.go @@ -0,0 +1,140 @@ +package store + +import ( + "github.com/thesyncim/faye/subscription" + "reflect" + "testing" +) + +var ( + wildcardSubscription, _ = subscription.NewSubscription("a", "/wildcard/*", nil, nil, nil) + simpleSubscription, _ = subscription.NewSubscription("b", "/foo/bar", nil, nil, nil) +) + +func TestStore_Add(t *testing.T) { + type args struct { + name string + subs []*subscription.Subscription + } + + tests := []struct { + name string + s *SubscriptionsStore + args args + expected *SubscriptionsStore + }{ + + { + name: "add one", + s: NewStore(10), + args: args{ + name: "/wildcard/*", + subs: []*subscription.Subscription{wildcardSubscription}, + }, + expected: &SubscriptionsStore{ + subs: map[string][]*subscription.Subscription{ + "/wildcard/*": {wildcardSubscription}, + }, + }, + }, + { + name: "add three", + s: NewStore(10), + args: args{ + name: "/wildcard/*", + subs: []*subscription.Subscription{wildcardSubscription, wildcardSubscription, wildcardSubscription}, + }, + expected: &SubscriptionsStore{ + subs: map[string][]*subscription.Subscription{ + "/wildcard/*": {wildcardSubscription, wildcardSubscription, wildcardSubscription}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + for i := range tt.args.subs { + tt.s.Add(tt.args.subs[i]) + } + + if !reflect.DeepEqual(tt.expected, tt.s) { + t.Fatalf("expecting :%v got: %v", tt.expected, tt.s) + } + }) + } +} + +func TestStore_Match(t *testing.T) { + type args struct { + name string + } + simpleStore := NewStore(0) + simpleStore.Add(simpleSubscription) + wildcardStore := NewStore(0) + wildcardStore.Add(wildcardSubscription) + tests := []struct { + name string + s *SubscriptionsStore + args args + want []*subscription.Subscription + }{ + { + name: "match simple", + s: simpleStore, + want: []*subscription.Subscription{simpleSubscription}, + args: args{ + name: "/foo/bar", + }, + }, + { + name: "match wildcard 1", + s: wildcardStore, + want: []*subscription.Subscription{wildcardSubscription}, + args: args{ + name: "/wildcard/a", + }, + }, + { + name: "match wildcard 2", + s: wildcardStore, + want: []*subscription.Subscription{wildcardSubscription}, + args: args{ + name: "/wildcard/ccc", + }, + }, + { + name: "match non existent", + s: wildcardStore, + want: nil, + args: args{ + name: "/wildcardsadasd", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.s.Match(tt.args.name); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SubscriptionsStore.Match() = %#v, want %#v", got, tt.want) + } + }) + } +} + +func TestStore_Remove(t *testing.T) { + type args struct { + sub *subscription.Subscription + } + tests := []struct { + name string + s *SubscriptionsStore + args args + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.s.Remove(tt.args.sub) + }) + } +} diff --git a/message/message.go b/message/message.go index c5bb677..94c62ab 100644 --- a/message/message.go +++ b/message/message.go @@ -86,7 +86,9 @@ func (a *Advise) MarshalJSON() ([]byte, error) { MultipleClients bool `json:"multiple-clients,omitempty"` Hosts []string `json:"hosts,omitempty"` } + var builder bytes.Buffer + err := json.NewEncoder(&builder).Encode(jsonStruct{ Reconnect: string(a.Reconnect), Interval: int64(a.Interval / time.Millisecond), @@ -96,7 +98,6 @@ func (a *Advise) MarshalJSON() ([]byte, error) { }) return builder.Bytes(), err - } func (a *Advise) UnmarshalJSON(b []byte) error { @@ -105,18 +106,22 @@ func (a *Advise) UnmarshalJSON(b []byte) error { if err != nil { return err } + reconnect, ok := raw["reconnect"] if ok { a.Reconnect = Reconnect(reconnect.(string)) } + interval, ok := raw["interval"] if ok { a.Interval = time.Duration(interval.(float64)) * time.Millisecond } + timeout, ok := raw["timeout"] if ok { a.Timeout = time.Duration(timeout.(float64)) * time.Millisecond } + mc, ok := raw["multiple-clients"] if ok { a.MultipleClients = mc.(bool) @@ -129,3 +134,51 @@ func (a *Advise) UnmarshalJSON(b []byte) error { return nil } +func IsEventDelivery(msg *Message) bool { + if IsMetaMessage(msg) { + return false + } + if msg.Data != nil { + return true + } + return false +} + +//MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself. +type MetaMessage = string + +const ( + MetaSubscribe MetaMessage = "/meta/subscribe" + MetaConnect MetaMessage = "/meta/connect" + MetaDisconnect MetaMessage = "/meta/disconnect" + MetaUnsubscribe MetaMessage = "/meta/unsubscribe" + MetaHandshake MetaMessage = "/meta/handshake" +) + +//EventMessage are published in event messages sent from a faye client to a faye server +//and are delivered in event messages sent from a faye server to a faye client. +type EventMessage = int + +const ( + // + EventPublish EventMessage = iota + EventDelivery +) + +var metaMessages = []MetaMessage{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} + +func IsMetaMessage(msg *Message) bool { + for i := range metaMessages { + if msg.Channel == metaMessages[i] { + return true + } + } + return false +} + +func IsEventPublish(msg *Message) bool { + if IsMetaMessage(msg) { + return false + } + return !IsEventDelivery(msg) +} diff --git a/subscription/subscription.go b/subscription/subscription.go index 994271e..0a4f4ff 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -1,73 +1,67 @@ package subscription import ( + "errors" "github.com/thesyncim/faye/message" "regexp" ) +var ErrInvalidChannelName = errors.New("invalid channel channel") + type Unsubscriber func(subscription *Subscription) error -type Publisher func(msg message.Data) (string, error) - type Subscription struct { - id string //request Subscription ID channel string - ok chan error //used by unsub Unsubscriber - pub Publisher msgCh chan *message.Message } -func NewSubscription(id string, chanel string, unsub Unsubscriber, pub Publisher, msgCh chan *message.Message, ok chan error) *Subscription { +//todo error +func NewSubscription(chanel string, unsub Unsubscriber, msgCh chan *message.Message) (*Subscription, error) { + if !IsValidSubscriptionName(chanel) { + return nil, ErrInvalidChannelName + } return &Subscription{ - pub: pub, - ok: ok, - id: id, channel: chanel, unsub: unsub, msgCh: msgCh, - } + }, nil } -func (s *Subscription) OnMessage(onMessage func(msg message.Data)) error { +func (s *Subscription) OnMessage(onMessage func(channel string, msg message.Data)) error { var inMsg *message.Message for inMsg = range s.msgCh { if inMsg.GetError() != nil { return inMsg.GetError() } - onMessage(inMsg.Data) + onMessage(inMsg.Channel, inMsg.Data) } return nil } -func (s *Subscription) ID() string { - return s.id -} - func (s *Subscription) MsgChannel() chan *message.Message { return s.msgCh } -func (s *Subscription) Channel() string { +func (s *Subscription) Name() string { return s.channel } -//todo remove -func (s *Subscription) SubscriptionResult() chan error { - return s.ok -} - +//Unsubscribe ... func (s *Subscription) Unsubscribe() error { return s.unsub(s) } -func (s *Subscription) Publish(msg message.Data) (string, error) { - return s.pub(msg) -} - +//validChannelName channel specifies is the channel is in the format /foo/432/bar var validChannelName = regexp.MustCompile(`^\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+(\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+)*$`) + var validChannelPattern = regexp.MustCompile(`^(\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+)*\/\*{1,2}$`) -func IsValidChannel(channel string) bool { +func IsValidSubscriptionName(channel string) bool { return validChannelName.MatchString(channel) || validChannelPattern.MatchString(channel) } + +//isValidPublishName +func IsValidPublishName(channel string) bool { + return validChannelName.MatchString(channel) +} diff --git a/subscription/subscription_test.go b/subscription/subscription_test.go index ed5359e..34c3837 100644 --- a/subscription/subscription_test.go +++ b/subscription/subscription_test.go @@ -1,20 +1,23 @@ package subscription -import "testing" +import ( + "testing" +) /* assertEqual( ["/**", "/foo", "/*"], - Channel.expand("/foo") ) + channel.expand("/foo") ) assertEqual( ["/**", "/foo/bar", "/foo/*", "/foo/**"], - Channel.expand("/foo/bar") ) + channel.expand("/foo/bar") ) assertEqual( ["/**", "/foo/bar/qux", "/foo/bar/*", "/foo/**", "/foo/bar/**"], */ -func TestIsValidChannel(t *testing.T) { +func TestIsValidSubscriptionName(t *testing.T) { type args struct { channel string } + tests := []struct { name string args args @@ -62,11 +65,18 @@ func TestIsValidChannel(t *testing.T) { }, want: false, }, + { + name: "asterisk before slash", + args: args{ + channel: "/foo*", + }, + want: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := IsValidChannel(tt.args.channel); got != tt.want { - t.Errorf("IsValidChannel() = %v, want %v", got, tt.want) + if got := IsValidSubscriptionName(tt.args.channel); got != tt.want { + t.Errorf("isValidChannelName() = %v, want %v", got, tt.want) } }) } diff --git a/test/client_test.go b/test/client_test.go index b358514..b68726a 100644 --- a/test/client_test.go +++ b/test/client_test.go @@ -11,6 +11,7 @@ import ( "log" "os" "os/exec" + "reflect" "runtime" "sync" "testing" @@ -21,11 +22,15 @@ var once sync.Once var unauthorizedErr = errors.New("500::unauthorized channel") +func init() { + setup(nil) +} + func setup(t *testing.T) context.CancelFunc { //jump to test dir - ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) cmd := exec.CommandContext(ctx, "npm", "start") cmd.Stdout = os.Stdout @@ -33,7 +38,7 @@ func setup(t *testing.T) context.CancelFunc { err := cmd.Start() if err != nil { - t.Fatal(err) + log.Fatal(err) } var mcancel context.CancelFunc = func() { @@ -48,9 +53,6 @@ func setup(t *testing.T) context.CancelFunc { } func TestServerSubscribeAndPublish10Messages(t *testing.T) { - shutdown := setup(t) - - defer shutdown() debug := extensions.NewDebugExtension(os.Stdout) @@ -62,10 +64,11 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { var delivered int var done sync.WaitGroup done.Add(10) - client.OnPublishResponse("/test", func(message *message.Message) { - if !message.Successful { - t.Fatalf("failed to send message with id %s", message.Id) + client.OnPublishResponse("/test", func(msg *message.Message) { + if !msg.Successful { + t.Fatalf("failed to send msg with id %s", msg.Id) } + delivered++ done.Done() }) @@ -75,7 +78,7 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { if err != nil { t.Fatal(err) } - err = sub.OnMessage(func(data message.Data) { + err = sub.OnMessage(func(channel string, data message.Data) { if data != "hello world" { t.Fatalf("expecting: `hello world` got : %s", data) } @@ -112,9 +115,6 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { } func TestSubscribeUnauthorizedChannel(t *testing.T) { - shutdown := setup(t) - - defer shutdown() debug := extensions.NewDebugExtension(os.Stdout) @@ -134,3 +134,56 @@ func TestSubscribeUnauthorizedChannel(t *testing.T) { t.Fatal("expecting error") } + +func TestWildcardSubscription(t *testing.T) { + + debug := extensions.NewDebugExtension(os.Stdout) + + client, err := NewClient("ws://localhost:8000/faye", WithExtension(debug.InExtension, debug.OutExtension)) + if err != nil { + t.Fatal(err) + } + + var done sync.WaitGroup + done.Add(20) + + sub, err := client.Subscribe("/wildcard/**") + if err != nil { + t.Fatal(err) + } + + var recv = map[string]int{} + + go func() { + err := sub.OnMessage(func(channel string, msg message.Data) { + done.Done() + recv[channel]++ + }) + if err != nil { + t.Fatal(err) + } + }() + + //give some time for setup + time.Sleep(time.Second) + + for _, channel := range []string{"/wildcard/foo", "/wildcard/bar"} { + for i := 0; i < 10; i++ { + _, err := client.Publish(channel, "hello world") + if err != nil { + t.Fatal(err) + } + } + } + + done.Wait() + + expected := map[string]int{ + "/wildcard/foo": 10, + "/wildcard/bar": 10} + + if !reflect.DeepEqual(recv, expected) { + t.Fatal(recv) + } + +} diff --git a/transport/transport.go b/transport/transport.go index f474b2b..27ccf3d 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -2,16 +2,14 @@ package transport import ( "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/subscription" "net/http" "time" ) //Options represents the connection options to be used by a transport type Options struct { - Extensions message.Extensions - Headers http.Header + Cookies http.CookieJar RetryInterval time.Duration DialDeadline time.Duration @@ -21,82 +19,33 @@ type Options struct { //Transport represents the transport to be used to comunicate with the faye server type Transport interface { - //Name returns the transport name + //name returns the transport name Name() string //Init initializes the transport with the provided options Init(endpoint string, options *Options) error //Options return the transport Options Options() *Options //Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. - Handshake() error - //Connect is called after a client has discovered the server’s capabilities with a handshake exchange, + Handshake(msg *message.Message) (*message.Message, error) + //Init is called after a client has discovered the server’s capabilities with a handshake exchange, //a connection is established by sending a message to the /meta/connect channel - Connect() error + Connect(msg *message.Message) error //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the transport object will result in undefined behaviour. - Disconnect() error - //Subscribe informs the server that messages published to that channel are delivered to itself. - Subscribe(channel string) (*subscription.Subscription, error) - //Unsubscribe informs the server that the client will no longer listen to incoming event messages on - //the specified channel/subscription - Unsubscribe(sub *subscription.Subscription) error - //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event - //if this feature is supported by the server use the OnPublishResponse to get the publish status. - Publish(subscription string, message message.Data) (id string, err error) - //OnPublishResponse sets the handler to be triggered if the server replies to the publish request - //according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will - //ever be triggered - //can be used to identify the status of the published request and for example retry failed published requests - OnPublishResponse(subscription string, onMsg func(message *message.Message)) -} + Disconnect(msg *message.Message) error + //SendMessage sens a message through the transport + SendMessage(msg *message.Message) error -//MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself. -type MetaMessage = string + SetOnMessageReceivedHandler(onMsg func(msg *message.Message)) -const ( - MetaSubscribe MetaMessage = "/meta/subscribe" - MetaConnect MetaMessage = "/meta/connect" - MetaDisconnect MetaMessage = "/meta/disconnect" - MetaUnsubscribe MetaMessage = "/meta/unsubscribe" - MetaHandshake MetaMessage = "/meta/handshake" -) + //SetOnTransportUpHandler is called when the transport is connected + SetOnTransportUpHandler(callback func()) -//EventMessage are published in event messages sent from a faye client to a faye server -//and are delivered in event messages sent from a faye server to a faye client. -type EventMessage = int + //SetOnTransportDownHandler is called when the transport goes down + SetOnTransportDownHandler(callback func(error)) -const ( - // - EventPublish EventMessage = iota - EventDelivery -) - -var metaMessages = []MetaMessage{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} - -func IsMetaMessage(msg *message.Message) bool { - for i := range metaMessages { - if msg.Channel == metaMessages[i] { - return true - } - } - return false -} - -func IsEventDelivery(msg *message.Message) bool { - if IsMetaMessage(msg) { - return false - } - if msg.Data != nil { - return true - } - return false -} - -func IsEventPublish(msg *message.Message) bool { - if IsMetaMessage(msg) { - return false - } - return !IsEventDelivery(msg) + //handled by dispatcher + SetOnErrorHandler(onError func(err error)) } var registeredTransports = map[string]Transport{} diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 9c71647..090499c 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -1,14 +1,10 @@ package websocket import ( - "fmt" "github.com/gorilla/websocket" "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/subscription" "github.com/thesyncim/faye/transport" "log" - - "strconv" "sync" "sync/atomic" ) @@ -23,20 +19,17 @@ func init() { type Websocket struct { topts *transport.Options - connMu sync.Mutex - conn *websocket.Conn - clientID string - msgID *uint64 - once sync.Once - advice atomic.Value //type message.Advise + connMu sync.Mutex + conn *websocket.Conn + + advice atomic.Value //type message.Advise //todo move to dispatcher stopCh chan error //todo replace wth context - subscriptionsMu sync.Mutex //todo thread safe radix tree - subscriptions map[string][]*subscription.Subscription - - onPubResponseMu sync.Mutex //todo sync.Map - onPublishResponse map[string]func(message *message.Message) + onMsg func(msg *message.Message) + onError func(err error) + onTransportDown func(err error) + onTransportUp func() } var _ transport.Transport = (*Websocket)(nil) @@ -44,22 +37,30 @@ var _ transport.Transport = (*Websocket)(nil) //Init initializes the transport with the provided options func (w *Websocket) Init(endpoint string, options *transport.Options) error { var ( - err error - msgID uint64 + err error ) w.topts = options - w.msgID = &msgID - //w.subs = map[string]chan *message.Message{} - w.subscriptions = map[string][]*subscription.Subscription{} - w.onPublishResponse = map[string]func(message *message.Message){} + w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers) if err != nil { return err } + + w.conn.SetPingHandler(func(appData string) error { + return w.conn.WriteJSON(make([]struct{}, 0)) + }) + if err != nil { + return err + } return nil } +//Init initializes the transport with the provided options +func (w *Websocket) SetOnErrorHandler(onError func(err error)) { + w.onError = onError +} + func (w *Websocket) readWorker() error { for { select { @@ -70,118 +71,27 @@ func (w *Websocket) readWorker() error { var payload []message.Message err := w.conn.ReadJSON(&payload) if err != nil { + return err } //dispatch msg := &payload[0] - w.topts.Extensions.ApplyInExtensions(msg) - - if msg.Advice != nil { - w.handleAdvise(msg.Advice) - } - - if transport.IsMetaMessage(msg) { - //handle it - switch msg.Channel { - case transport.MetaSubscribe: - //handle MetaSubscribe resp - w.subscriptionsMu.Lock() - subscriptions, ok := w.subscriptions[msg.Subscription] - if !ok { - panic("BUG: subscription not registered `" + msg.Subscription + "`") - } - if !msg.Successful { - if msg.GetError() == nil { - //inject the error if the server returns unsuccessful without error - msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) - } - var si = -1 - for i := range subscriptions { - if subscriptions[i].ID() == msg.Id { - si = i - select { - case subscriptions[i].SubscriptionResult() <- msg.GetError(): - close(subscriptions[i].MsgChannel()) - default: - log.Println("subscription has no listeners") //todo remove*/ - } - } - } - //remove subscription - if si > -1 { - subscriptions = subscriptions[:si+copy(subscriptions[si:], subscriptions[si+1:])] - } - - w.subscriptions[msg.Subscription] = subscriptions - //v2 - } else { - for i := range subscriptions { - if subscriptions[i].ID() == msg.Id { - select { - case subscriptions[i].SubscriptionResult() <- nil: - default: - log.Println("subscription has no listeners") //todo remove*/ - } - } - } - } - w.subscriptionsMu.Unlock() - - } - - continue - } - //is Event Message - //there are 2 types of Event Message - // 1. Publish - // 2. Delivery - if transport.IsEventDelivery(msg) { - w.subscriptionsMu.Lock() - subscriptions, ok := w.subscriptions[msg.Channel] - - if ok { - //send to all listeners - for i := range subscriptions { - if subscriptions[i].MsgChannel() != nil { - select { - case subscriptions[i].MsgChannel() <- msg: - default: - log.Println("subscription has no listeners") //todo remove - - } - } - } - } - w.subscriptionsMu.Unlock() - - continue - } - - if transport.IsEventPublish(msg) { - w.onPubResponseMu.Lock() - onPublish, ok := w.onPublishResponse[msg.Channel] - w.onPubResponseMu.Unlock() - if ok { - onPublish(msg) - } - } - + w.onMsg(msg) } } -//Name returns the transport name (websocket) +//name returns the transport name (websocket) func (w *Websocket) Name() string { return transportName } -func (w *Websocket) sendMessage(m *message.Message) error { +func (w *Websocket) SendMessage(m *message.Message) error { w.connMu.Lock() defer w.connMu.Unlock() - w.topts.Extensions.ApplyOutExtensions(m) var payload []message.Message payload = append(payload, *m) -again: +again: //todo move this to scheduler err := w.conn.WriteJSON(payload) if websocket.IsUnexpectedCloseError(err) { advise := w.advice.Load().(*message.Advise) @@ -194,9 +104,6 @@ again: } return nil } -func (w *Websocket) nextMsgID() string { - return strconv.Itoa(int(atomic.AddUint64(w.msgID, 1))) -} //Options return the transport Options func (w *Websocket) Options() *transport.Options { @@ -204,171 +111,47 @@ func (w *Websocket) Options() *transport.Options { } //Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. -func (w *Websocket) Handshake() (err error) { - m := message.Message{ - Channel: transport.MetaHandshake, - Version: "1.0", //todo const - SupportedConnectionTypes: []string{transportName}, - } - err = w.sendMessage(&m) +func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err error) { + err = w.SendMessage(msg) + if err != nil { - return err + return nil, err } var hsResps []message.Message if err = w.conn.ReadJSON(&hsResps); err != nil { - return err + return nil, err } - resp := &hsResps[0] - w.topts.Extensions.ApplyInExtensions(resp) - if resp.GetError() != nil { - return err - } - w.clientID = resp.ClientId - return nil + resp = &hsResps[0] + return resp, nil } -//Connect is called after a client has discovered the server’s capabilities with a handshake exchange, +//Init is called after a client has discovered the server’s capabilities with a handshake exchange, //a connection is established by sending a message to the /meta/connect channel -func (w *Websocket) Connect() error { - m := message.Message{ - Channel: transport.MetaConnect, - ClientId: w.clientID, - ConnectionType: transportName, - Id: w.nextMsgID(), - } - go w.readWorker() - return w.sendMessage(&m) +func (w *Websocket) Connect(msg *message.Message) error { + go func() { + log.Fatal(w.readWorker()) + }() + return w.SendMessage(msg) +} + +func (w *Websocket) SetOnTransportDownHandler(onTransportDown func(err error)) { + w.onTransportDown = onTransportDown +} + +func (w *Websocket) SetOnTransportUpHandler(onTransportUp func()) { + w.onTransportUp = onTransportUp } //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the client object will result in undefined behaviour. -func (w *Websocket) Disconnect() error { - m := message.Message{ - Channel: transport.MetaDisconnect, - ClientId: w.clientID, - Id: w.nextMsgID(), - } - +func (w *Websocket) Disconnect(m *message.Message) error { w.stopCh <- nil close(w.stopCh) - w.subscriptionsMu.Lock() - for i := range w.subscriptions { - //close all listeners - for j := range w.subscriptions[i] { - close(w.subscriptions[i][j].MsgChannel()) - } - delete(w.subscriptions, i) - } - w.subscriptionsMu.Unlock() - - return w.sendMessage(&m) + return w.SendMessage(m) } -//Subscribe informs the server that messages published to that channel are delivered to itself. -func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error) { - id := w.nextMsgID() - m := &message.Message{ - Channel: transport.MetaSubscribe, - ClientId: w.clientID, - Subscription: channel, - Id: id, - } - - if err := w.sendMessage(m); err != nil { - return nil, err - } - - inMsgCh := make(chan *message.Message, 0) - subRes := make(chan error) - - var pub = func(data message.Data) (string, error) { - return w.Publish(channel, data) - } - sub := subscription.NewSubscription(id, channel, w.Unsubscribe, pub, inMsgCh, subRes) - - w.subscriptionsMu.Lock() - w.subscriptions[channel] = append(w.subscriptions[channel], sub) - w.subscriptionsMu.Unlock() - - //todo timeout here - err := <-subRes - if err != nil { - log.Println(err) - return nil, err - } - log.Println(sub) - return sub, nil -} - -//Unsubscribe informs the server that the client will no longer listen to incoming event messages on -//the specified channel/subscription -func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error { - //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe - w.subscriptionsMu.Lock() - defer w.subscriptionsMu.Unlock() - subs, ok := w.subscriptions[subscription.Channel()] - if ok { - var si = -1 - for i := range subs { - if subs[i] == subscription { - close(subs[i].MsgChannel()) - si = i - } - } - if si > -1 { - //remove the subscription - subs = subs[:si+copy(subs[si:], subs[si+1:])] - } - w.subscriptions[subscription.Channel()] = subs - //if no more listeners to this subscription send unsubscribe to server - if len(subs) == 0 { - delete(w.subscriptions, subscription.Channel()) - //remove onPublishResponse handler - w.onPubResponseMu.Lock() - delete(w.onPublishResponse, subscription.Channel()) - w.onPubResponseMu.Unlock() - m := &message.Message{ - Channel: transport.MetaUnsubscribe, - Subscription: subscription.Channel(), - ClientId: w.clientID, - Id: w.nextMsgID(), - } - return w.sendMessage(m) - } - } - - return nil -} - -//Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event -//if this feature is supported by the server use the OnPublishResponse to get the publish status. -func (w *Websocket) Publish(subscription string, data message.Data) (id string, err error) { - id = w.nextMsgID() - m := &message.Message{ - Channel: subscription, - Data: data, - ClientId: w.clientID, - Id: id, - } - if err = w.sendMessage(m); err != nil { - return "", err - } - return id, nil -} - -//OnPublishResponse sets the handler to be triggered if the server replies to the publish request -//according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will -//ever be triggered -//can be used to identify the status of the published request and for example retry failed published requests -func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { - w.onPubResponseMu.Lock() - w.onPublishResponse[subscription] = onMsg - w.onPubResponseMu.Unlock() -} - -func (w *Websocket) handleAdvise(m *message.Advise) { - //todo actually handle the advice - w.advice.Store(m) +func (w *Websocket) SetOnMessageReceivedHandler(onMsg func(*message.Message)) { + w.onMsg = onMsg }