allow subscription to publish to itself

This commit is contained in:
Marcelo Pires 2018-09-07 15:00:35 +02:00
parent d8a246f014
commit 58a5ed8a3f
2 changed files with 20 additions and 13 deletions

View File

@ -4,19 +4,22 @@ import (
"github.com/thesyncim/faye/message"
)
type Unsubscriber interface {
Unsubscribe(subscription *Subscription) error
}
type Unsubscriber func(subscription *Subscription) error
type Publisher func(msg message.Data) (string, error)
type Subscription struct {
id string //request Subscription ID
channel string
ok chan error //used by
unsub Unsubscriber
pub Publisher
msgCh chan *message.Message
}
func NewSubscription(id string, chanel string, unsub Unsubscriber, msgCh chan *message.Message, ok chan error) *Subscription {
func NewSubscription(id string, chanel string, unsub Unsubscriber, pub Publisher, msgCh chan *message.Message, ok chan error) *Subscription {
return &Subscription{
pub: pub,
ok: ok,
id: id,
channel: chanel,
@ -53,5 +56,9 @@ func (s *Subscription) SubscriptionResult() chan error {
}
func (s *Subscription) Unsubscribe() error {
return s.unsub.Unsubscribe(s)
return s.unsub(s)
}
func (s *Subscription) Publish(msg message.Data) (string, error) {
return s.pub(msg)
}

View File

@ -90,11 +90,10 @@ func (w *Websocket) readWorker() error {
//handle MetaSubscribe resp
w.subsMu2.Lock()
subscriptions, ok := w.subs2[msg.Subscription]
if !msg.Successful {
if !ok {
panic("BUG: subscription not registered `" + msg.Subscription + "`")
}
if !msg.Successful {
if msg.GetError() == nil {
//inject the error if the server returns unsuccessful without error
msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription)
@ -106,7 +105,7 @@ func (w *Websocket) readWorker() error {
select {
case subscriptions[i].SubscriptionResult() <- msg.GetError():
close(subscriptions[i].MsgChannel())
/*default:
default:
log.Println("subscription has no listeners") //todo remove*/
}
}
@ -126,7 +125,6 @@ func (w *Websocket) readWorker() error {
default:
log.Println("subscription has no listeners") //todo remove*/
}
}
}
}
@ -275,11 +273,13 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error
return nil, err
}
//todo validate
inMsgCh := make(chan *message.Message, 0)
subRes := make(chan error)
sub := subscription.NewSubscription(id, channel, w, inMsgCh, subRes)
var pub = func(data message.Data) (string, error) {
return w.Publish(channel, data)
}
sub := subscription.NewSubscription(id, channel, w.Unsubscribe, pub, inMsgCh, subRes)
w.subsMu2.Lock()
w.subs2[channel] = append(w.subs2[channel], sub)