support Publish event

This commit is contained in:
Marcelo Pires 2018-09-06 15:29:49 +02:00
parent 7bfdcc9f48
commit 01f041d2fc
3 changed files with 92 additions and 29 deletions

View File

@ -21,8 +21,8 @@ type client interface {
Disconnect() error Disconnect() error
Subscribe(subscription string, onMessage func(message message.Data)) error Subscribe(subscription string, onMessage func(message message.Data)) error
Unsubscribe(subscription string) error Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) error Publish(subscription string, message message.Data) (string, error)
//todo unsubscribe,etc OnPublishResponse(subscription string, onMsg func(message *message.Message))
} }
type Option func(*options) type Option func(*options)
@ -68,10 +68,18 @@ func (c *Client) Unsubscribe(subscription string) error {
return c.opts.transport.Unsubscribe(subscription) return c.opts.transport.Unsubscribe(subscription)
} }
func (c *Client) Publish(subscription string, data message.Data) error { func (c *Client) Publish(subscription string, data message.Data) (id string, err error) {
return c.opts.transport.Publish(subscription, data) return c.opts.transport.Publish(subscription, data)
} }
//OnPublishResponse sets the handler to be triggered if the server replies to the publish request
//according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will
//ever be triggered
//can be used to identify the status of the published request and for example retry failed published requests
func (c *Client) OnPublishResponse(subscription string, onMsg func(message *message.Message)) {
c.opts.transport.OnPublishResponse(subscription, onMsg)
}
func (c *Client) Disconnect() error { func (c *Client) Disconnect() error {
return c.opts.transport.Disconnect() return c.opts.transport.Disconnect()
} }

View File

