diff --git a/client.go b/client.go index 915e885..839a60e 100644 --- a/client.go +++ b/client.go @@ -58,14 +58,14 @@ func NewClient(url string, opts ...Option) (*Client, error) { } // Subscribe informs the server that messages published to that channel are delivered to itself. -func (c *Client) Subscribe(subscription string) (*subscription.Subscription, error) { - return c.dispatcher.Subscribe(subscription) +func (c *Client) Subscribe(channel string) (*subscription.Subscription, error) { + return c.dispatcher.Subscribe(channel) } // Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event // if this feature is supported by the server use the OnPublishResponse to get the publish status. -func (c *Client) Publish(subscription string, data message.Data) (err error) { - return c.dispatcher.Publish(subscription, data) +func (c *Client) Publish(channel string, data message.Data) (err error) { + return c.dispatcher.Publish(channel, data) } // Disconnect closes all subscriptions and inform the server to remove any client-related state. diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index a385f37..0db734e 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -22,6 +22,8 @@ type Dispatcher struct { extensions message.Extensions + advice atomic.Value + //map requestID pendingSubs map[string]chan error //todo wrap in structure pendingSubsMu sync.Mutex @@ -103,6 +105,19 @@ func (d *Dispatcher) dispatchMessage(msg *message.Message) { if message.IsMetaMessage(msg) { //handle it switch msg.Channel { + case message.MetaConnect: + if msg.Advice.Reconnect == message.ReconnectNone { + d.Disconnect() + log.Println("Websocket terminated") + return + } + log.Println("Websocket terminated: reconnecting") + d.Connect() + subsList := d.store.GetAll() + for i := range subsList { + sub := subsList[i] + d.Subscribe(sub.Name()) + } case message.MetaSubscribe: //handle MetaSubscribe resp d.pendingSubsMu.Lock() diff --git a/internal/store/subscription.go b/internal/store/subscription.go index 2176300..ee9aba0 100644 --- a/internal/store/subscription.go +++ b/internal/store/subscription.go @@ -84,6 +84,19 @@ func (s *SubscriptionsStore) RemoveAll() { s.mutex.Unlock() } +func (s *SubscriptionsStore) GetAll() []*subscription.Subscription { + s.mutex.Lock() + subsList := make([]*subscription.Subscription, 0) + for i := range s.subs { + //close all listeners + for j := range s.subs[i] { + subsList = append(subsList, s.subs[i][j]) + } + } + s.mutex.Unlock() + return subsList +} + // Count return the number of subscriptions associated with the specified channel func (s *SubscriptionsStore) Count(channel string) int { return len(s.Match(channel)) diff --git a/internal/store/subscription_test.go b/internal/store/subscription_test.go index 27c2b8a..5924a14 100644 --- a/internal/store/subscription_test.go +++ b/internal/store/subscription_test.go @@ -1,7 +1,7 @@ package store import ( - "github.com/thesyncim/faye/subscription" + "gitea.watsonlabs.net/watsonb8/fayec/subscription" "reflect" "testing" ) diff --git a/subscription/subscription.go b/subscription/subscription.go index 314291c..a41052e 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -30,12 +30,16 @@ func NewSubscription(chanel string, unsub Unsubscriber, msgCh chan *message.Mess func (s *Subscription) OnMessage(onMessage func(channel string, msg message.Data)) error { var inMsg *message.Message - for inMsg = range s.msgCh { - if inMsg.GetError() != nil { - return inMsg.GetError() + go func() error { + for inMsg = range s.msgCh { + if inMsg.GetError() != nil { + return inMsg.GetError() + } + onMessage(inMsg.Channel, inMsg.Data) } - onMessage(inMsg.Channel, inMsg.Data) - } + return nil + }() + return nil } diff --git a/test/client_test.go b/test/client_test.go index eb8db1b..cc5c4dd 100644 --- a/test/client_test.go +++ b/test/client_test.go @@ -3,11 +3,11 @@ package test import ( "context" "fmt" + . "gitea.watsonlabs.net/watsonb8/fayec" + "gitea.watsonlabs.net/watsonb8/fayec/extensions" + "gitea.watsonlabs.net/watsonb8/fayec/message" + "gitea.watsonlabs.net/watsonb8/fayec/subscription" "github.com/pkg/errors" - . "github.com/thesyncim/faye" - "github.com/thesyncim/faye/extensions" - "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/subscription" "log" "os" "os/exec"