From c89756630f4e0b99ec6cd5d7f40323fa6f5bcd93 Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Mon, 18 Sep 2023 20:58:15 -0500 Subject: [PATCH] Working authenticated connection --- client.go | 40 +++++++++++------------ examples/groupme.go | 42 +++++++++++++++++++++++++ go.mod | 8 +++++ go.sum | 6 ++++ internal/dispatcher/dispatcher.go | 13 ++++---- internal/store/subscription.go | 10 +++--- internal/store/subscription_test.go | 4 +-- subscription/subscription.go | 10 +++--- transport/transport.go | 6 ++-- transport/websocket/websocket.go | 49 ++++++++++++++++------------- 10 files changed, 126 insertions(+), 62 deletions(-) create mode 100644 examples/groupme.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/client.go b/client.go index 7870541..ef35bc2 100644 --- a/client.go +++ b/client.go @@ -1,11 +1,11 @@ package fayec import ( - "github.com/thesyncim/faye/internal/dispatcher" - "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/subscription" - "github.com/thesyncim/faye/transport" - _ "github.com/thesyncim/faye/transport/websocket" + "github.com/thesyncim/fayec/internal/dispatcher" + "github.com/thesyncim/fayec/message" + "github.com/thesyncim/fayec/subscription" + "github.com/thesyncim/fayec/transport" + _ "github.com/thesyncim/fayec/transport/websocket" ) type options struct { @@ -18,7 +18,7 @@ var defaultOpts = options{ transport: transport.GetTransport("websocket"), } -//https://faye.jcoglan.com/architecture.html +// https://faye.jcoglan.com/architecture.html type client interface { Disconnect() error Subscribe(subscription string) (*subscription.Subscription, error) @@ -28,7 +28,7 @@ type client interface { //SetOnTransportUpHandler(onTransportUp func()) } -//Option set the Client options, such as Transport, message extensions,etc. +// Option set the Client options, such as Transport, message extensions,etc. type Option func(*options) var _ client = (*Client)(nil) @@ -39,7 +39,7 @@ type Client struct { dispatcher *dispatcher.Dispatcher } -//NewClient creates a new faye client with the provided options and connect to the specified url. +// NewClient creates a new faye client with the provided options and connect to the specified url. func NewClient(url string, opts ...Option) (*Client, error) { var c Client c.opts = defaultOpts @@ -57,33 +57,33 @@ func NewClient(url string, opts ...Option) (*Client, error) { return &c, nil } -//Subscribe informs the server that messages published to that channel are delivered to itself. +// Subscribe informs the server that messages published to that channel are delivered to itself. func (c *Client) Subscribe(subscription string) (*subscription.Subscription, error) { return c.dispatcher.Subscribe(subscription) } -//Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event -//if this feature is supported by the server use the OnPublishResponse to get the publish status. +// Publish publishes events on a channel by sending event messages, the server MAY respond to a publish event +// if this feature is supported by the server use the OnPublishResponse to get the publish status. func (c *Client) Publish(subscription string, data message.Data) (err error) { return c.dispatcher.Publish(subscription, data) } -//Disconnect closes all subscriptions and inform the server to remove any client-related state. -//any subsequent method call to the client object will result in undefined behaviour. +// Disconnect closes all subscriptions and inform the server to remove any client-related state. +// any subsequent method call to the client object will result in undefined behaviour. func (c *Client) Disconnect() error { return c.dispatcher.Disconnect() } -//WithOutExtension append the provided outgoing extension to the the default transport options -//extensions run in the order that they are provided +// WithOutExtension append the provided outgoing extension to the the default transport options +// extensions run in the order that they are provided func WithOutExtension(extension message.Extension) Option { return func(o *options) { o.extensions.Out = append(o.extensions.Out, extension) } } -//WithExtension append the provided incoming extension and outgoing to the list of incoming and outgoing extensions. -//extensions run in the order that they are provided +// WithExtension append the provided incoming extension and outgoing to the list of incoming and outgoing extensions. +// extensions run in the order that they are provided func WithExtension(inExt message.Extension, outExt message.Extension) Option { return func(o *options) { o.extensions.In = append(o.extensions.In, inExt) @@ -91,15 +91,15 @@ func WithExtension(inExt message.Extension, outExt message.Extension) Option { } } -//WithInExtension append the provided incoming extension to the list of incoming extensions. -//extensions run in the order that they are provided +// WithInExtension append the provided incoming extension to the list of incoming extensions. +// extensions run in the order that they are provided func WithInExtension(extension message.Extension) Option { return func(o *options) { o.extensions.In = append(o.extensions.In, extension) } } -//WithTransport sets the client transport to be used to communicate with server. +// WithTransport sets the client transport to be used to communicate with server. func WithTransport(t transport.Transport) Option { return func(o *options) { o.transport = t diff --git a/examples/groupme.go b/examples/groupme.go new file mode 100644 index 0000000..dca441f --- /dev/null +++ b/examples/groupme.go @@ -0,0 +1,42 @@ +package main + +import ( + "fmt" + "github.com/thesyncim/fayec" + "github.com/thesyncim/fayec/message" + "github.com/thesyncim/fayec/subscription" + "time" +) + +var authorizationToken = "ABC" + +var authenticationExtension message.Extension = func(message *message.Message) { + if message.Channel == "/meta/subscribe" { + message.Ext = map[string]string{ + "access_token": authorizationToken, + "timestamp": string(time.Now().Unix()), + } + } +} + +func main() { + client, err := fayec.NewClient("wss://push.groupme.com/faye", fayec.WithOutExtension(authenticationExtension)) + defer client.Disconnect() + + if err != nil { + fmt.Errorf("Error connecting to groupme", err) + return + } + var sub *subscription.Subscription + sub, err = client.Subscribe("/user/13685836") + if err != nil { + panic(err) + } + err = sub.OnMessage(func(channel string, data message.Data) { + fmt.Println(data) + }) + if err != nil { + panic(err) + } + +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..51f4e93 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/thesyncim/fayec + +go 1.21.0 + +require ( + github.com/gorilla/websocket v1.5.0 + github.com/pkg/errors v0.9.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..10ea61f --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/thesyncim/faye v0.0.0-20180924151438-11b176dcbcb9 h1:Pz1lvYNPL7e4krAyn30sMi8bWJKHH1wbl7VMBmIybws= +github.com/thesyncim/faye v0.0.0-20180924151438-11b176dcbcb9/go.mod h1:lYnF5uSFlOcMkYn8aWvtnD1Qx59LxZW2sIQgiDXNoUc= diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index d92ea52..dabcdc7 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -2,10 +2,10 @@ package dispatcher import ( "fmt" - "github.com/thesyncim/faye/internal/store" - "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/subscription" - "github.com/thesyncim/faye/transport" + "github.com/thesyncim/fayec/internal/store" + "github.com/thesyncim/fayec/message" + "github.com/thesyncim/fayec/subscription" + "github.com/thesyncim/fayec/transport" "log" "strconv" "sync" @@ -46,7 +46,7 @@ func NewDispatcher(endpoint string, tOpts transport.Options, ext message.Extensi } } -//todo allow multiple transports +// todo allow multiple transports func (d *Dispatcher) Connect() error { var err error if err = d.transport.Init(d.endpoint, &d.transportOpts); err != nil { @@ -166,7 +166,7 @@ func (d *Dispatcher) nextMsgID() string { return strconv.Itoa(int(atomic.AddUint64(d.msgID, 1))) } -//sendMessage send applies the out extensions and sends a message throught the transport +// sendMessage send applies the out extensions and sends a message throught the transport func (d *Dispatcher) sendMessage(m *message.Message) error { d.extensions.ApplyOutExtensions(m) return d.transport.SendMessage(m) @@ -180,6 +180,7 @@ func (d *Dispatcher) Subscribe(channel string) (*subscription.Subscription, erro Subscription: channel, Id: id, } + d.extensions.ApplyOutExtensions(m) if err := d.transport.SendMessage(m); err != nil { return nil, err diff --git a/internal/store/subscription.go b/internal/store/subscription.go index 7a1562e..1374423 100644 --- a/internal/store/subscription.go +++ b/internal/store/subscription.go @@ -1,7 +1,7 @@ package store import ( - "github.com/thesyncim/faye/subscription" + "github.com/thesyncim/fayec/subscription" "sync" ) @@ -26,8 +26,8 @@ func (s *SubscriptionsStore) Add(sub *subscription.Subscription) { s.mutex.Unlock() } -//Match returns the subscriptions that match with the specified channel name -//Wildcard subscriptions are matched +// Match returns the subscriptions that match with the specified channel name +// Wildcard subscriptions are matched func (s *SubscriptionsStore) Match(channel string) []*subscription.Subscription { var ( matches []*subscription.Subscription @@ -70,7 +70,7 @@ end: s.mutex.Unlock() } -//RemoveAll removel all subscriptions and close all channels +// RemoveAll removel all subscriptions and close all channels func (s *SubscriptionsStore) RemoveAll() { s.mutex.Lock() for i := range s.subs { @@ -84,7 +84,7 @@ func (s *SubscriptionsStore) RemoveAll() { s.mutex.Unlock() } -//Count return the number of subscriptions associated with the specified channel +// Count return the number of subscriptions associated with the specified channel func (s *SubscriptionsStore) Count(channel string) int { return len(s.Match(channel)) } diff --git a/internal/store/subscription_test.go b/internal/store/subscription_test.go index 8a5ab77..27c2b8a 100644 --- a/internal/store/subscription_test.go +++ b/internal/store/subscription_test.go @@ -7,8 +7,8 @@ import ( ) var ( - wildcardSubscription, _ = subscription.NewSubscription("a", "/wildcard/*", nil, nil, nil) - simpleSubscription, _ = subscription.NewSubscription("b", "/foo/bar", nil, nil, nil) + wildcardSubscription, _ = subscription.NewSubscription("a", nil, nil) + simpleSubscription, _ = subscription.NewSubscription("b", nil, nil) ) func TestStore_Add(t *testing.T) { diff --git a/subscription/subscription.go b/subscription/subscription.go index 0a4f4ff..960099e 100644 --- a/subscription/subscription.go +++ b/subscription/subscription.go @@ -2,7 +2,7 @@ package subscription import ( "errors" - "github.com/thesyncim/faye/message" + "github.com/thesyncim/fayec/message" "regexp" ) @@ -16,7 +16,7 @@ type Subscription struct { msgCh chan *message.Message } -//todo error +// todo error func NewSubscription(chanel string, unsub Unsubscriber, msgCh chan *message.Message) (*Subscription, error) { if !IsValidSubscriptionName(chanel) { return nil, ErrInvalidChannelName @@ -47,12 +47,12 @@ func (s *Subscription) Name() string { return s.channel } -//Unsubscribe ... +// Unsubscribe ... func (s *Subscription) Unsubscribe() error { return s.unsub(s) } -//validChannelName channel specifies is the channel is in the format /foo/432/bar +// validChannelName channel specifies is the channel is in the format /foo/432/bar var validChannelName = regexp.MustCompile(`^\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+(\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+)*$`) var validChannelPattern = regexp.MustCompile(`^(\/(((([a-z]|[A-Z])|[0-9])|(\-|\_|\!|\~|\(|\)|\$|\@)))+)*\/\*{1,2}$`) @@ -61,7 +61,7 @@ func IsValidSubscriptionName(channel string) bool { return validChannelName.MatchString(channel) || validChannelPattern.MatchString(channel) } -//isValidPublishName +// isValidPublishName func IsValidPublishName(channel string) bool { return validChannelName.MatchString(channel) } diff --git a/transport/transport.go b/transport/transport.go index d753dec..4a52b52 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -2,12 +2,12 @@ package transport import ( "crypto/tls" - "github.com/thesyncim/faye/message" + "github.com/thesyncim/fayec/message" "net/http" "time" ) -//Options represents the connection options to be used by a transport +// Options represents the connection options to be used by a transport type Options struct { Headers http.Header Cookies http.CookieJar @@ -20,7 +20,7 @@ type Options struct { WriteDeadline time.Duration } -//Transport represents the transport to be used to comunicate with the faye server +// Transport represents the transport to be used to comunicate with the faye server type Transport interface { //name returns the transport name Name() string diff --git a/transport/websocket/websocket.go b/transport/websocket/websocket.go index 090499c..ef5ccb0 100644 --- a/transport/websocket/websocket.go +++ b/transport/websocket/websocket.go @@ -1,10 +1,13 @@ package websocket import ( + "crypto/tls" + "encoding/json" "github.com/gorilla/websocket" - "github.com/thesyncim/faye/message" - "github.com/thesyncim/faye/transport" + "github.com/thesyncim/fayec/message" + "github.com/thesyncim/fayec/transport" "log" + "net" "sync" "sync/atomic" ) @@ -15,7 +18,7 @@ func init() { transport.RegisterTransport(&Websocket{}) } -//Websocket represents an websocket transport for the faye protocol +// Websocket represents an websocket transport for the faye protocol type Websocket struct { topts *transport.Options @@ -34,7 +37,7 @@ type Websocket struct { var _ transport.Transport = (*Websocket)(nil) -//Init initializes the transport with the provided options +// Init initializes the transport with the provided options func (w *Websocket) Init(endpoint string, options *transport.Options) error { var ( err error @@ -43,20 +46,18 @@ func (w *Websocket) Init(endpoint string, options *transport.Options) error { w.stopCh = make(chan error) w.conn, _, err = websocket.DefaultDialer.Dial(endpoint, options.Headers) + err = w.conn.UnderlyingConn().(*tls.Conn).NetConn().(*net.TCPConn).SetKeepAlive(true) if err != nil { return err } - w.conn.SetPingHandler(func(appData string) error { - return w.conn.WriteJSON(make([]struct{}, 0)) - }) if err != nil { return err } return nil } -//Init initializes the transport with the provided options +// Init initializes the transport with the provided options func (w *Websocket) SetOnErrorHandler(onError func(err error)) { w.onError = onError } @@ -67,20 +68,26 @@ func (w *Websocket) readWorker() error { case err := <-w.stopCh: return err default: - } - var payload []message.Message - err := w.conn.ReadJSON(&payload) - if err != nil { + var payload []message.Message + _, data, err := w.conn.ReadMessage() + if err != nil { + return err + } + + err = json.Unmarshal(data, &payload) + if err != nil { + return err + } + + //dispatch + msg := &payload[0] + w.onMsg(msg) - return err } - //dispatch - msg := &payload[0] - w.onMsg(msg) } } -//name returns the transport name (websocket) +// name returns the transport name (websocket) func (w *Websocket) Name() string { return transportName } @@ -105,12 +112,12 @@ again: //todo move this to scheduler return nil } -//Options return the transport Options +// Options return the transport Options func (w *Websocket) Options() *transport.Options { return w.topts } -//Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. +// Handshake initiates a connection negotiation by sending a message to the /meta/handshake channel. func (w *Websocket) Handshake(msg *message.Message) (resp *message.Message, err error) { err = w.SendMessage(msg) @@ -144,8 +151,8 @@ func (w *Websocket) SetOnTransportUpHandler(onTransportUp func()) { w.onTransportUp = onTransportUp } -//Disconnect closes all subscriptions and inform the server to remove any client-related state. -//any subsequent method call to the client object will result in undefined behaviour. +// Disconnect closes all subscriptions and inform the server to remove any client-related state. +// any subsequent method call to the client object will result in undefined behaviour. func (w *Websocket) Disconnect(m *message.Message) error { w.stopCh <- nil close(w.stopCh)