fayec/transport/websocket/websocket.go

158 lines
3.6 KiB
Go
Raw Normal View History

2018-09-04 22:55:52 +00:00
package websocket
import (
	"errors"
	"log"
	"sync"
	"sync/atomic"

	"github.com/gorilla/websocket"
	"github.com/thesyncim/faye/message"
	"github.com/thesyncim/faye/transport"
)
2018-09-04 23:22:43 +00:00
// transportName is the name this transport reports from Name.
const transportName = "websocket"
2018-09-04 22:55:52 +00:00
func init() {
transport.RegisterTransport(&Websocket{})
}
2018-09-06 14:31:05 +00:00
//Websocket represents an websocket transport for the faye protocol
2018-09-04 22:55:52 +00:00
type Websocket struct {
topts *transport.Options
2018-09-06 15:59:56 +00:00
connMu sync.Mutex
conn *websocket.Conn
2018-09-05 12:56:32 +00:00
advice atomic.Value //type message.Advise //todo move to dispatcher
stopCh chan error //todo replace wth context
2018-09-06 13:29:49 +00:00
onMsg func(msg *message.Message)
onError func(err error)
onTransportDown func(err error)
onTransportUp func()
2018-09-04 22:55:52 +00:00
}
// Compile-time assertion that *Websocket satisfies transport.Transport.
var _ transport.Transport = (*Websocket)(nil)
2018-09-06 14:31:05 +00:00
//Init initializes the transport with the provided options
func (w *Websocket) Init(endpoint string, options *transport.Options) error {
2018-09-04 22:55:52 +00:00
var (
err error
2018-09-04 22:55:52 +00:00
)
w.topts = options
2018-09-06 13:29:49 +00:00
w.stopCh = make(chan error)
2018-09-11 11:30:07 +00:00
w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers)
2018-09-04 22:55:52 +00:00
if err != nil {
return err
}
w.conn.SetPingHandler(func(appData string) error {
return w.conn.WriteJSON(make([]struct{}, 0))
})
if err != nil {
return err
}
2018-09-04 22:55:52 +00:00
return nil
}
2018-09-05 12:56:32 +00:00
// SetOnErrorHandler stores onError as the transport's error callback.
func (w *Websocket) SetOnErrorHandler(onError func(err error)) {
w.onError = onError
}
2018-09-05 12:56:32 +00:00
func (w *Websocket) readWorker() error {
for {
select {
2018-09-06 13:29:49 +00:00
case err := <-w.stopCh:
return err
default:
}
2018-09-06 09:16:36 +00:00
var payload []message.Message
2018-09-05 12:56:32 +00:00
err := w.conn.ReadJSON(&payload)
if err != nil {
2018-09-05 12:56:32 +00:00
return err
}
//dispatch
msg := &payload[0]
w.onMsg(msg)
2018-09-05 12:56:32 +00:00
}
}
//name returns the transport name (websocket)
2018-09-04 22:55:52 +00:00
func (w *Websocket) Name() string {
2018-09-04 23:22:43 +00:00
return transportName
2018-09-04 22:55:52 +00:00
}
2018-09-05 12:56:32 +00:00
func (w *Websocket) SendMessage(m *message.Message) error {
2018-09-06 15:59:56 +00:00
w.connMu.Lock()
defer w.connMu.Unlock()
2018-09-05 12:56:32 +00:00
var payload []message.Message
payload = append(payload, *m)
2018-09-07 16:31:52 +00:00
again: //todo move this to scheduler
2018-09-07 16:31:52 +00:00
err := w.conn.WriteJSON(payload)
if websocket.IsUnexpectedCloseError(err) {
advise := w.advice.Load().(*message.Advise)
if advise.Reconnect == message.ReconnectNone {
return err
}
//reconnect
//we should re-register again subscriptions again
goto again
}
return nil
2018-09-05 12:56:32 +00:00
}
2018-09-04 22:55:52 +00:00
2018-09-06 14:31:05 +00:00
//Options return the transport Options
2018-09-04 22:55:52 +00:00
func (w *Websocket) Options() *transport.Options {
return w.topts
2018-09-04 22:55:52 +00:00
}
2018-09-05 12:56:32 +00:00
2018-09-06 14:31:05 +00:00
//Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel.
func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err error) {
err = w.SendMessage(msg)
2018-09-05 12:56:32 +00:00
if err != nil {
return nil, err
2018-09-04 22:55:52 +00:00
}
var hsResps []message.Message
2018-09-04 23:56:25 +00:00
if err = w.conn.ReadJSON(&hsResps); err != nil {
return nil, err
2018-09-04 22:55:52 +00:00
}
resp = &hsResps[0]
return resp, nil
2018-09-04 22:55:52 +00:00
}
//Init is called after a client has discovered the servers capabilities with a handshake exchange,
2018-09-06 14:31:05 +00:00
//a connection is established by sending a message to the /meta/connect channel
func (w *Websocket) Connect(msg *message.Message) error {
go func() {
log.Fatal(w.readWorker())
}()
return w.SendMessage(msg)
2018-09-04 22:55:52 +00:00
}
// SetOnTransportDownHandler stores onTransportDown as the transport-down callback.
func (w *Websocket) SetOnTransportDownHandler(onTransportDown func(err error)) {
w.onTransportDown = onTransportDown
}
func (w *Websocket) SetOnTransportUpHandler(onTransportUp func()) {
w.onTransportUp = onTransportUp
2018-09-06 13:29:49 +00:00
}
//Disconnect closes all subscriptions and inform the server to remove any client-related state.
//any subsequent method call to the client object will result in undefined behaviour.
func (w *Websocket) Disconnect(m *message.Message) error {
w.stopCh <- nil
close(w.stopCh)
return w.SendMessage(m)
2018-09-04 22:55:52 +00:00
}
func (w *Websocket) SetOnMessageReceivedHandler(onMsg func(*message.Message)) {
w.onMsg = onMsg
2018-09-06 11:23:53 +00:00
}