From 3e9851b2c08cdffbf3e0565ee1269100b89b0faf Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Mon, 18 Sep 2023 21:34:54 -0500 Subject: [PATCH] Usinc fayec client instead of wray --- examples/real_time_updates/main.go | 70 ++---------- go.mod | 6 +- go.sum | 5 + real_time.go | 174 +++++++++++++---------------- 4 files changed, 101 insertions(+), 154 deletions(-) diff --git a/examples/real_time_updates/main.go b/examples/real_time_updates/main.go index 34e8664..eacca88 100644 --- a/examples/real_time_updates/main.go +++ b/examples/real_time_updates/main.go @@ -5,8 +5,6 @@ import ( "fmt" "log" - "github.com/karmanyaahm/wray" - "github.com/beeper/groupme-lib" ) @@ -15,73 +13,29 @@ import ( var authorizationToken = "ABCD" -// This adapts your faye library to an interface compatible with this library -type FayeClient struct { - *wray.FayeClient -} - -func (fc FayeClient) WaitSubscribe(channel string, msgChannel chan groupme.PushMessage) { - c_new := make(chan wray.Message) - fc.FayeClient.WaitSubscribe(channel, c_new) - //converting between types because channels don't support interfaces well - go func() { - for i := range c_new { - msgChannel <- i - } - }() -} - -// for authentication, specific implementation will vary based on faye library -type AuthExt struct{} - -func (a *AuthExt) In(wray.Message) {} -func (a *AuthExt) Out(m wray.Message) { - groupme.OutMsgProc(m) -} - -// specific to faye library -type fayeLogger struct{} - -func (l fayeLogger) Infof(f string, a ...interface{}) { - log.Printf("[INFO] : "+f, a...) -} -func (l fayeLogger) Errorf(f string, a ...interface{}) { - log.Printf("[ERROR] : "+f, a...) -} -func (l fayeLogger) Debugf(f string, a ...interface{}) { - log.Printf("[DEBUG] : "+f, a...) -} -func (l fayeLogger) Warnf(f string, a ...interface{}) { - log.Printf("[WARN] : "+f, a...) -} - // A short program that subscribes to 2 groups and 2 direct chats // and prints out all recognized events in those func main() { - - //Create and initialize fayeclient - fc := FayeClient{wray.NewFayeClient(groupme.PushServer)} - fc.SetLogger(fayeLogger{}) - fc.AddExtension(&AuthExt{}) - //for additional logging uncomment the following line - //fc.AddExtension(fc.FayeClient) - //create push subscription and start listening p := groupme.NewPushSubscription(context.Background()) - go p.StartListening(context.TODO(), fc) + err := p.Connect(context.TODO(), authorizationToken) + if err != nil { + return + } // Create a new client with your auth token client := groupme.NewClient(authorizationToken) User, _ := client.MyUser(context.Background()) - //Subscribe to get messages and events for the specific user - err := p.SubscribeToUser(context.Background(), User.ID, authorizationToken) - if err != nil { - log.Fatal(err) - } //handles (in this case prints) all messages p.AddFullHandler(Handler{User: User}) + //Subscribe to get messages and events for the specific user + err = p.SubscribeToUser(context.Background(), User.ID) + if err != nil { + log.Fatal(err) + } + // Get the groups your user is part of groups, err := client.IndexGroups( context.Background(), @@ -97,7 +51,7 @@ func main() { } //Subscribe to those groups for _, j := range groups { - err = p.SubscribeToGroup(context.TODO(), j.ID, authorizationToken) + err = p.SubscribeToGroup(context.TODO(), j.ID) if err != nil { log.Fatal(err) } @@ -111,7 +65,7 @@ func main() { }) //subscribe to all those chats for _, j := range chats { - err = p.SubscribeToDM(context.TODO(), j.LastMessage.ConversationID, authorizationToken) + err = p.SubscribeToDM(context.TODO(), j.LastMessage.ConversationID) if err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 9040090..a34fded 100755 --- a/go.mod +++ b/go.mod @@ -1,10 +1,12 @@ module github.com/beeper/groupme-lib -go 1.15 +go 1.21.0 require ( + gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755 github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 - github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14 github.com/stretchr/testify v1.7.0 ) + +require github.com/gorilla/websocket v1.5.0 // indirect diff --git a/go.sum b/go.sum index bc7d5e6..6655d5c 100755 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755 h1:FEhNSjSNvZ+nVg5Z3ds6X8ys3qjM+mmyLTSqKhCUHuQ= +gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755/go.mod h1:gv8CWMq6dFJQhH30u8bO3u4k2irKlclZktLNYDebQ/0= github.com/autogrowsystems/wray v0.0.0-20160519030252-f36984f6648c/go.mod h1:druJ8QMeBCUmwJ7ZSFowx77dWxEWF3SYlQlsqZaLZQg= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -7,10 +9,13 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14 h1:NrATjZKvkY+ojL8FXTWa3fQ+wihFrAxLNE6T+wOkIcY= github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14/go.mod h1:ysD86MIEevmAkdfdg5s6Qt3I07RN6fvMAyna7jCGG2o= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= diff --git a/real_time.go b/real_time.go index 0f59ca5..7e7b8fc 100644 --- a/real_time.go +++ b/real_time.go @@ -3,20 +3,20 @@ package groupme import ( "context" "errors" + "gitea.watsonlabs.net/watsonb8/fayec" + "gitea.watsonlabs.net/watsonb8/fayec/message" + "gitea.watsonlabs.net/watsonb8/fayec/subscription" "log" "strings" "sync" "time" - - "github.com/karmanyaahm/wray" ) const ( - PushServer = "https://push.groupme.com/faye" - userChannel = "/user/" - groupChannel = "/group/" - dmChannel = "/direct_message/" - subscribeChannel = "/meta/subscribe" + PushServer = "wss://push.groupme.com/faye" + userChannel = "/user/" + groupChannel = "/group/" + dmChannel = "/direct_message/" ) var ( @@ -27,10 +27,6 @@ var ( var concur = sync.Mutex{} var token string -func init() { - wray.RegisterTransports([]wray.Transport{&wray.HTTPTransport{}}) -} - type HandlerAll interface { Handler @@ -63,7 +59,7 @@ type HandlerMembership interface { HandleJoin(ID) } -//Group Handlers +// Group Handlers type HandleGroupTopic interface { HandleGroupTopic(group ID, newTopic string) } @@ -78,7 +74,7 @@ type HandleGroupLikeIcon interface { HandleLikeIcon(group ID, PackID, PackIndex int, Type string) } -//Group member handlers +// Group member handlers type HandleMemberNewNickname interface { HandleNewNickname(group ID, user ID, newName string) } @@ -91,33 +87,19 @@ type HandleMembers interface { HandleMembers(group ID, members []Member, added bool) } -type PushMessage interface { - Channel() string - Data() map[string]interface{} - Ext() map[string]interface{} - Error() string -} - -type FayeClient interface { - //Listen starts a blocking listen loop - Listen() - //WaitSubscribe is a blocking/synchronous subscribe method - WaitSubscribe(channel string, msgChannel chan PushMessage) -} - -//PushSubscription manages real time subscription +// PushSubscription manages real time subscription type PushSubscription struct { - channel chan PushMessage - fayeClient FayeClient + channel chan message.Data + client *fayec.Client handlers []Handler LastConnected int64 } -//NewPushSubscription creates and returns a push subscription object +// NewPushSubscription creates and returns a push subscription object func NewPushSubscription(context context.Context) PushSubscription { r := PushSubscription{ - channel: make(chan PushMessage), + channel: make(chan message.Data), } return r @@ -127,7 +109,7 @@ func (r *PushSubscription) AddHandler(h Handler) { r.handlers = append(r.handlers, h) } -//AddFullHandler is the same as AddHandler except it ensures the interface implements everything +// AddFullHandler is the same as AddHandler except it ensures the interface implements everything func (r *PushSubscription) AddFullHandler(h HandlerAll) { r.handlers = append(r.handlers, h) } @@ -135,74 +117,78 @@ func (r *PushSubscription) AddFullHandler(h HandlerAll) { var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{}) var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte) -//Listen connects to GroupMe. Runs in Goroutine. -func (r *PushSubscription) StartListening(context context.Context, client FayeClient) { - r.fayeClient = client - - go r.fayeClient.Listen() - - go func() { - for msg := range r.channel { - r.LastConnected = time.Now().Unix() - data := msg.Data() - content := data["subject"] - contentType := data["type"].(string) - channel := msg.Channel() - - handler, ok := RealTimeHandlers[contentType] - if !ok { - if contentType == "ping" || - len(contentType) == 0 || - content == nil { - continue - } - log.Println("Unable to handle GroupMe message type", contentType) +// Listen connects to GroupMe. Runs in Goroutine. +func (r *PushSubscription) Connect(context context.Context, authorizationToken string) error { + token = authorizationToken + var authenticationExtension message.Extension = func(message *message.Message) { + if message.Channel == "/meta/subscribe" { + message.Ext = map[string]string{ + "access_token": authorizationToken, + "timestamp": string(time.Now().Unix()), } - - handler(r, channel, content) } - }() -} - -//SubscribeToUser to users -func (r *PushSubscription) SubscribeToUser(context context.Context, id ID, authToken string) error { - return r.subscribeWithPrefix(userChannel, context, id, authToken) -} - -//SubscribeToGroup to groups for typing notification -func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID, authToken string) error { - return r.subscribeWithPrefix(groupChannel, context, id, authToken) -} - -//SubscribeToDM to users -func (r *PushSubscription) SubscribeToDM(context context.Context, id ID, authToken string) error { - id = ID(strings.Replace(id.String(), "+", "_", 1)) - return r.subscribeWithPrefix(dmChannel, context, id, authToken) -} - -func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID, authToken string) error { - concur.Lock() - defer concur.Unlock() - if r.fayeClient == nil { - return ErrListenerNotStarted } - - token = authToken - r.fayeClient.WaitSubscribe(prefix+groupID.String(), r.channel) + c, err := fayec.NewClient(PushServer, fayec.WithOutExtension(authenticationExtension)) + if err != nil { + return err + } + r.client = c return nil } -//Connected check if connected +// SubscribeToUser to users +func (r *PushSubscription) SubscribeToUser(context context.Context, id ID) error { + return r.subscribeWithPrefix(userChannel, context, id) +} + +// SubscribeToGroup to groups for typing notification +func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID) error { + return r.subscribeWithPrefix(groupChannel, context, id) +} + +// SubscribeToDM to users +func (r *PushSubscription) SubscribeToDM(context context.Context, id ID) error { + id = ID(strings.Replace(id.String(), "+", "_", 1)) + return r.subscribeWithPrefix(dmChannel, context, id) +} + +func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID) error { + concur.Lock() + defer concur.Unlock() + if r.client == nil { + return ErrListenerNotStarted + } + + var sub *subscription.Subscription + sub, err := r.client.Subscribe(prefix + groupID.String()) + if err != nil { + panic(err) + } + + err = sub.OnMessage(func(channel string, data message.Data) { + r.LastConnected = time.Now().Unix() + dataMap := data.(map[string]interface{}) + content := dataMap["subject"] + contentType := dataMap["type"].(string) + + handler, ok := RealTimeHandlers[contentType] + if !ok { + if contentType == "ping" || + len(contentType) == 0 || + content == "" { + return + } + log.Println("Unable to handle GroupMe message type", contentType) + } + + handler(r, channel, content) + }) + + return nil +} + +// Connected check if connected func (r *PushSubscription) Connected() bool { return r.LastConnected+30 >= time.Now().Unix() } - -// Out adds the authentication token to the messages ext field -func OutMsgProc(msg PushMessage) { - if msg.Channel() == subscribeChannel { - ext := msg.Ext() - ext["access_token"] = token - ext["timestamp"] = time.Now().Unix() - } -}