diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 0528ebd..7597158 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -181,10 +181,21 @@ func (w *Websocket) sendMessage(m *message.Message) error { w.connMu.Lock() defer w.connMu.Unlock() w.applyOutExtensions(m) - var payload []message.Message payload = append(payload, *m) - return w.conn.WriteJSON(payload) + +again: + err := w.conn.WriteJSON(payload) + if websocket.IsUnexpectedCloseError(err) { + advise := w.advice.Load().(*message.Advise) + if advise.Reconnect == message.ReconnectNone { + return err + } + //reconnect + //we should re-register again subscriptions again + goto again + } + return nil } func (w *Websocket) nextMsgID() string { return strconv.Itoa(int(atomic.AddUint64(w.msgID, 1))) @@ -294,39 +305,6 @@ func (w *Websocket) Subscribe(channel string) (*subscription.Subscription, error return sub, nil } -/* -//Subscribe informs the server that messages published to that channel are delivered to itself. -func (w *Websocket) Subscribe(subscription string, onMessage func(data message.Data)) error { - m := &message.Message{ - Channel: transport.MetaSubscribe, - ClientId: w.clientID, - Subscription: subscription, - Id: w.nextMsgID(), - } - - if err := w.sendMessage(m); err != nil { - return err - } - - //todo validate - inMsgCh := make(chan *message.Message, 0) - - w.subsMu.Lock() - w.subs[subscription] = inMsgCh - w.subsMu.Unlock() - - var inMsg *message.Message - for inMsg = range inMsgCh { - if inMsg.GetError() != nil { - return inMsg.GetError() - } - onMessage(inMsg.Data) - } - //we we got were means that the subscription was closed - // return nil for now - return nil -} -*/ //Unsubscribe informs the server that the client will no longer listen to incoming event messages on //the specified channel/subscription func (w *Websocket) Unsubscribe(subscription *subscription.Subscription) error {