fayec/transport/websocket/websocket.go

170 lines
3.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package websocket
import (
"crypto/tls"
"encoding/json"
"gitea.watsonlabs.net/watsonb8/fayec/message"
"gitea.watsonlabs.net/watsonb8/fayec/transport"
"github.com/gorilla/websocket"
"log"
"net"
"sync"
"sync/atomic"
)
const transportName = "websocket"
func init() {
transport.RegisterTransport(&Websocket{})
}
// Websocket represents an websocket transport for the faye protocol
type Websocket struct {
topts *transport.Options
connMu sync.Mutex
conn *websocket.Conn
advice atomic.Value //type message.Advise //todo move to dispatcher
stopCh chan error //todo replace wth context
onMsg func(msg *message.Message)
onError func(err error)
onTransportDown func(err error)
onTransportUp func()
}
var _ transport.Transport = (*Websocket)(nil)
// Init initializes the transport with the provided options
func (w *Websocket) Init(endpoint string, options *transport.Options) error {
var (
err error
)
w.topts = options
w.stopCh = make(chan error)
w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers)
if err != nil {
return err
}
err = w.conn.UnderlyingConn().(*tls.Conn).NetConn().(*net.TCPConn).SetKeepAlive(true)
if err != nil {
return err
}
if err != nil {
return err
}
return nil
}
// Init initializes the transport with the provided options
func (w *Websocket) SetOnErrorHandler(onError func(err error)) {
w.onError = onError
}
func (w *Websocket) readWorker() error {
for {
select {
case err := <-w.stopCh:
return err
default:
var payload []message.Message
_, data, err := w.conn.ReadMessage()
if err != nil {
return err
}
err = json.Unmarshal(data, &payload)
if err != nil {
return err
}
//dispatch
msg := &payload[0]
w.onMsg(msg)
}
}
}
// name returns the transport name (websocket)
func (w *Websocket) Name() string {
return transportName
}
func (w *Websocket) SendMessage(m *message.Message) error {
w.connMu.Lock()
defer w.connMu.Unlock()
var payload []message.Message
payload = append(payload, *m)
again: //todo move this to scheduler
err := w.conn.WriteJSON(payload)
if websocket.IsUnexpectedCloseError(err) {
advise := w.advice.Load().(*message.Advise)
if advise.Reconnect == message.ReconnectNone {
return err
}
//reconnect
//we should re-register again subscriptions again
goto again
}
return nil
}
// Options return the transport Options
func (w *Websocket) Options() *transport.Options {
return w.topts
}
// Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel.
func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err error) {
err = w.SendMessage(msg)
if err != nil {
return nil, err
}
var hsResps []message.Message
if err = w.conn.ReadJSON(&hsResps); err != nil {
return nil, err
}
resp = &hsResps[0]
return resp, nil
}
// Init is called after a client has discovered the servers capabilities with a handshake exchange,
// a connection is established by sending a message to the /meta/connect channel
func (w *Websocket) Connect(msg *message.Message) error {
go func() {
err := w.readWorker()
log.Fatal(err)
}()
return w.SendMessage(msg)
}
func (w *Websocket) SetOnTransportDownHandler(onTransportDown func(err error)) {
w.onTransportDown = onTransportDown
}
func (w *Websocket) SetOnTransportUpHandler(onTransportUp func()) {
w.onTransportUp = onTransportUp
}
// Disconnect closes all subscriptions and inform the server to remove any client-related state.
// any subsequent method call to the client object will result in undefined behaviour.
func (w *Websocket) Disconnect(m *message.Message) error {
w.stopCh <- nil
close(w.stopCh)
return w.SendMessage(m)
}
func (w *Websocket) SetOnMessageReceivedHandler(onMsg func(*message.Message)) {
w.onMsg = onMsg
}