allow multiple subscriptions to the same channel

This commit is contained in:
Marcelo Pires 2018-09-07 14:32:26 +02:00
parent e98ccfdec8
commit d8a246f014
6 changed files with 238 additions and 89 deletions

View File

@ -2,6 +2,7 @@ package fayec
import ( import (
"github.com/thesyncim/faye/message" "github.com/thesyncim/faye/message"
"github.com/thesyncim/faye/subscription"
"github.com/thesyncim/faye/transport" "github.com/thesyncim/faye/transport"
_ "github.com/thesyncim/faye/transport/websocket" _ "github.com/thesyncim/faye/transport/websocket"
) )
@ -19,8 +20,8 @@ var defaultOpts = options{
//https://faye.jcoglan.com/architecture.html //https://faye.jcoglan.com/architecture.html
type client interface { type client interface {
Disconnect() error Disconnect() error
Subscribe(subscription string, onMessage func(message message.Data)) error Subscribe(subscription string) (*subscription.Subscription, error)
Unsubscribe(subscription string) error //Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) (string, error) Publish(subscription string, message message.Data) (string, error)
OnPublishResponse(subscription string, onMsg func(message *message.Message)) OnPublishResponse(subscription string, onMsg func(message *message.Message))
} }
@ -64,14 +65,8 @@ func NewClient(url string, opts ...Option) (*Client, error) {
} }
//Subscribe informs the server that messages published to that channel are delivered to itself. //Subscribe informs the server that messages published to that channel are delivered to itself.
func (c *Client) Subscribe(subscription string, onMsg func(message message.Data)) error { func (c *Client) Subscribe(subscription string) (*subscription.Subscription, error) {
return c.opts.transport.Subscribe(subscription, onMsg) return c.opts.transport.Subscribe(subscription)
}
//Unsubscribe informs the server that the client will no longer listen to incoming event messages on
//the specified channel/subscription
func (c *Client) Unsubscribe(subscription string) error {
return c.opts.transport.Unsubscribe(subscription)
} }
//Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event

View File

@ -0,0 +1,57 @@
package subscription
import (
"github.com/thesyncim/faye/message"
)
type Unsubscriber interface {
Unsubscribe(subscription *Subscription) error
}
type Subscription struct {
id string //request Subscription ID
channel string
ok chan error //used by
unsub Unsubscriber
msgCh chan *message.Message
}
func NewSubscription(id string, chanel string, unsub Unsubscriber, msgCh chan *message.Message, ok chan error) *Subscription {
return &Subscription{
ok: ok,
id: id,
channel: chanel,
unsub: unsub,
msgCh: msgCh,
}
}
func (s *Subscription) OnMessage(onMessage func(msg message.Data)) error {
var inMsg *message.Message
for inMsg = range s.msgCh {
if inMsg.GetError() != nil {
return inMsg.GetError()
}
onMessage(inMsg.Data)
}
return nil
}
func (s *Subscription) ID() string {
return s.id
}
func (s *Subscription) MsgChannel() chan *message.Message {
return s.msgCh
}
func (s *Subscription) Channel() string {
return s.channel
}
func (s *Subscription) SubscriptionResult() chan error {
return s.ok
}
func (s *Subscription) Unsubscribe() error {
return s.unsub.Unsubscribe(s)
}

View File

