fayec/transport/websocket/websocket.go

144 lines
3.0 KiB
Go
Raw Normal View History

2018-09-04 22:55:52 +00:00
package websocket
import (
	"fmt"
	"strconv"
	"sync/atomic"

	"github.com/gorilla/websocket"
	"github.com/thesyncim/faye/message"
	"github.com/thesyncim/faye/transport"
)
2018-09-04 23:22:43 +00:00
// transportName is the name this transport reports from Name and advertises
// as its connection type in handshake and connect messages.
const transportName = "websocket"
2018-09-04 22:55:52 +00:00
// init registers a zero-value Websocket with the transport package when this
// package is imported.
func init() {
	transport.RegisterTransport(new(Websocket))
}
type Websocket struct {
TransportOpts *transport.Options
conn *websocket.Conn
clientID string
msgID *uint64
2018-09-04 23:56:25 +00:00
subs map[string]chan struct{}
2018-09-04 22:55:52 +00:00
}
// Compile-time check that *Websocket satisfies transport.Transport.
var _ transport.Transport = (*Websocket)(nil)
func (w *Websocket) Init(options *transport.Options) error {
var (
err error
msgID uint64
)
w.TransportOpts = options
w.msgID = &msgID
2018-09-04 23:56:25 +00:00
w.subs = map[string]chan struct{}{}
2018-09-04 22:55:52 +00:00
w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil)
if err != nil {
return err
}
return nil
}
func (w *Websocket) Name() string {
2018-09-04 23:22:43 +00:00
return transportName
2018-09-04 22:55:52 +00:00
}
// nextMsgID atomically increments the message counter and returns the new
// value as a decimal string.
func (w *Websocket) nextMsgID() string {
	// Format the uint64 directly: converting it to int first would truncate
	// the counter on platforms where int is 32 bits wide.
	return strconv.FormatUint(atomic.AddUint64(w.msgID, 1), 10)
}
// Options returns the transport options that were stored by Init.
func (w *Websocket) Options() *transport.Options {
	opts := w.TransportOpts
	return opts
}
func (w *Websocket) Handshake() (err error) {
2018-09-05 08:56:45 +00:00
var payload []message.Message
if err = w.conn.WriteJSON(append(payload, message.Message{
2018-09-04 22:55:52 +00:00
Channel: string(transport.Handshake),
Version: "1.0", //todo const
2018-09-04 23:22:43 +00:00
SupportedConnectionTypes: []string{transportName},
2018-09-04 22:55:52 +00:00
})); err != nil {
return err
}
var hsResps []message.Message
2018-09-04 23:56:25 +00:00
if err = w.conn.ReadJSON(&hsResps); err != nil {
2018-09-04 22:55:52 +00:00
return err
}
resp := hsResps[0]
if resp.GetError() != nil {
return err
}
w.clientID = resp.ClientId
return nil
}
func (w *Websocket) Connect() error {
2018-09-05 08:56:45 +00:00
var payload []message.Message
2018-09-04 23:22:43 +00:00
//todo verify if extensions are applied on connect,verify if hs is complete
2018-09-05 08:56:45 +00:00
return w.conn.WriteJSON(append(payload, message.Message{
2018-09-04 23:22:43 +00:00
Channel: string(transport.Connect),
ClientId: w.clientID,
ConnectionType: transportName,
Id: w.nextMsgID(),
}))
2018-09-04 22:55:52 +00:00
}
func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error {
2018-09-04 23:56:25 +00:00
m := &message.Message{
Channel: string(transport.Subscribe),
ClientId: w.clientID,
Subscription: "/" + subscription,
Id: w.nextMsgID(),
}
if w.TransportOpts.OutExt != nil {
w.TransportOpts.OutExt(m)
}
2018-09-05 08:56:45 +00:00
var payload []message.Message
err := w.conn.WriteJSON(append(payload, *m))
2018-09-04 23:56:25 +00:00
if err != nil {
return err
}
var hsResps []message.Message
if err = w.conn.ReadJSON(&hsResps); err != nil {
return err
}
subResp := hsResps[0]
if subResp.GetError() != nil {
return err
}
if !subResp.Successful {
//report err just for sanity
}
unsubsCh := make(chan struct{}, 0)
2018-09-05 08:56:45 +00:00
2018-09-04 23:56:25 +00:00
w.subs[subscription] = unsubsCh
for {
select {
case <-unsubsCh:
return nil
default:
}
//todo guard unsusribe
2018-09-05 08:56:45 +00:00
var payload []message.Message
err := w.conn.ReadJSON(&payload)
2018-09-04 23:56:25 +00:00
if err != nil {
return err
}
2018-09-05 08:56:45 +00:00
msg := payload[0]
2018-09-04 23:56:25 +00:00
onMessage(&msg)
}
return nil
2018-09-04 22:55:52 +00:00
}
// Unsubscribe is not implemented for this transport yet. It always returns an
// error instead of panicking so that callers can handle the missing feature
// through the normal error path.
func (w *Websocket) Unsubscribe(subscription string) error {
	return fmt.Errorf("websocket: unsubscribe from %q: not implemented", subscription)
}
// Publish is not implemented for this transport yet. It always returns an
// error instead of panicking so that callers can handle the missing feature
// through the normal error path.
func (w *Websocket) Publish(subscription string, message *message.Message) error {
	return fmt.Errorf("websocket: publish to %q: not implemented", subscription)
}