From b8d1d7dc3dea2800b8cdc81a63d2057772a3732b Mon Sep 17 00:00:00 2001 From: Marcelo Pires Date: Wed, 5 Sep 2018 00:55:52 +0200 Subject: [PATCH] initial API definition --- client.go | 48 ++++++++++++++++++ message/message.go | 35 +++++++++++++ transport/transport.go | 43 ++++++++++++++++ transport/websocket/websocket.go | 84 ++++++++++++++++++++++++++++++++ 4 files changed, 210 insertions(+) create mode 100644 client.go create mode 100644 message/message.go create mode 100644 transport/transport.go create mode 100644 transport/websocket/websocket.go diff --git a/client.go b/client.go new file mode 100644 index 0000000..f5e02eb --- /dev/null +++ b/client.go @@ -0,0 +1,48 @@ +package faye + +import ( + "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/transport" +) + +type options struct { + inExtension message.Extension + outExtension message.Extension + transport transport.Transport +} + +var defaultOpts = options{ + transport: transport.GetTransport("websocket"), +} + +//https://faye.jcoglan.com/architecture.html +type client interface { + Subscribe(subscription string, onMsg func(message *message.Message)) error + Publish(subscription string, message *message.Message) error + //todo unsubscribe,etc +} + +type Option func(*options) + +var _ client = (*Client)(nil) + +type Client struct { + opts options +} + +func NewClient(url string, opts ...Option) (*Client, error) { + var c Client + c.opts = defaultOpts + for _, opt := range opts { + opt(&c.opts) + } + return &c, nil +} + +func (c *Client) Subscribe(subscription string, onMsg func(message *message.Message)) error { + panic("not implemented") +} + +func (c *Client) Publish(subscription string, message *message.Message) error { + panic("not implemented") +} diff --git a/message/message.go b/message/message.go new file mode 100644 index 0000000..67a0fa5 --- /dev/null +++ b/message/message.go @@ -0,0 +1,35 @@ +package message + +import "errors" + +type Extension func(message *Message) + +type Message struct { + Channel string `json:"channel,omitempty"` + Version string `json:"version,omitempty"` + SupportedConnectionTypes []string `json:"supportedConnectionTypes,omitempty"` + MinimumVersion string `json:"minimumVersion,omitempty"` + Successful bool `json:"successful,omitempty"` + Ext interface{} `json:"ext,omitempty"` + Id string `json:"id,omitempty"` + ClientId string `json:"clientId,omitempty"` + Advice Advise `json:"advice,omitempty"` + Data interface{} `json:"data,omitempty"` + Timestamp uint64 `json:"timestamp,omitempty"` + AuthSuccessful bool `json:"authSuccessful,omitempty"` + Error string `json:"error,omitempty"` + Subscription string `json:"subscription,omitempty"` +} + +func (m *Message) GetError() error { + if m.Error == "" { + return nil + } + return errors.New(m.Error) +} + +type Advise struct { + Reconnect string `json:"reconnect,omitempty"` + Interval int64 `json:"interval,omitempty"` + Timeout int64 `json:"timeout,omitempty"` +} diff --git a/transport/transport.go b/transport/transport.go new file mode 100644 index 0000000..80fcd8a --- /dev/null +++ b/transport/transport.go @@ -0,0 +1,43 @@ +package transport + +import "github.com/thesyncim/faye/message" + +// handshake, connect, disconnect, subscribe, unsubscribe and publish + +type Options struct { + Url string + InExt message.Extension + outExt message.Extension + //todo dial timeout + //todo read/write deadline +} + +type Transport interface { + Name() string + Init(options *Options) error + Options() *Options + Handshake() error + Connect() error + Subscribe(subscription string, onMessage func(message *message.Message)) error + Unsubscribe(subscription string) error + Publish(subscription string, message *message.Message) error +} + +type Event string + +const ( + Subscribe Event = "/meta/subscribe" + Unsubscribe Event = "/meta/unsubscribe" + Handshake Event = "/meta/handshake" + Disconnect Event = "/meta/disconnect" +) + +var registeredTransports = map[string]Transport{} + +func RegisterTransport(t Transport) { + registeredTransports[t.Name()] = t //todo validate +} + +func GetTransport(name string) Transport { + return registeredTransports[name] +} diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go new file mode 100644 index 0000000..4834f68 --- /dev/null +++ b/transport/websocket/websocket.go @@ -0,0 +1,84 @@ +package websocket + +import ( + "github.com/gorilla/websocket" + "github.com/thesyncim/faye/message" + "github.com/thesyncim/faye/transport" + "strconv" + "sync/atomic" +) + +func init() { + transport.RegisterTransport(&Websocket{}) +} + +type Websocket struct { + TransportOpts *transport.Options + conn *websocket.Conn + clientID string + msgID *uint64 +} + +var _ transport.Transport = (*Websocket)(nil) + +func (w *Websocket) Init(options *transport.Options) error { + var ( + err error + msgID uint64 + ) + + w.TransportOpts = options + w.msgID = &msgID + w.conn, _, err = websocket.DefaultDialer.Dial(options.Url, nil) + if err != nil { + return err + } + return nil +} +func (w *Websocket) Name() string { + return "websocket" +} +func (w *Websocket) nextMsgID() string { + return strconv.Itoa(int(atomic.AddUint64(w.msgID, 1))) +} + +func (w *Websocket) Options() *transport.Options { + return w.TransportOpts +} +func (w *Websocket) Handshake() (err error) { + if err = w.conn.WriteJSON(append(nil, message.Message{ + Channel: string(transport.Handshake), + Version: "1.0", //todo const + SupportedConnectionTypes: []string{"websocket"}, + })); err != nil { + return err + } + + var hsResps []message.Message + if err = w.conn.ReadJSON(hsResps); err != nil { + return err + } + + resp := hsResps[0] + if resp.GetError() != nil { + return err + } + w.clientID = resp.ClientId + return nil +} + +func (w *Websocket) Connect() error { + panic("not implemented") +} + +func (w *Websocket) Subscribe(subscription string, onMessage func(message *message.Message)) error { + panic("not implemented") +} + +func (w *Websocket) Unsubscribe(subscription string) error { + panic("not implemented") +} + +func (w *Websocket) Publish(subscription string, message *message.Message) error { + panic("not implemented") +}