@ -1,59 +1,60 @@
package faye_test package test
import ( import (
"context"
"fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/thesyncim/faye" . "github.com/thesyncim/faye"
"github.com/thesyncim/faye/extensions" "github.com/thesyncim/faye/extensions"
"github.com/thesyncim/faye/message" "github.com/thesyncim/faye/message"
"github.com/thesyncim/faye/subscription"
"log" "log"
"os" "os"
"os/exec" "os/exec"
"runtime"
"sync" "sync"
"testing" "testing"
"time" "time"
) )
type cancelFn func() var once sync.Once
var unauthorizedErr = errors.New("500::unauthorized channel") var unauthorizedErr = errors.New("500::unauthorized channel")
func setup(t *testing.T) (cancelFn, error) { func setup(t *testing.T) context.CancelFunc {
cmd := exec.Command("npm", "start")
//jump to test dir
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
cmd := exec.CommandContext(ctx,
"npm", "start")
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
err := cmd.Start() err := cmd.Start()
if err != nil { if err != nil {
return nil, err t.Fatal(err)
} }
var cancel = func() { var mcancel context.CancelFunc = func() {
cmd.Process.Signal(os.Kill)
log.Println("canceled")
}
go func() {
select {
case <-time.After(time.Second * 30):
cancel()
t.Fatal("test failed")
os.Exit(1)
if runtime.GOOS == "windows" {
exec.Command("taskkill", "/F", "/T", "/PID", fmt.Sprint(cmd.Process.Pid)).Run()
} }
}() cancel()
}
return cancel, nil return mcancel
} }
func TestServerSubscribeAndPublish10Messages(t *testing.T) { func TestServerSubscribeAndPublish10Messages(t *testing.T) {
shutdown, err := setup(t) shutdown := setup(t)
if err != nil {
t.Fatal(err)
}
defer shutdown() defer shutdown()
debug := extensions.NewDebugExtension(os.Stdout) debug := extensions.NewDebugExtension(os.Stdout)
client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension)) client, err := NewClient("ws://localhost:8000/faye", WithExtension(debug.InExtension, debug.OutExtension))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -68,13 +69,20 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) {
delivered++ delivered++
done.Done() done.Done()
}) })
var sub *subscription.Subscription
go func() { go func() {
client.Subscribe("/test", func(data message.Data) { sub, err = client.Subscribe("/test")
if err != nil {
t.Fatal(err)
}
err = sub.OnMessage(func(data message.Data) {
if data != "hello world" { if data != "hello world" {
t.Fatalf("expecting: `hello world` got : %s", data) t.Fatalf("expecting: `hello world` got : %s", data)
} }
}) })
if err != nil {
t.Fatal(err)
}
}() }()
//give some time for setup //give some time for setup
@ -88,7 +96,7 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) {
} }
done.Wait() done.Wait()
err = client.Unsubscribe("/test") err = sub.Unsubscribe()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -104,24 +112,21 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) {
} }
func TestSubscribeUnauthorizedChannel(t *testing.T) { func TestSubscribeUnauthorizedChannel(t *testing.T) {
shutdown, err := setup(t) shutdown := setup(t)
if err != nil {
t.Fatal(err)
}
defer shutdown() defer shutdown()
debug := extensions.NewDebugExtension(os.Stdout) debug := extensions.NewDebugExtension(os.Stdout)
client, err := fayec.NewClient("ws://localhost:8000/faye", fayec.WithExtension(debug.InExtension, debug.OutExtension)) client, err := NewClient("ws://localhost:8000/faye", WithExtension(debug.InExtension, debug.OutExtension))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = client.Subscribe("/unauthorized", func(data message.Data) { _, err = client.Subscribe("/unauthorized")
t.Fatal("received message on unauthorized channel")
})
if err.Error() != unauthorizedErr.Error() { if err.Error() != unauthorizedErr.Error() {
t.Fatalf("expecting `500::unauthorized channel` got : `%s`", err.Error()) t.Fatalf("expecting `500::unauthorized channel` got : `%s`", err.Error())
} }
log.Println(err)
} }

View File

