first step to handle advise

This commit is contained in:
Marcelo Pires 2018-09-07 18:31:52 +02:00
parent 0fce9349d0
commit 23124a7de8

View File

@ -181,10 +181,21 @@ func (w *Websocket) sendMessage(m *message.Message) error {
w.connMu.Lock() w.connMu.Lock()
defer w.connMu.Unlock() defer w.connMu.Unlock()
w.applyOutExtensions(m) w.applyOutExtensions(m)
var payload []message.Message var payload []message.Message
payload = append(payload, *m) payload = append(payload, *m)
return w.conn.WriteJSON(payload)
again:
err := w.conn.WriteJSON(payload)
if websocket.IsUnexpectedCloseError(err) {
advise := w.advice.Load().(*message.Advise)
if advise.Reconnect == message.ReconnectNone {
return err
}
//reconnect
//we should re-register again subscriptions again
goto again
}
return nil
} }
func (w *Websocket) nextMsgID() string { func (w *Websocket) nextMsgID() string {
return strconv.Itoa(int(atomic.AddUint64(w.msgID, 1))) return strconv.Itoa(int(atomic.AddUint64(w.msgID, 1)))
@ -294,39 +305,6 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error
return sub, nil return sub, nil
} }
/*
//Subscribe informs the server that messages published to that channel are delivered to itself.
func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error {
m := &message.Message{
Channel: transport.MetaSubscribe,
ClientId: w.clientID,
Subscription: subscription,
Id: w.nextMsgID(),
}
if err := w.sendMessage(m); err != nil {
return err
}
//todo validate
inMsgCh := make(chan *message.Message, 0)
w.subsMu.Lock()
w.subs[subscription] = inMsgCh
w.subsMu.Unlock()
var inMsg *message.Message
for inMsg = range inMsgCh {
if inMsg.GetError() != nil {
return inMsg.GetError()
}
onMessage(inMsg.Data)
}
//we we got were means that the subscription was closed
// return nil for now
return nil
}
*/
//Unsubscribe informs the server that the client will no longer listen to incoming event messages on //Unsubscribe informs the server that the client will no longer listen to incoming event messages on
//the specified channel/subscription //the specified channel/subscription
func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error { func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error {