@ -26,17 +26,29 @@ type Transport interface {
Disconnect() error Disconnect() error
Subscribe(subscription string, onMessage func(message message.Data)) error Subscribe(subscription string, onMessage func(message message.Data)) error
Unsubscribe(subscription string) error Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) error Publish(subscription string, message message.Data) (id string, err error)
//OnPublishResponse sets the handler to be triggered if the server replies to the publish request
//according to the spec the server MAY reply to the publish request, so its not guaranteed that this handler will
//ever be triggered
//can be used to identify the status of the published request and for example retry failed published requests
OnPublishResponse(subscription string, onMsg func(message *message.Message))
} }
type Meta = string type MetaMessage = string
const ( const (
MetaSubscribe Meta = "/meta/subscribe" MetaSubscribe MetaMessage = "/meta/subscribe"
MetaConnect Meta = "/meta/connect" MetaConnect MetaMessage = "/meta/connect"
MetaDisconnect Meta = "/meta/disconnect" MetaDisconnect MetaMessage = "/meta/disconnect"
MetaUnsubscribe Meta = "/meta/unsubscribe" MetaUnsubscribe MetaMessage = "/meta/unsubscribe"
MetaHandshake Meta = "/meta/handshake" MetaHandshake MetaMessage = "/meta/handshake"
)
type EventMessage = int
const (
EventPublish EventMessage = iota
EventDelivery
) )
type Reconnect = string type Reconnect = string
@ -56,17 +68,28 @@ const (
ReconnectNone Reconnect = "none" ReconnectNone Reconnect = "none"
) )
var MetaEvents = []Meta{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect} var metaMessages = []MetaMessage{MetaSubscribe, MetaConnect, MetaUnsubscribe, MetaHandshake, MetaDisconnect}
func IsMetaEvent(channel string) bool { func IsMetaMessage(msg *message.Message) bool {
for i := range MetaEvents { for i := range metaMessages {
if channel == MetaEvents[i] { if msg.Channel == metaMessages[i] {
return true return true
} }
} }
return false return false
} }
func IsEventDelivery(msg *message.Message) bool {
if msg.Data != nil {
return true
}
return false
}
func IsEventPublish(msg *message.Message) bool {
return !IsEventDelivery(msg)
}
var registeredTransports = map[string]Transport{} var registeredTransports = map[string]Transport{}
func RegisterTransport(t Transport) { func RegisterTransport(t Transport) {

View File

@ -25,10 +25,13 @@ type Websocket struct {
once sync.Once once sync.Once
advice atomic.Value //type message.Advise advice atomic.Value //type message.Advise
stopCh chan struct{} stopCh chan error
subsMu sync.Mutex //todo sync.Map subsMu sync.Mutex //todo sync.Map
subs map[string]chan *message.Message subs map[string]chan *message.Message
onPubResponseMu sync.Mutex //todo sync.Map
onPublishResponse map[string]func(message *message.Message)
} }
var _ transport.Transport = (*Websocket)(nil) var _ transport.Transport = (*Websocket)(nil)
@ -41,7 +44,7 @@ func (w *Websocket) Init(options *transport.Options) error {
w.TransportOpts = options w.TransportOpts = options
w.msgID = &msgID w.msgID = &msgID
w.subs = map[string]chan *message.Message{} w.subs = map[string]chan *message.Message{}
w.stopCh = make(chan struct{}) w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
if err != nil { if err != nil {
return err return err
@ -52,8 +55,8 @@ func (w *Websocket) Init(options *transport.Options) error {
func (w *Websocket) readWorker() error { func (w *Websocket) readWorker() error {
for { for {
select { select {
case <-w.stopCh: case err := <-w.stopCh:
return nil return err
default: default:
} }
var payload []message.Message var payload []message.Message
@ -68,7 +71,7 @@ func (w *Websocket) readWorker() error {
w.handleAdvise(msg.Advice) w.handleAdvise(msg.Advice)
} }
if transport.IsMetaEvent(msg.Channel) { if transport.IsMetaMessage(msg) {
//handle it //handle it
switch msg.Channel { switch msg.Channel {
case transport.MetaSubscribe: case transport.MetaSubscribe:
@ -104,16 +107,33 @@ func (w *Websocket) readWorker() error {
continue continue
} }
//is Event Message
//there are 2 types of Event Message
// 1. Publish
// 2. Delivery
w.subsMu.Lock() if transport.IsEventDelivery(msg) {
subscription := w.subs[msg.Channel] w.subsMu.Lock()
w.subsMu.Unlock() subscription := w.subs[msg.Channel]
w.subsMu.Unlock()
w.applyInExtensions(msg) w.applyInExtensions(msg)
if subscription != nil { if subscription != nil {
subscription <- msg subscription <- msg
}
continue
} }
if transport.IsEventPublish(msg) {
w.onPubResponseMu.Lock()
onPublish, ok := w.onPublishResponse[msg.Channel]
w.onPubResponseMu.Unlock()
if ok {
onPublish(msg)
}
}
} }
} }
@ -180,7 +200,7 @@ func (w *Websocket) Disconnect() error {
Id: w.nextMsgID(), Id: w.nextMsgID(),
} }
w.stopCh <- struct{}{} w.stopCh <- nil
close(w.stopCh) close(w.stopCh)
return w.sendMessage(&m) return w.sendMessage(&m)
@ -212,6 +232,8 @@ func (w *Websocket) Subscribe(subscription string, onMessage func(data message.D
} }
onMessage(inMsg.Data) onMessage(inMsg.Data)
} }
//we we got were means that the subscription was closed
// return nil for now
return nil return nil
} }
@ -226,14 +248,24 @@ func (w *Websocket) Unsubscribe(subscription string) error {
return w.sendMessage(m) return w.sendMessage(m)
} }
func (w *Websocket) Publish(subscription string, data message.Data) error { func (w *Websocket) Publish(subscription string, data message.Data) (id string, err error) {
id = w.nextMsgID()
m := &message.Message{ m := &message.Message{
Channel: subscription, Channel: subscription,
Data: data, Data: data,
ClientId: w.clientID, ClientId: w.clientID,
Id: w.nextMsgID(), Id: id,
} }
return w.sendMessage(m) if err = w.sendMessage(m); err != nil {
return "", err
}
return id, nil
}
func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *message.Message)) {
w.onPubResponseMu.Lock()
w.onPublishResponse[subscription] = onMsg
w.onPubResponseMu.Unlock()
} }
func (w *Websocket) applyOutExtensions(m *message.Message) { func (w *Websocket) applyOutExtensions(m *message.Message) {