@ -11,7 +11,6 @@ var unauthorized = [
bayeux.addExtension({ bayeux.addExtension({
incoming: function (message, callback) { incoming: function (message, callback) {
if (message.channel === '/meta/subscribe') { if (message.channel === '/meta/subscribe') {
console.log(message)
if (unauthorized.indexOf(message.subscription) >= 0) { if (unauthorized.indexOf(message.subscription) >= 0) {
message.error = '500::unauthorized channel'; message.error = '500::unauthorized channel';
} }

View File

@ -2,6 +2,7 @@ package transport
import ( import (
"github.com/thesyncim/faye/message" "github.com/thesyncim/faye/message"
"github.com/thesyncim/faye/subscription"
"time" "time"
) )
@ -30,13 +31,13 @@ type Transport interface {
//a connection is established by sending a message to the /meta/connect channel //a connection is established by sending a message to the /meta/connect channel
Connect() error Connect() error
//Disconnect closes all subscriptions and inform the server to remove any client-related state. //Disconnect closes all subscriptions and inform the server to remove any client-related state.
//any subsequent method call to the client object will result in undefined behaviour. //any subsequent method call to the transport object will result in undefined behaviour.
Disconnect() error Disconnect() error
//Subscribe informs the server that messages published to that channel are delivered to itself. //Subscribe informs the server that messages published to that channel are delivered to itself.
Subscribe(subscription string, onMessage func(message message.Data)) error Subscribe(channel string) (*subscription.Subscription, error)
//Unsubscribe informs the server that the client will no longer listen to incoming event messages on //Unsubscribe informs the server that the client will no longer listen to incoming event messages on
//the specified channel/subscription //the specified channel/subscription
Unsubscribe(subscription string) error Unsubscribe(sub *subscription.Subscription) error
//Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event
//if this feature is supported by the server use the OnPublishResponse to get the publish status. //if this feature is supported by the server use the OnPublishResponse to get the publish status.
Publish(subscription string, message message.Data) (id string, err error) Publish(subscription string, message message.Data) (id string, err error)

View File

@ -4,7 +4,9 @@ import (
"fmt" "fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/thesyncim/faye/message" "github.com/thesyncim/faye/message"
"github.com/thesyncim/faye/subscription"
"github.com/thesyncim/faye/transport" "github.com/thesyncim/faye/transport"
"log"
"strconv" "strconv"
"sync" "sync"
@ -30,8 +32,11 @@ type Websocket struct {
stopCh chan error stopCh chan error
subsMu sync.Mutex //todo sync.Map //subsMu sync.Mutex //todo sync.Map
subs map[string]chan *message.Message //subs map[string]chan *message.Message
subsMu2 sync.Mutex //todo sync.Map
subs2 map[string][]*subscription.Subscription
onPubResponseMu sync.Mutex //todo sync.Map onPubResponseMu sync.Mutex //todo sync.Map
onPublishResponse map[string]func(message *message.Message) onPublishResponse map[string]func(message *message.Message)
@ -47,7 +52,8 @@ func (w *Websocket) Init(options *transport.Options) error {
) )
w.TransportOpts = options w.TransportOpts = options
w.msgID = &msgID w.msgID = &msgID
w.subs = map[string]chan *message.Message{} //w.subs = map[string]chan *message.Message{}
w.subs2 = map[string][]*subscription.Subscription{}
w.onPublishResponse = map[string]func(message *message.Message){} w.onPublishResponse = map[string]func(message *message.Message){}
w.stopCh = make(chan error) w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
@ -82,10 +88,10 @@ func (w *Websocket) readWorker() error {
switch msg.Channel { switch msg.Channel {
case transport.MetaSubscribe: case transport.MetaSubscribe:
//handle MetaSubscribe resp //handle MetaSubscribe resp
w.subsMu2.Lock()
subscriptions, ok := w.subs2[msg.Subscription]
if !msg.Successful { if !msg.Successful {
w.subsMu.Lock()
subscription, ok := w.subs[msg.Subscription]
w.subsMu.Unlock()
if !ok { if !ok {
panic("BUG: subscription not registered `" + msg.Subscription + "`") panic("BUG: subscription not registered `" + msg.Subscription + "`")
} }
@ -93,12 +99,38 @@ func (w *Websocket) readWorker() error {
//inject the error if the server returns unsuccessful without error //inject the error if the server returns unsuccessful without error
msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription)
} }
subscription <- msg var si = -1
close(subscription) for i := range subscriptions {
w.subsMu.Lock() if subscriptions[i].ID() == msg.Id {
delete(w.subs, msg.Channel) si = i
w.subsMu.Unlock() select {
case subscriptions[i].SubscriptionResult() <- msg.GetError():
close(subscriptions[i].MsgChannel())
/*default:
log.Println("subscription has no listeners") //todo remove*/
}
}
}
//remove subscription
if si > -1 {
subscriptions = subscriptions[:si+copy(subscriptions[si:], subscriptions[si+1:])]
}
w.subs2[msg.Subscription] = subscriptions
//v2
} else {
for i := range subscriptions {
if subscriptions[i].ID() == msg.Id {
select {
case subscriptions[i].SubscriptionResult() <- nil:
default:
log.Println("subscription has no listeners") //todo remove*/
}
}
}
} }
w.subsMu2.Unlock()
} }
@ -108,17 +140,25 @@ func (w *Websocket) readWorker() error {
//there are 2 types of Event Message //there are 2 types of Event Message
// 1. Publish // 1. Publish
// 2. Delivery // 2. Delivery
if transport.IsEventDelivery(msg) { if transport.IsEventDelivery(msg) {
w.subsMu.Lock() w.subsMu2.Lock()
subscription, ok := w.subs[msg.Channel] subscriptions, ok := w.subs2[msg.Channel]
w.subsMu.Unlock()
if ok { if ok {
if subscription != nil { //send to all listeners
subscription <- msg for i := range subscriptions {
if subscriptions[i].MsgChannel() != nil {
select {
case subscriptions[i].MsgChannel() <- msg:
default:
log.Println("subscription has no listeners") //todo remove
}
}
} }
} }
w.subsMu2.Unlock()
continue continue
} }
@ -193,7 +233,6 @@ func (w *Websocket) Connect() error {
ConnectionType: transportName, ConnectionType: transportName,
Id: w.nextMsgID(), Id: w.nextMsgID(),
} }
//todo expect connect resp from server
go w.readWorker() go w.readWorker()
return w.sendMessage(&m) return w.sendMessage(&m)
} }
@ -209,16 +248,54 @@ func (w *Websocket) Disconnect() error {
w.stopCh <- nil w.stopCh <- nil
close(w.stopCh) close(w.stopCh)
w.subsMu.Lock() w.subsMu2.Lock()
for i := range w.subs { for i := range w.subs2 {
close(w.subs[i]) //close all listeners
delete(w.subs, i) for j := range w.subs2[i] {
close(w.subs2[i][j].MsgChannel())
}
delete(w.subs2, i)
} }
w.subsMu.Unlock() w.subsMu2.Unlock()
return w.sendMessage(&m) return w.sendMessage(&m)
} }
//Subscribe informs the server that messages published to that channel are delivered to itself.
func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error) {
id := w.nextMsgID()
m := &message.Message{
Channel: transport.MetaSubscribe,
ClientId: w.clientID,
Subscription: channel,
Id: id,
}
if err := w.sendMessage(m); err != nil {
return nil, err
}
//todo validate
inMsgCh := make(chan *message.Message, 0)
subRes := make(chan error)
sub := subscription.NewSubscription(id, channel, w, inMsgCh, subRes)
w.subsMu2.Lock()
w.subs2[channel] = append(w.subs2[channel], sub)
w.subsMu2.Unlock()
//todo timeout here
err := <-subRes
if err != nil {
log.Println(err)
return nil, err
}
log.Println(sub)
return sub, nil
}
/*
//Subscribe informs the server that messages published to that channel are delivered to itself. //Subscribe informs the server that messages published to that channel are delivered to itself.
func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error { func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error {
m := &message.Message{ m := &message.Message{
@ -250,30 +327,45 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D
// return nil for now // return nil for now
return nil return nil
} }
*/
//Unsubscribe informs the server that the client will no longer listen to incoming event messages on //Unsubscribe informs the server that the client will no longer listen to incoming event messages on
//the specified channel/subscription //the specified channel/subscription
func (w *Websocket) Unsubscribe(subscription string) error { func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error {
//https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe //https://docs.cometd.org/current/reference/#_bayeux_meta_unsubscribe
m := &message.Message{ w.subsMu2.Lock()
Channel: transport.MetaUnsubscribe, defer w.subsMu2.Unlock()
Subscription: subscription, subs, ok := w.subs2[subscription.Channel()]
ClientId: w.clientID,
Id: w.nextMsgID(),
}
w.subsMu.Lock()
sub, ok := w.subs[subscription]
if ok { if ok {
close(sub) var si = -1
delete(w.subs, subscription) for i := range subs {
if subs[i] == subscription {
close(subs[i].MsgChannel())
si = i
}
}
if si > -1 {
//remove the subscription
subs = subs[:si+copy(subs[si:], subs[si+1:])]
}
w.subs2[subscription.Channel()] = subs
//if no more listeners to this subscription send unsubscribe to server
if len(subs) == 0 {
delete(w.subs2, subscription.Channel())
//remove onPublishResponse handler
w.onPubResponseMu.Lock()
delete(w.onPublishResponse, subscription.Channel())
w.onPubResponseMu.Unlock()
m := &message.Message{
Channel: transport.MetaUnsubscribe,
Subscription: subscription.Channel(),
ClientId: w.clientID,
Id: w.nextMsgID(),
}
return w.sendMessage(m)
}
} }
w.subsMu.Unlock()
//remove onPublishResponse handler
w.onPubResponseMu.Lock()
delete(w.onPublishResponse, subscription)
w.onPubResponseMu.Unlock()
return w.sendMessage(m) return nil
} }
//Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event