From 01f041d2fcc520e53aac71453f908adf5e211417 Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Thu, 6 Sep 2018 15:29:49 +0200 Subject: [PATCH] support Publish event --- client.go | 14 ++++++-- transport/transport.go | 45 +++++++++++++++++------ transport/websocket/websocket.go | 62 ++++++++++++++++++++++++-------- 3 files changed, 92 insertions(+), 29 deletions(-) diff --git a/client.go b/client.go index 1568ac6..4c9de72 100644 --- a/client.go +++ b/client.go @@ -21,8 +21,8 @@ type client interface { Disconnect() error Subscribe(subscription string, onMessage func(message message.Data)) error Unsubscribe(subscription string) error - Publish(subscription string, message message.Data) error - //todo unsubscribe,etc + Publish(subscription string, message message.Data) (string, error) + OnPublishResponse(subscription string, onMsg func(message *message.Message)) } type Option func(*options) @@ -68,10 +68,18 @@ func (c *Client) Unsubscribe(subscription string) error { return c.opts.transport.Unsubscribe(subscription) } -func (c *Client) Publish(subscription string, data message.Data) error { +func (c *Client) Publish(subscription string, data message.Data) (id string, err error) { return c.opts.transport.Publish(subscription, data) } +//OnPublishResponse sets the handler to be triggered if the server replies to the publish request +//according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will +//ever be triggered +//can be used to identify the status of the published request and for example retry failed published requests +func (c *Client) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { + c.opts.transport.OnPublishResponse(subscription, onMsg) +} + func (c *Client) Disconnect() error { return c.opts.transport.Disconnect() } diff --git a/transport/transport.go b/transport/transport.go index d843b11..69b8e52 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -26,17 +26,29 @@ type Transport interface { Disconnect() error Subscribe(subscription string, onMessage func(message message.Data)) error Unsubscribe(subscription string) error - Publish(subscription string, message message.Data) error + Publish(subscription string, message message.Data) (id string, err error) + //OnPublishResponse sets the handler to be triggered if the server replies to the publish request + //according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will + //ever be triggered + //can be used to identify the status of the published request and for example retry failed published requests + OnPublishResponse(subscription string, onMsg func(message *message.Message)) } -type Meta = string +type MetaMessage = string const ( - MetaSubscribe Meta = "/meta/subscribe" - MetaConnect Meta = "/meta/connect" - MetaDisconnect Meta = "/meta/disconnect" - MetaUnsubscribe Meta = "/meta/unsubscribe" - MetaHandshake Meta = "/meta/handshake" + MetaSubscribe MetaMessage = "/meta/subscribe" + MetaConnect MetaMessage = "/meta/connect" + MetaDisconnect MetaMessage = "/meta/disconnect" + MetaUnsubscribe MetaMessage = "/meta/unsubscribe" + MetaHandshake MetaMessage = "/meta/handshake" +) + +type EventMessage = int + +const ( + EventPublish EventMessage = iota + EventDelivery ) type Reconnect = string @@ -56,17 +68,28 @@ const ( ReconnectNone Reconnect = "none" ) -var MetaEvents = []Meta{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} +var metaMessages = []MetaMessage{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} -func IsMetaEvent(channel string) bool { - for i := range MetaEvents { - if channel == MetaEvents[i] { +func IsMetaMessage(msg *message.Message) bool { + for i := range metaMessages { + if msg.Channel == metaMessages[i] { return true } } return false } +func IsEventDelivery(msg *message.Message) bool { + if msg.Data != nil { + return true + } + return false +} + +func IsEventPublish(msg *message.Message) bool { + return !IsEventDelivery(msg) +} + var registeredTransports = map[string]Transport{} func RegisterTransport(t Transport) { diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index fa937bb..c85362b 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -25,10 +25,13 @@ type Websocket struct { once sync.Once advice atomic.Value //type message.Advise - stopCh chan struct{} + stopCh chan error subsMu sync.Mutex //todo sync.Map subs map[string]chan *message.Message + + onPubResponseMu sync.Mutex //todo sync.Map + onPublishResponse map[string]func(message *message.Message) } var _ transport.Transport = (*Websocket)(nil) @@ -41,7 +44,7 @@ func (w *Websocket) Init(options *transport.Options) error { w.TransportOpts = options w.msgID = &msgID w.subs = map[string]chan *message.Message{} - w.stopCh = make(chan struct{}) + w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) if err != nil { return err @@ -52,8 +55,8 @@ func (w *Websocket) Init(options *transport.Options) error { func (w *Websocket) readWorker() error { for { select { - case <-w.stopCh: - return nil + case err := <-w.stopCh: + return err default: } var payload []message.Message @@ -68,7 +71,7 @@ func (w *Websocket) readWorker() error { w.handleAdvise(msg.Advice) } - if transport.IsMetaEvent(msg.Channel) { + if transport.IsMetaMessage(msg) { //handle it switch msg.Channel { case transport.MetaSubscribe: @@ -104,16 +107,33 @@ func (w *Websocket) readWorker() error { continue } + //is Event Message + //there are 2 types of Event Message + // 1. Publish + // 2. Delivery - w.subsMu.Lock() - subscription := w.subs[msg.Channel] - w.subsMu.Unlock() + if transport.IsEventDelivery(msg) { + w.subsMu.Lock() + subscription := w.subs[msg.Channel] + w.subsMu.Unlock() - w.applyInExtensions(msg) + w.applyInExtensions(msg) - if subscription != nil { - subscription <- msg + if subscription != nil { + subscription <- msg + } + continue } + + if transport.IsEventPublish(msg) { + w.onPubResponseMu.Lock() + onPublish, ok := w.onPublishResponse[msg.Channel] + w.onPubResponseMu.Unlock() + if ok { + onPublish(msg) + } + } + } } @@ -180,7 +200,7 @@ func (w *Websocket) Disconnect() error { Id: w.nextMsgID(), } - w.stopCh <- struct{}{} + w.stopCh <- nil close(w.stopCh) return w.sendMessage(&m) @@ -212,6 +232,8 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D } onMessage(inMsg.Data) } + //we we got were means that the subscription was closed + // return nil for now return nil } @@ -226,14 +248,24 @@ func (w *Websocket) Unsubscribe(subscription string) error { return w.sendMessage(m) } -func (w *Websocket) Publish(subscription string, data message.Data) error { +func (w *Websocket) Publish(subscription string, data message.Data) (id string, err error) { + id = w.nextMsgID() m := &message.Message{ Channel: subscription, Data: data, ClientId: w.clientID, - Id: w.nextMsgID(), + Id: id, } - return w.sendMessage(m) + if err = w.sendMessage(m); err != nil { + return "", err + } + return id, nil +} + +func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *message.Message)) { + w.onPubResponseMu.Lock() + w.onPublishResponse[subscription] = onMsg + w.onPubResponseMu.Unlock() } func (w *Websocket) applyOutExtensions(m *message.Message) {