diff --git a/client.go b/client.go index 8e7d13a..594682e 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package fayec import ( "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/subscription" "github.com/thesyncim/faye/transport" _ "github.com/thesyncim/faye/transport/websocket" ) @@ -19,8 +20,8 @@ var defaultOpts = options{ //https://faye.jcoglan.com/architecture.html type client interface { Disconnect() error - Subscribe(subscription string, onMessage func(message message.Data)) error - Unsubscribe(subscription string) error + Subscribe(subscription string) (*subscription.Subscription, error) + //Unsubscribe(subscription string) error Publish(subscription string, message message.Data) (string, error) OnPublishResponse(subscription string, onMsg func(message *message.Message)) } @@ -64,14 +65,8 @@ func NewClient(url string, opts ...Option) (*Client, error) { } //Subscribe informs the server that messages published to that channel are delivered to itself. -func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error { - return c.opts.transport.Subscribe(subscription, onMsg) -} - -//Unsubscribe informs the server that the client will no longer listen to incoming event messages on -//the specified channel/subscription -func (c *Client) Unsubscribe(subscription string) error { - return c.opts.transport.Unsubscribe(subscription) +func (c *Client) Subscribe(subscription string) (*subscription.Subscription, error) { + return c.opts.transport.Subscribe(subscription) } //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event diff --git a/subscription/subscription.go b/subscription/subscription.go new file mode 100644 index 0000000..7d0e5ad --- /dev/null +++ b/subscription/subscription.go @@ -0,0 +1,57 @@ +package subscription + +import ( + "github.com/thesyncim/faye/message" +) + +type Unsubscriber interface { + Unsubscribe(subscription *Subscription) error +} +type Subscription struct { + id string //request Subscription ID + channel string + ok chan error //used by + unsub Unsubscriber + msgCh chan *message.Message +} + +func NewSubscription(id string, chanel string, unsub Unsubscriber, msgCh chan *message.Message, ok chan error) *Subscription { + return &Subscription{ + ok: ok, + id: id, + channel: chanel, + unsub: unsub, + msgCh: msgCh, + } +} + +func (s *Subscription) OnMessage(onMessage func(msg message.Data)) error { + var inMsg *message.Message + for inMsg = range s.msgCh { + if inMsg.GetError() != nil { + return inMsg.GetError() + } + onMessage(inMsg.Data) + } + return nil +} + +func (s *Subscription) ID() string { + return s.id +} + +func (s *Subscription) MsgChannel() chan *message.Message { + return s.msgCh +} + +func (s *Subscription) Channel() string { + return s.channel +} + +func (s *Subscription) SubscriptionResult() chan error { + return s.ok +} + +func (s *Subscription) Unsubscribe() error { + return s.unsub.Unsubscribe(s) +} diff --git a/test/setup_test.go b/test/client_test.go similarity index 60% rename from test/setup_test.go rename to test/client_test.go index 70df94b..733d96d 100644 --- a/test/setup_test.go +++ b/test/client_test.go @@ -1,59 +1,60 @@ -package faye_test +package test import ( + "context" + "fmt" "github.com/pkg/errors" - "github.com/thesyncim/faye" + . "github.com/thesyncim/faye" "github.com/thesyncim/faye/extensions" "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/subscription" "log" "os" "os/exec" + "runtime" "sync" "testing" "time" ) -type cancelFn func() +var once sync.Once var unauthorizedErr = errors.New("500::unauthorized channel") -func setup(t *testing.T) (cancelFn, error) { - cmd := exec.Command("npm", "start") +func setup(t *testing.T) context.CancelFunc { + + //jump to test dir + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + cmd := exec.CommandContext(ctx, + "npm", "start") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Start() if err != nil { - return nil, err + t.Fatal(err) } - var cancel = func() { - cmd.Process.Signal(os.Kill) - log.Println("canceled") - } - go func() { - select { - case <-time.After(time.Second * 30): - cancel() - t.Fatal("test failed") - os.Exit(1) + var mcancel context.CancelFunc = func() { + if runtime.GOOS == "windows" { + exec.Command("taskkill", "/F", "/T", "/PID", fmt.Sprint(cmd.Process.Pid)).Run() } - }() + cancel() + } - return cancel, nil + return mcancel } func TestServerSubscribeAndPublish10Messages(t *testing.T) { - shutdown, err := setup(t) - if err != nil { - t.Fatal(err) - } + shutdown := setup(t) + defer shutdown() debug := extensions.NewDebugExtension(os.Stdout) - client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension)) + client, err := NewClient("ws://localhost:8000/faye", WithExtension(debug.InExtension, debug.OutExtension)) if err != nil { t.Fatal(err) } @@ -68,13 +69,20 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { delivered++ done.Done() }) - + var sub *subscription.Subscription go func() { - client.Subscribe("/test", func(data message.Data) { + sub, err = client.Subscribe("/test") + if err != nil { + t.Fatal(err) + } + err = sub.OnMessage(func(data message.Data) { if data != "hello world" { t.Fatalf("expecting: `hello world` got : %s", data) } }) + if err != nil { + t.Fatal(err) + } }() //give some time for setup @@ -88,7 +96,7 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { } done.Wait() - err = client.Unsubscribe("/test") + err = sub.Unsubscribe() if err != nil { t.Fatal(err) } @@ -104,24 +112,21 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { } func TestSubscribeUnauthorizedChannel(t *testing.T) { - shutdown, err := setup(t) - if err != nil { - t.Fatal(err) - } + shutdown := setup(t) + defer shutdown() debug := extensions.NewDebugExtension(os.Stdout) - client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension)) + client, err := NewClient("ws://localhost:8000/faye", WithExtension(debug.InExtension, debug.OutExtension)) if err != nil { t.Fatal(err) } - err = client.Subscribe("/unauthorized", func(data message.Data) { - t.Fatal("received message on unauthorized channel") - }) + _, err = client.Subscribe("/unauthorized") if err.Error() != unauthorizedErr.Error() { t.Fatalf("expecting `500::unauthorized channel` got : `%s`", err.Error()) } + log.Println(err) } diff --git a/test/server.js b/test/server.js index 554153b..e03f3fa 100644 --- a/test/server.js +++ b/test/server.js @@ -11,7 +11,6 @@ var unauthorized = [ bayeux.addExtension({ incoming: function (message, callback) { if (message.channel === '/meta/subscribe') { - console.log(message) if (unauthorized.indexOf(message.subscription) >= 0) { message.error = '500::unauthorized channel'; } diff --git a/transport/transport.go b/transport/transport.go index e12fdf6..86a1cd9 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -2,6 +2,7 @@ package transport import ( "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/subscription" "time" ) @@ -30,13 +31,13 @@ type Transport interface { //a connection is established by sending a message to the /meta/connect channel Connect() error //Disconnect closes all subscriptions and inform the server to remove any client-related state. - //any subsequent method call to the client object will result in undefined behaviour. + //any subsequent method call to the transport object will result in undefined behaviour. Disconnect() error //Subscribe informs the server that messages published to that channel are delivered to itself. - Subscribe(subscription string, onMessage func(message message.Data)) error + Subscribe(channel string) (*subscription.Subscription, error) //Unsubscribe informs the server that the client will no longer listen to incoming event messages on //the specified channel/subscription - Unsubscribe(subscription string) error + Unsubscribe(sub *subscription.Subscription) error //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //if this feature is supported by the server use the OnPublishResponse to get the publish status. Publish(subscription string, message message.Data) (id string, err error) diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 5767a36..4141d2c 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -4,7 +4,9 @@ import ( "fmt" "github.com/gorilla/websocket" "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/subscription" "github.com/thesyncim/faye/transport" + "log" "strconv" "sync" @@ -30,8 +32,11 @@ type Websocket struct { stopCh chan error - subsMu sync.Mutex //todo sync.Map - subs map[string]chan *message.Message + //subsMu sync.Mutex //todo sync.Map + //subs map[string]chan *message.Message + + subsMu2 sync.Mutex //todo sync.Map + subs2 map[string][]*subscription.Subscription onPubResponseMu sync.Mutex //todo sync.Map onPublishResponse map[string]func(message *message.Message) @@ -47,7 +52,8 @@ func (w *Websocket) Init(options *transport.Options) error { ) w.TransportOpts = options w.msgID = &msgID - w.subs = map[string]chan *message.Message{} + //w.subs = map[string]chan *message.Message{} + w.subs2 = map[string][]*subscription.Subscription{} w.onPublishResponse = map[string]func(message *message.Message){} w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) @@ -82,10 +88,10 @@ func (w *Websocket) readWorker() error { switch msg.Channel { case transport.MetaSubscribe: //handle MetaSubscribe resp + w.subsMu2.Lock() + subscriptions, ok := w.subs2[msg.Subscription] if !msg.Successful { - w.subsMu.Lock() - subscription, ok := w.subs[msg.Subscription] - w.subsMu.Unlock() + if !ok { panic("BUG: subscription not registered `" + msg.Subscription + "`") } @@ -93,12 +99,38 @@ func (w *Websocket) readWorker() error { //inject the error if the server returns unsuccessful without error msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) } - subscription <- msg - close(subscription) - w.subsMu.Lock() - delete(w.subs, msg.Channel) - w.subsMu.Unlock() + var si = -1 + for i := range subscriptions { + if subscriptions[i].ID() == msg.Id { + si = i + select { + case subscriptions[i].SubscriptionResult() <- msg.GetError(): + close(subscriptions[i].MsgChannel()) + /*default: + log.Println("subscription has no listeners") //todo remove*/ + } + } + } + //remove subscription + if si > -1 { + subscriptions = subscriptions[:si+copy(subscriptions[si:], subscriptions[si+1:])] + } + + w.subs2[msg.Subscription] = subscriptions + //v2 + } else { + for i := range subscriptions { + if subscriptions[i].ID() == msg.Id { + select { + case subscriptions[i].SubscriptionResult() <- nil: + default: + log.Println("subscription has no listeners") //todo remove*/ + } + + } + } } + w.subsMu2.Unlock() } @@ -108,17 +140,25 @@ func (w *Websocket) readWorker() error { //there are 2 types of Event Message // 1. Publish // 2. Delivery - if transport.IsEventDelivery(msg) { - w.subsMu.Lock() - subscription, ok := w.subs[msg.Channel] - w.subsMu.Unlock() + w.subsMu2.Lock() + subscriptions, ok := w.subs2[msg.Channel] if ok { - if subscription != nil { - subscription <- msg + //send to all listeners + for i := range subscriptions { + if subscriptions[i].MsgChannel() != nil { + select { + case subscriptions[i].MsgChannel() <- msg: + default: + log.Println("subscription has no listeners") //todo remove + + } + } } + } + w.subsMu2.Unlock() continue } @@ -193,7 +233,6 @@ func (w *Websocket) Connect() error { ConnectionType: transportName, Id: w.nextMsgID(), } - //todo expect connect resp from server go w.readWorker() return w.sendMessage(&m) } @@ -209,16 +248,54 @@ func (w *Websocket) Disconnect() error { w.stopCh <- nil close(w.stopCh) - w.subsMu.Lock() - for i := range w.subs { - close(w.subs[i]) - delete(w.subs, i) + w.subsMu2.Lock() + for i := range w.subs2 { + //close all listeners + for j := range w.subs2[i] { + close(w.subs2[i][j].MsgChannel()) + } + delete(w.subs2, i) } - w.subsMu.Unlock() + w.subsMu2.Unlock() return w.sendMessage(&m) } +//Subscribe informs the server that messages published to that channel are delivered to itself. +func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error) { + id := w.nextMsgID() + m := &message.Message{ + Channel: transport.MetaSubscribe, + ClientId: w.clientID, + Subscription: channel, + Id: id, + } + + if err := w.sendMessage(m); err != nil { + return nil, err + } + + //todo validate + inMsgCh := make(chan *message.Message, 0) + subRes := make(chan error) + + sub := subscription.NewSubscription(id, channel, w, inMsgCh, subRes) + + w.subsMu2.Lock() + w.subs2[channel] = append(w.subs2[channel], sub) + w.subsMu2.Unlock() + + //todo timeout here + err := <-subRes + if err != nil { + log.Println(err) + return nil, err + } + log.Println(sub) + return sub, nil +} + +/* //Subscribe informs the server that messages published to that channel are delivered to itself. func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error { m := &message.Message{ @@ -250,30 +327,45 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D // return nil for now return nil } - +*/ //Unsubscribe informs the server that the client will no longer listen to incoming event messages on //the specified channel/subscription -func (w *Websocket) Unsubscribe(subscription string) error { +func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error { //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe - m := &message.Message{ - Channel: transport.MetaUnsubscribe, - Subscription: subscription, - ClientId: w.clientID, - Id: w.nextMsgID(), - } - w.subsMu.Lock() - sub, ok := w.subs[subscription] + w.subsMu2.Lock() + defer w.subsMu2.Unlock() + subs, ok := w.subs2[subscription.Channel()] if ok { - close(sub) - delete(w.subs, subscription) + var si = -1 + for i := range subs { + if subs[i] == subscription { + close(subs[i].MsgChannel()) + si = i + } + } + if si > -1 { + //remove the subscription + subs = subs[:si+copy(subs[si:], subs[si+1:])] + } + w.subs2[subscription.Channel()] = subs + //if no more listeners to this subscription send unsubscribe to server + if len(subs) == 0 { + delete(w.subs2, subscription.Channel()) + //remove onPublishResponse handler + w.onPubResponseMu.Lock() + delete(w.onPublishResponse, subscription.Channel()) + w.onPubResponseMu.Unlock() + m := &message.Message{ + Channel: transport.MetaUnsubscribe, + Subscription: subscription.Channel(), + ClientId: w.clientID, + Id: w.nextMsgID(), + } + return w.sendMessage(m) + } } - w.subsMu.Unlock() - //remove onPublishResponse handler - w.onPubResponseMu.Lock() - delete(w.onPublishResponse, subscription) - w.onPubResponseMu.Unlock() - return w.sendMessage(m) + return nil } //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event