keep connected && transport error handler

This commit is contained in:
Sergei Vizel 2018-09-17 08:28:54 +03:00
parent 8583e29645
commit 88420d4737
3 changed files with 36 additions and 5 deletions

View File

@ -23,6 +23,7 @@ type client interface {
//Unsubscribe(subscription string) error //Unsubscribe(subscription string) error
Publish(subscription string, message message.Data) (string, error) Publish(subscription string, message message.Data) (string, error)
OnPublishResponse(subscription string, onMsg func(message *message.Message)) OnPublishResponse(subscription string, onMsg func(message *message.Message))
OnTransportError(onErr func(err error))
} }
//Option set the Client options, such as Transport, message extensions,etc. //Option set the Client options, such as Transport, message extensions,etc.
@ -78,6 +79,10 @@ func (c *Client) OnPublishResponse(subscription string, onMsg func(message *mess
c.opts.transport.OnPublishResponse(subscription, onMsg) c.opts.transport.OnPublishResponse(subscription, onMsg)
} }
func (c *Client) OnTransportError(onErr func(err error)) {
c.opts.transport.OnError(onErr)
}
//Disconnect closes all subscriptions and inform the server to remove any client-related state. //Disconnect closes all subscriptions and inform the server to remove any client-related state.
//any subsequent method call to the client object will result in undefined behaviour. //any subsequent method call to the client object will result in undefined behaviour.
func (c *Client) Disconnect() error { func (c *Client) Disconnect() error {

View File

@ -48,6 +48,8 @@ type Transport interface {
//ever be triggered //ever be triggered
//can be used to identify the status of the published request and for example retry failed published requests //can be used to identify the status of the published request and for example retry failed published requests
OnPublishResponse(subscription string, onMsg func(message *message.Message)) OnPublishResponse(subscription string, onMsg func(message *message.Message))
//OnError sets the handler to be triggered if some error appears
OnError(onErr func(err error))
} }
//MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself. //MetaMessage are channels commencing with the /meta/ segment ans, are the channels used by the faye protocol itself.

View File

@ -37,6 +37,7 @@ type Websocket struct {
onPubResponseMu sync.Mutex //todo sync.Map onPubResponseMu sync.Mutex //todo sync.Map
onPublishResponse map[string]func(message *message.Message) onPublishResponse map[string]func(message *message.Message)
onError func(err error)
} }
var _ transport.Transport = (*Websocket)(nil) var _ transport.Transport = (*Websocket)(nil)
@ -52,6 +53,7 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error {
//w.subs = map[string]chan *message.Message{} //w.subs = map[string]chan *message.Message{}
w.subscriptions = map[string][]*subscription.Subscription{} w.subscriptions = map[string][]*subscription.Subscription{}
w.onPublishResponse = map[string]func(message *message.Message){} w.onPublishResponse = map[string]func(message *message.Message){}
w.onError = func(err error){}
w.stopCh = make(chan error) w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers) w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers)
if err != nil { if err != nil {
@ -60,17 +62,17 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error {
return nil return nil
} }
func (w *Websocket) readWorker() error { func (w *Websocket) readWorker() {
for { for {
select { select {
case err := <-w.stopCh: case err := <-w.stopCh:
return err panic(err)
default: default:
} }
var payload []message.Message var payload []message.Message
err := w.conn.ReadJSON(&payload) err := w.conn.ReadJSON(&payload)
if err != nil { if err != nil {
return err panic(err)
} }
//dispatch //dispatch
msg := &payload[0] msg := &payload[0]
@ -83,6 +85,16 @@ func (w *Websocket) readWorker() error {
if transport.IsMetaMessage(msg) { if transport.IsMetaMessage(msg) {
//handle it //handle it
switch msg.Channel { switch msg.Channel {
case transport.MetaConnect:
m := message.Message{
Channel: transport.MetaConnect,
ClientId: w.clientID,
ConnectionType: transportName,
Id: w.nextMsgID(),
}
if err = w.sendMessage(&m); err != nil {
panic(err)
}
case transport.MetaSubscribe: case transport.MetaSubscribe:
//handle MetaSubscribe resp //handle MetaSubscribe resp
w.subscriptionsMu.Lock() w.subscriptionsMu.Lock()
@ -126,7 +138,6 @@ func (w *Websocket) readWorker() error {
} }
} }
w.subscriptionsMu.Unlock() w.subscriptionsMu.Unlock()
} }
continue continue
@ -238,7 +249,16 @@ func (w *Websocket) Connect() error {
ConnectionType: transportName, ConnectionType: transportName,
Id: w.nextMsgID(), Id: w.nextMsgID(),
} }
go w.readWorker()
go func () {
defer func() {
if r := recover(); r != nil {
w.onError(fmt.Errorf("%v", r))
}
}()
w.readWorker()
}()
return w.sendMessage(&m) return w.sendMessage(&m)
} }
@ -368,6 +388,10 @@ func (w *Websocket) OnPublishResponse(subscription string, onMsg func(message *m
w.onPubResponseMu.Unlock() w.onPubResponseMu.Unlock()
} }
func (w *Websocket) OnError(onErr func(err error)) {
w.onError = onErr
}
func (w *Websocket) handleAdvise(m *message.Advise) { func (w *Websocket) handleAdvise(m *message.Advise) {
//todo actually handle the advice //todo actually handle the advice
w.advice.Store(m) w.advice.Store(m)