diff --git a/subscription/subscription.go b/subscription/subscription.go index 7d0e5ad..466ffaa 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -4,19 +4,22 @@ import ( "github.com/thesyncim/faye/message" ) -type Unsubscriber interface { - Unsubscribe(subscription *Subscription) error -} +type Unsubscriber func(subscription *Subscription) error + +type Publisher func(msg message.Data) (string, error) + type Subscription struct { id string //request Subscription ID channel string ok chan error //used by unsub Unsubscriber + pub Publisher msgCh chan *message.Message } -func NewSubscription(id string, chanel string, unsub Unsubscriber, msgCh chan *message.Message, ok chan error) *Subscription { +func NewSubscription(id string, chanel string, unsub Unsubscriber, pub Publisher, msgCh chan *message.Message, ok chan error) *Subscription { return &Subscription{ + pub: pub, ok: ok, id: id, channel: chanel, @@ -53,5 +56,9 @@ func (s *Subscription) SubscriptionResult() chan error { } func (s *Subscription) Unsubscribe() error { - return s.unsub.Unsubscribe(s) + return s.unsub(s) +} + +func (s *Subscription) Publish(msg message.Data) (string, error) { + return s.pub(msg) } diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 4141d2c..c11f21e 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -90,11 +90,10 @@ func (w *Websocket) readWorker() error { //handle MetaSubscribe resp w.subsMu2.Lock() subscriptions, ok := w.subs2[msg.Subscription] + if !ok { + panic("BUG: subscription not registered `" + msg.Subscription + "`") + } if !msg.Successful { - - if !ok { - panic("BUG: subscription not registered `" + msg.Subscription + "`") - } if msg.GetError() == nil { //inject the error if the server returns unsuccessful without error msg.Error = fmt.Sprintf("susbscription `%s` failed", msg.Subscription) @@ -106,7 +105,7 @@ func (w *Websocket) readWorker() error { select { case subscriptions[i].SubscriptionResult() <- msg.GetError(): close(subscriptions[i].MsgChannel()) - /*default: + default: log.Println("subscription has no listeners") //todo remove*/ } } @@ -126,7 +125,6 @@ func (w *Websocket) readWorker() error { default: log.Println("subscription has no listeners") //todo remove*/ } - } } } @@ -275,11 +273,13 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error return nil, err } - //todo validate inMsgCh := make(chan *message.Message, 0) subRes := make(chan error) - sub := subscription.NewSubscription(id, channel, w, inMsgCh, subRes) + var pub = func(data message.Data) (string, error) { + return w.Publish(channel, data) + } + sub := subscription.NewSubscription(id, channel, w.Unsubscribe, pub, inMsgCh, subRes) w.subsMu2.Lock() w.subs2[channel] = append(w.subs2[channel], sub)