Usinc fayec client instead of wray

This commit is contained in:
Brandon Watson 2023-09-18 21:34:54 -05:00
parent 8f23e04eea
commit 3e9851b2c0
4 changed files with 101 additions and 154 deletions

View File

@ -5,8 +5,6 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/karmanyaahm/wray"
"github.com/beeper/groupme-lib" "github.com/beeper/groupme-lib"
) )
@ -15,73 +13,29 @@ import (
var authorizationToken = "ABCD" var authorizationToken = "ABCD"
// This adapts your faye library to an interface compatible with this library
type FayeClient struct {
*wray.FayeClient
}
func (fc FayeClient) WaitSubscribe(channel string, msgChannel chan groupme.PushMessage) {
c_new := make(chan wray.Message)
fc.FayeClient.WaitSubscribe(channel, c_new)
//converting between types because channels don't support interfaces well
go func() {
for i := range c_new {
msgChannel <- i
}
}()
}
// for authentication, specific implementation will vary based on faye library
type AuthExt struct{}
func (a *AuthExt) In(wray.Message) {}
func (a *AuthExt) Out(m wray.Message) {
groupme.OutMsgProc(m)
}
// specific to faye library
type fayeLogger struct{}
func (l fayeLogger) Infof(f string, a ...interface{}) {
log.Printf("[INFO] : "+f, a...)
}
func (l fayeLogger) Errorf(f string, a ...interface{}) {
log.Printf("[ERROR] : "+f, a...)
}
func (l fayeLogger) Debugf(f string, a ...interface{}) {
log.Printf("[DEBUG] : "+f, a...)
}
func (l fayeLogger) Warnf(f string, a ...interface{}) {
log.Printf("[WARN] : "+f, a...)
}
// A short program that subscribes to 2 groups and 2 direct chats // A short program that subscribes to 2 groups and 2 direct chats
// and prints out all recognized events in those // and prints out all recognized events in those
func main() { func main() {
//Create and initialize fayeclient
fc := FayeClient{wray.NewFayeClient(groupme.PushServer)}
fc.SetLogger(fayeLogger{})
fc.AddExtension(&AuthExt{})
//for additional logging uncomment the following line
//fc.AddExtension(fc.FayeClient)
//create push subscription and start listening //create push subscription and start listening
p := groupme.NewPushSubscription(context.Background()) p := groupme.NewPushSubscription(context.Background())
go p.StartListening(context.TODO(), fc) err := p.Connect(context.TODO(), authorizationToken)
if err != nil {
return
}
// Create a new client with your auth token // Create a new client with your auth token
client := groupme.NewClient(authorizationToken) client := groupme.NewClient(authorizationToken)
User, _ := client.MyUser(context.Background()) User, _ := client.MyUser(context.Background())
//Subscribe to get messages and events for the specific user
err := p.SubscribeToUser(context.Background(), User.ID, authorizationToken)
if err != nil {
log.Fatal(err)
}
//handles (in this case prints) all messages //handles (in this case prints) all messages
p.AddFullHandler(Handler{User: User}) p.AddFullHandler(Handler{User: User})
//Subscribe to get messages and events for the specific user
err = p.SubscribeToUser(context.Background(), User.ID)
if err != nil {
log.Fatal(err)
}
// Get the groups your user is part of // Get the groups your user is part of
groups, err := client.IndexGroups( groups, err := client.IndexGroups(
context.Background(), context.Background(),
@ -97,7 +51,7 @@ func main() {
} }
//Subscribe to those groups //Subscribe to those groups
for _, j := range groups { for _, j := range groups {
err = p.SubscribeToGroup(context.TODO(), j.ID, authorizationToken) err = p.SubscribeToGroup(context.TODO(), j.ID)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -111,7 +65,7 @@ func main() {
}) })
//subscribe to all those chats //subscribe to all those chats
for _, j := range chats { for _, j := range chats {
err = p.SubscribeToDM(context.TODO(), j.LastMessage.ConversationID, authorizationToken) err = p.SubscribeToDM(context.TODO(), j.LastMessage.ConversationID)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

6
go.mod
View File

@ -1,10 +1,12 @@
module github.com/beeper/groupme-lib module github.com/beeper/groupme-lib
go 1.15 go 1.21.0
require ( require (
gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755
github.com/google/uuid v1.2.0 github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
) )
require github.com/gorilla/websocket v1.5.0 // indirect

5
go.sum
View File

@ -1,3 +1,5 @@
gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755 h1:FEhNSjSNvZ+nVg5Z3ds6X8ys3qjM+mmyLTSqKhCUHuQ=
gitea.watsonlabs.net/watsonb8/fayec v0.0.0-20230919020138-8f0db7048755/go.mod h1:gv8CWMq6dFJQhH30u8bO3u4k2irKlclZktLNYDebQ/0=
github.com/autogrowsystems/wray v0.0.0-20160519030252-f36984f6648c/go.mod h1:druJ8QMeBCUmwJ7ZSFowx77dWxEWF3SYlQlsqZaLZQg= github.com/autogrowsystems/wray v0.0.0-20160519030252-f36984f6648c/go.mod h1:druJ8QMeBCUmwJ7ZSFowx77dWxEWF3SYlQlsqZaLZQg=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -7,10 +9,13 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14 h1:NrATjZKvkY+ojL8FXTWa3fQ+wihFrAxLNE6T+wOkIcY= github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14 h1:NrATjZKvkY+ojL8FXTWa3fQ+wihFrAxLNE6T+wOkIcY=
github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14/go.mod h1:ysD86MIEevmAkdfdg5s6Qt3I07RN6fvMAyna7jCGG2o= github.com/karmanyaahm/wray v0.0.0-20210303233435-756d58657c14/go.mod h1:ysD86MIEevmAkdfdg5s6Qt3I07RN6fvMAyna7jCGG2o=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=

View File

@ -3,20 +3,20 @@ package groupme
import ( import (
"context" "context"
"errors" "errors"
"gitea.watsonlabs.net/watsonb8/fayec"
"gitea.watsonlabs.net/watsonb8/fayec/message"
"gitea.watsonlabs.net/watsonb8/fayec/subscription"
"log" "log"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/karmanyaahm/wray"
) )
const ( const (
PushServer = "https://push.groupme.com/faye" PushServer = "wss://push.groupme.com/faye"
userChannel = "/user/" userChannel = "/user/"
groupChannel = "/group/" groupChannel = "/group/"
dmChannel = "/direct_message/" dmChannel = "/direct_message/"
subscribeChannel = "/meta/subscribe"
) )
var ( var (
@ -27,10 +27,6 @@ var (
var concur = sync.Mutex{} var concur = sync.Mutex{}
var token string var token string
func init() {
wray.RegisterTransports([]wray.Transport{&wray.HTTPTransport{}})
}
type HandlerAll interface { type HandlerAll interface {
Handler Handler
@ -63,7 +59,7 @@ type HandlerMembership interface {
HandleJoin(ID) HandleJoin(ID)
} }
//Group Handlers // Group Handlers
type HandleGroupTopic interface { type HandleGroupTopic interface {
HandleGroupTopic(group ID, newTopic string) HandleGroupTopic(group ID, newTopic string)
} }
@ -78,7 +74,7 @@ type HandleGroupLikeIcon interface {
HandleLikeIcon(group ID, PackID, PackIndex int, Type string) HandleLikeIcon(group ID, PackID, PackIndex int, Type string)
} }
//Group member handlers // Group member handlers
type HandleMemberNewNickname interface { type HandleMemberNewNickname interface {
HandleNewNickname(group ID, user ID, newName string) HandleNewNickname(group ID, user ID, newName string)
} }
@ -91,33 +87,19 @@ type HandleMembers interface {
HandleMembers(group ID, members []Member, added bool) HandleMembers(group ID, members []Member, added bool)
} }
type PushMessage interface { // PushSubscription manages real time subscription
Channel() string
Data() map[string]interface{}
Ext() map[string]interface{}
Error() string
}
type FayeClient interface {
//Listen starts a blocking listen loop
Listen()
//WaitSubscribe is a blocking/synchronous subscribe method
WaitSubscribe(channel string, msgChannel chan PushMessage)
}
//PushSubscription manages real time subscription
type PushSubscription struct { type PushSubscription struct {
channel chan PushMessage channel chan message.Data
fayeClient FayeClient client *fayec.Client
handlers []Handler handlers []Handler
LastConnected int64 LastConnected int64
} }
//NewPushSubscription creates and returns a push subscription object // NewPushSubscription creates and returns a push subscription object
func NewPushSubscription(context context.Context) PushSubscription { func NewPushSubscription(context context.Context) PushSubscription {
r := PushSubscription{ r := PushSubscription{
channel: make(chan PushMessage), channel: make(chan message.Data),
} }
return r return r
@ -127,7 +109,7 @@ func (r *PushSubscription) AddHandler(h Handler) {
r.handlers = append(r.handlers, h) r.handlers = append(r.handlers, h)
} }
//AddFullHandler is the same as AddHandler except it ensures the interface implements everything // AddFullHandler is the same as AddHandler except it ensures the interface implements everything
func (r *PushSubscription) AddFullHandler(h HandlerAll) { func (r *PushSubscription) AddFullHandler(h HandlerAll) {
r.handlers = append(r.handlers, h) r.handlers = append(r.handlers, h)
} }
@ -135,74 +117,78 @@ func (r *PushSubscription) AddFullHandler(h HandlerAll) {
var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{}) var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{})
var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte) var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte)
//Listen connects to GroupMe. Runs in Goroutine. // Listen connects to GroupMe. Runs in Goroutine.
func (r *PushSubscription) StartListening(context context.Context, client FayeClient) { func (r *PushSubscription) Connect(context context.Context, authorizationToken string) error {
r.fayeClient = client token = authorizationToken
var authenticationExtension message.Extension = func(message *message.Message) {
if message.Channel == "/meta/subscribe" {
message.Ext = map[string]string{
"access_token": authorizationToken,
"timestamp": string(time.Now().Unix()),
}
}
}
c, err := fayec.NewClient(PushServer, fayec.WithOutExtension(authenticationExtension))
if err != nil {
return err
}
r.client = c
go r.fayeClient.Listen() return nil
}
go func() { // SubscribeToUser to users
for msg := range r.channel { func (r *PushSubscription) SubscribeToUser(context context.Context, id ID) error {
return r.subscribeWithPrefix(userChannel, context, id)
}
// SubscribeToGroup to groups for typing notification
func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID) error {
return r.subscribeWithPrefix(groupChannel, context, id)
}
// SubscribeToDM to users
func (r *PushSubscription) SubscribeToDM(context context.Context, id ID) error {
id = ID(strings.Replace(id.String(), "+", "_", 1))
return r.subscribeWithPrefix(dmChannel, context, id)
}
func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID) error {
concur.Lock()
defer concur.Unlock()
if r.client == nil {
return ErrListenerNotStarted
}
var sub *subscription.Subscription
sub, err := r.client.Subscribe(prefix + groupID.String())
if err != nil {
panic(err)
}
err = sub.OnMessage(func(channel string, data message.Data) {
r.LastConnected = time.Now().Unix() r.LastConnected = time.Now().Unix()
data := msg.Data() dataMap := data.(map[string]interface{})
content := data["subject"] content := dataMap["subject"]
contentType := data["type"].(string) contentType := dataMap["type"].(string)
channel := msg.Channel()
handler, ok := RealTimeHandlers[contentType] handler, ok := RealTimeHandlers[contentType]
if !ok { if !ok {
if contentType == "ping" || if contentType == "ping" ||
len(contentType) == 0 || len(contentType) == 0 ||
content == nil { content == "" {
continue return
} }
log.Println("Unable to handle GroupMe message type", contentType) log.Println("Unable to handle GroupMe message type", contentType)
} }
handler(r, channel, content) handler(r, channel, content)
} })
}()
}
//SubscribeToUser to users
func (r *PushSubscription) SubscribeToUser(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(userChannel, context, id, authToken)
}
//SubscribeToGroup to groups for typing notification
func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(groupChannel, context, id, authToken)
}
//SubscribeToDM to users
func (r *PushSubscription) SubscribeToDM(context context.Context, id ID, authToken string) error {
id = ID(strings.Replace(id.String(), "+", "_", 1))
return r.subscribeWithPrefix(dmChannel, context, id, authToken)
}
func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID, authToken string) error {
concur.Lock()
defer concur.Unlock()
if r.fayeClient == nil {
return ErrListenerNotStarted
}
token = authToken
r.fayeClient.WaitSubscribe(prefix+groupID.String(), r.channel)
return nil return nil
} }
//Connected check if connected // Connected check if connected
func (r *PushSubscription) Connected() bool { func (r *PushSubscription) Connected() bool {
return r.LastConnected+30 >= time.Now().Unix() return r.LastConnected+30 >= time.Now().Unix()
} }
// Out adds the authentication token to the messages ext field
func OutMsgProc(msg PushMessage) {
if msg.Channel() == subscribeChannel {
ext := msg.Ext()
ext["access_token"] = token
ext["timestamp"] = time.Now().Unix()
}
}