diff --git a/client.go b/client.go index 5659477..7870541 100644 --- a/client.go +++ b/client.go @@ -22,8 +22,10 @@ var defaultOpts = options{ type client interface { Disconnect() error Subscribe(subscription string) (*subscription.Subscription, error) - Publish(subscription string, message message.Data) (string, error) - OnPublishResponse(subscription string, onMsg func(message *message.Message)) + Publish(subscription string, message message.Data) error + + //SetOnTransportDownHandler(onTransportDown func(err error)) + //SetOnTransportUpHandler(onTransportUp func()) } //Option set the Client options, such as Transport, message extensions,etc. @@ -62,18 +64,10 @@ func (c *Client) Subscribe(subscription string) (*subscription.Subscription, err //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event //if this feature is supported by the server use the OnPublishResponse to get the publish status. -func (c *Client) Publish(subscription string, data message.Data) (id string, err error) { +func (c *Client) Publish(subscription string, data message.Data) (err error) { return c.dispatcher.Publish(subscription, data) } -//OnPublishResponse sets the handler to be triggered if the server replies to the publish request. -//According to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will -//ever be triggered. -//can be used to identify the status of the published request and for example retry failed published requests. -func (c *Client) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { - c.dispatcher.OnPublishResponse(subscription, onMsg) -} - //Disconnect closes all subscriptions and inform the server to remove any client-related state. //any subsequent method call to the client object will result in undefined behaviour. func (c *Client) Disconnect() error { diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index d4f53d0..d92ea52 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -12,25 +12,6 @@ import ( "sync/atomic" ) -type dispatcher interface { - SetTransport(t transport.Transport) - //Subscribe informs the server that messages published to that channel are delivered to itself. - Subscribe(channel string) (*subscription.Subscription, error) - //Unsubscribe informs the server that the client will no longer listen to incoming event messages on - //the specified channel/subscription - Unsubscribe(sub *subscription.Subscription) error - //Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event - //if this feature is supported by the server use the OnPublishResponse to get the publish status. - Publish(subscription string, message message.Data) (id string, err error) - //OnPublishResponse sets the handler to be triggered if the server replies to the publish request - //according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will - //ever be triggered - //can be used to identify the status of the published request and for example retry failed published requests - OnPublishResponse(subscription string, onMsg func(message *message.Message)) -} - -var _ dispatcher = (*Dispatcher)(nil) - type Dispatcher struct { endpoint string //transports map[string]transport.Transport @@ -46,8 +27,8 @@ type Dispatcher struct { pendingSubsMu sync.Mutex store *store.SubscriptionsStore - onPublishResponseMu sync.Mutex //todo sync.Map - onPublishResponse map[string]func(message *message.Message) + publishACKmu sync.Mutex + publishACK map[string]chan error clientID string } @@ -55,13 +36,13 @@ type Dispatcher struct { func NewDispatcher(endpoint string, tOpts transport.Options, ext message.Extensions) *Dispatcher { var msgID uint64 return &Dispatcher{ - endpoint: endpoint, - msgID: &msgID, - store: store.NewStore(100), - transportOpts: tOpts, - extensions: ext, - onPublishResponse: map[string]func(message *message.Message){}, - pendingSubs: map[string]chan error{}, + endpoint: endpoint, + msgID: &msgID, + store: store.NewStore(100), + transportOpts: tOpts, + extensions: ext, + publishACK: map[string]chan error{}, + pendingSubs: map[string]chan error{}, } } @@ -165,11 +146,12 @@ func (d *Dispatcher) dispatchMessage(msg *message.Message) { } if message.IsEventPublish(msg) { - d.onPublishResponseMu.Lock() - onPublish, ok := d.onPublishResponse[msg.Channel] - d.onPublishResponseMu.Unlock() + d.publishACKmu.Lock() + publishACK, ok := d.publishACK[msg.Id] + d.publishACKmu.Unlock() if ok { - onPublish(msg) + publishACK <- msg.GetError() + close(publishACK) } } @@ -231,9 +213,9 @@ func (d *Dispatcher) Unsubscribe(sub *subscription.Subscription) error { d.store.Remove(sub) //if this is last subscription we will send meta unsubscribe to the server if d.store.Count(sub.Name()) == 0 { - d.onPublishResponseMu.Lock() - delete(d.onPublishResponse, sub.Name()) - d.onPublishResponseMu.Unlock() + d.publishACKmu.Lock() + delete(d.publishACK, sub.Name()) + d.publishACKmu.Unlock() m := &message.Message{ Channel: message.MetaUnsubscribe, @@ -247,8 +229,8 @@ func (d *Dispatcher) Unsubscribe(sub *subscription.Subscription) error { return nil } -func (d *Dispatcher) Publish(subscription string, data message.Data) (id string, err error) { - id = d.nextMsgID() +func (d *Dispatcher) Publish(subscription string, data message.Data) (err error) { + id := d.nextMsgID() m := &message.Message{ Channel: subscription, @@ -256,14 +238,28 @@ func (d *Dispatcher) Publish(subscription string, data message.Data) (id string, ClientId: d.clientID, Id: id, } - if err = d.sendMessage(m); err != nil { - return "", err - } - return id, nil -} -func (d *Dispatcher) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { - d.onPublishResponseMu.Lock() - d.onPublishResponse[subscription] = onMsg - d.onPublishResponseMu.Unlock() + //ack from server + ack := make(chan error) + d.publishACKmu.Lock() + d.publishACK[id] = ack + d.publishACKmu.Unlock() + + if err = d.sendMessage(m); err != nil { + return err + } + + select { //todo timeout + case err = <-ack: + } + + d.publishACKmu.Lock() + delete(d.publishACK, id) + d.publishACKmu.Unlock() + + if err != nil { //todo retries + return err + } + + return nil } diff --git a/internal/store/name.go b/internal/store/name.go index b7849f6..97c48f9 100644 --- a/internal/store/name.go +++ b/internal/store/name.go @@ -4,20 +4,20 @@ import ( "strings" ) -type Name struct { +type SubscriptionName struct { n string patterns []string } -func NewName(name string) *Name { - var n Name +func NewName(name string) *SubscriptionName { + var n SubscriptionName n.n = name //expand once n.patterns = n.expand() return &n } -func (n *Name) Match(channel string) bool { +func (n *SubscriptionName) Match(channel string) bool { for i := range n.patterns { if n.patterns[i] == channel { return true @@ -26,7 +26,7 @@ func (n *Name) Match(channel string) bool { return false } -func (n *Name) expand() []string { +func (n *SubscriptionName) expand() []string { segments := strings.Split(n.n, "/") num_segments := len(segments) patterns := make([]string, num_segments+1) diff --git a/internal/store/subscription.go b/internal/store/subscription.go index 3758851..7a1562e 100644 --- a/internal/store/subscription.go +++ b/internal/store/subscription.go @@ -10,13 +10,13 @@ type SubscriptionsStore struct { subs map[string][]*subscription.Subscription //cache for expanded channel names - cache map[string]*Name + cache map[string]*SubscriptionName } func NewStore(size int) *SubscriptionsStore { return &SubscriptionsStore{ subs: make(map[string][]*subscription.Subscription, size), - cache: map[string]*Name{}, + cache: map[string]*SubscriptionName{}, } } @@ -31,7 +31,7 @@ func (s *SubscriptionsStore) Add(sub *subscription.Subscription) { func (s *SubscriptionsStore) Match(channel string) []*subscription.Subscription { var ( matches []*subscription.Subscription - name *Name + name *SubscriptionName ok bool ) s.mutex.Lock() diff --git a/test/client_test.go b/test/client_test.go index b68726a..eb8db1b 100644 --- a/test/client_test.go +++ b/test/client_test.go @@ -64,14 +64,7 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { var delivered int var done sync.WaitGroup done.Add(10) - client.OnPublishResponse("/test", func(msg *message.Message) { - if !msg.Successful { - t.Fatalf("failed to send msg with id %s", msg.Id) - } - delivered++ - done.Done() - }) var sub *subscription.Subscription go func() { sub, err = client.Subscribe("/test") @@ -79,6 +72,8 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { t.Fatal(err) } err = sub.OnMessage(func(channel string, data message.Data) { + delivered++ + done.Done() if data != "hello world" { t.Fatalf("expecting: `hello world` got : %s", data) } @@ -91,11 +86,11 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { //give some time for setup time.Sleep(time.Second) for i := 0; i < 10; i++ { - id, err := client.Publish("/test", "hello world") + err := client.Publish("/test", "hello world") if err != nil { t.Fatal(err) } - log.Println(id, i) + } done.Wait() @@ -104,11 +99,11 @@ func TestServerSubscribeAndPublish10Messages(t *testing.T) { t.Fatal(err) } //try to publish one more message - id, err := client.Publish("/test", "hello world") + err = client.Publish("/test", "hello world") if err != nil { t.Fatal(err) } - log.Println(id) + if delivered != 10 { t.Fatal("message received after client unsubscribe") } @@ -169,7 +164,7 @@ func TestWildcardSubscription(t *testing.T) { for _, channel := range []string{"/wildcard/foo", "/wildcard/bar"} { for i := 0; i < 10; i++ { - _, err := client.Publish(channel, "hello world") + err := client.Publish(channel, "hello world") if err != nil { t.Fatal(err) } diff --git a/transport/transport.go b/transport/transport.go index 27ccf3d..e5b21ea 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -11,6 +11,7 @@ type Options struct { Headers http.Header Cookies http.CookieJar + MaxRetries int RetryInterval time.Duration DialDeadline time.Duration ReadDeadline time.Duration