// Source: groupme-lib/real_time.go (209 lines, 5.0 KiB, Go).

package groupme
import (
"context"
"errors"
"log"
"strings"
"sync"
"time"
"github.com/karmanyaahm/wray"
)
// Push endpoint and channel-name constants used by the real-time subscription.
const (
	// PushServer is the URL of the GroupMe push (Faye) endpoint.
	PushServer = "https://push.groupme.com/faye"

	// Channel prefixes; subscribeWithPrefix appends an ID to one of these to
	// build the channel name it subscribes to.
	userChannel  = "/user/"
	groupChannel = "/group/"
	dmChannel    = "/direct_message/"

	// subscribeChannel is the channel OutMsgProc looks for before it adds
	// the access token and timestamp to a message's ext map.
	subscribeChannel = "/meta/subscribe"
)
// Sentinel errors exposed by the real-time API. The message texts are kept
// as-is so that callers matching on them keep working.
var (
	// ErrHandlerNotFound is not returned by anything in this file.
	// NOTE(review): presumably used by handler dispatch elsewhere in the package.
	ErrHandlerNotFound = errors.New("Handler not found")

	// ErrListenerNotStarted is returned by the Subscribe* methods when
	// StartListening has not yet supplied a FayeClient.
	ErrListenerNotStarted = errors.New("GroupMe listener not started")
)
// concur is held by subscribeWithPrefix for the duration of each subscribe
// call, so subscriptions (and the write to token) happen one at a time.
var concur = sync.Mutex{}

// token is the access token most recently passed to subscribeWithPrefix;
// OutMsgProc copies it into outgoing subscribe messages. It is package-wide,
// not per PushSubscription.
var token string
// init registers wray's HTTP transport when the package is loaded.
func init() {
	transports := []wray.Transport{&wray.HTTPTransport{}}
	wray.RegisterTransports(transports)
}
type HandlerAll interface {
Handler
2021-05-08 19:35:08 +00:00
//of self
HandlerText
HandlerLike
HandlerMembership
2021-05-08 19:35:08 +00:00
//of group
HandleGroupTopic
HandleGroupAvatar
HandleGroupName
HandleGroupLikeIcon
//of group members
HandleMemberNewNickname
HandleMemberNewAvatar
HandleMembers
2021-01-29 04:47:48 +00:00
}
// Handler is the minimum a value must implement to be registered with
// PushSubscription.AddHandler: a callback that receives errors.
type Handler interface {
	HandleError(error)
}
// HandlerText is implemented by handlers that want text messages.
type HandlerText interface {
	// HandleTextMessage receives the message.
	HandleTextMessage(msg Message)
}
type HandlerLike interface {
2021-05-08 19:35:08 +00:00
HandleLike(Message)
2021-02-22 03:53:43 +00:00
}
2021-03-01 15:06:01 +00:00
type HandlerMembership interface {
HandleJoin(ID)
}
2021-02-22 03:53:43 +00:00
2021-05-08 19:35:08 +00:00
//Group Handlers
type HandleGroupTopic interface {
HandleGroupTopic(group ID, newTopic string)
2021-05-08 19:35:08 +00:00
}
type HandleGroupName interface {
HandleGroupName(group ID, newName string)
2021-05-08 19:35:08 +00:00
}
type HandleGroupAvatar interface {
HandleGroupAvatar(group ID, newAvatar string)
2021-05-08 19:35:08 +00:00
}
// HandleGroupLikeIcon is implemented by handlers that want changes to a
// group's like icon.
type HandleGroupLikeIcon interface {
	// HandleLikeIcon receives the group's ID plus the pack ID, pack index
	// and type of the icon.
	HandleLikeIcon(group ID, PackID int, PackIndex int, Type string)
}
2021-05-08 19:35:08 +00:00
//Group member handlers
type HandleMemberNewNickname interface {
HandleNewNickname(group ID, user ID, newName string)
2021-05-08 19:35:08 +00:00
}
2021-03-05 05:43:05 +00:00
2021-05-08 19:35:08 +00:00
type HandleMemberNewAvatar interface {
HandleNewAvatarInGroup(group ID, user ID, avatarURL string)
}
type HandleMembers interface {
2021-03-05 05:43:05 +00:00
//HandleNewMembers returns only partial member with id and nickname; added is false if removing
HandleMembers(group ID, members []Member, added bool)
}
// PushMessage is the view of a push message that this file relies on.
type PushMessage interface {
	// Channel returns the channel name; OutMsgProc compares it with
	// subscribeChannel and StartListening passes it on to handlers.
	Channel() string
	// Data returns the payload; StartListening reads its "type" and
	// "subject" keys.
	Data() map[string]interface{}
	// Ext returns the ext map; OutMsgProc writes "access_token" and
	// "timestamp" into it.
	Ext() map[string]interface{}
	// Error returns an error string. It is not called in this file.
	Error() string
}
// FayeClient is the push client that PushSubscription drives. The
// implementation is supplied by the caller of StartListening.
type FayeClient interface {
// Listen starts a blocking listen loop. StartListening runs it in its own
// goroutine.
Listen()
// WaitSubscribe is a blocking/synchronous subscribe method. It is given the
// channel name to subscribe to and the Go channel on which PushSubscription
// reads incoming messages.
WaitSubscribe(channel string, msgChannel chan PushMessage)
}
2021-01-29 04:47:48 +00:00
//PushSubscription manages real time subscription
type PushSubscription struct {
2021-05-08 19:35:08 +00:00
channel chan PushMessage
fayeClient FayeClient
handlers []Handler
LastConnected int64
2021-01-29 04:47:48 +00:00
}
//NewPushSubscription creates and returns a push subscription object
func NewPushSubscription(context context.Context) PushSubscription {
r := PushSubscription{
2021-05-08 19:35:08 +00:00
channel: make(chan PushMessage),
2021-01-29 04:47:48 +00:00
}
return r
}
2021-02-22 03:53:43 +00:00
func (r *PushSubscription) AddHandler(h Handler) {
r.handlers = append(r.handlers, h)
}
2021-05-08 19:35:08 +00:00
//AddFullHandler is the same as AddHandler except it ensures the interface implements everything
func (r *PushSubscription) AddFullHandler(h HandlerAll) {
r.handlers = append(r.handlers, h)
}
2021-05-08 19:35:08 +00:00
var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{})
var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte)
2021-01-29 04:47:48 +00:00
//Listen connects to GroupMe. Runs in Goroutine.
2021-05-08 19:35:08 +00:00
func (r *PushSubscription) StartListening(context context.Context, client FayeClient) {
r.fayeClient = client
2021-01-29 04:47:48 +00:00
go r.fayeClient.Listen()
2021-02-22 03:53:43 +00:00
go func() {
for msg := range r.channel {
r.LastConnected = time.Now().Unix()
2021-02-22 03:53:43 +00:00
data := msg.Data()
2021-05-08 19:35:08 +00:00
content := data["subject"]
2021-02-22 03:53:43 +00:00
contentType := data["type"].(string)
channel := msg.Channel()
2021-05-08 19:35:08 +00:00
handler, ok := RealTimeHandlers[contentType]
if !ok {
if contentType == "ping" ||
len(contentType) == 0 ||
content == nil {
2021-03-07 16:56:48 +00:00
continue
}
2021-05-08 19:35:08 +00:00
log.Println("Unable to handle GroupMe message type", contentType)
2021-01-29 04:47:48 +00:00
}
2021-05-08 19:35:08 +00:00
handler(r, channel, content)
2021-01-29 04:47:48 +00:00
}
2021-02-22 03:53:43 +00:00
}()
2021-01-29 04:47:48 +00:00
}
2021-05-08 19:35:08 +00:00
//SubscribeToUser to users
func (r *PushSubscription) SubscribeToUser(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(userChannel, context, id, authToken)
}
2021-05-08 19:35:08 +00:00
//SubscribeToGroup to groups for typing notification
func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(groupChannel, context, id, authToken)
}
2021-05-08 19:35:08 +00:00
//SubscribeToDM to users
func (r *PushSubscription) SubscribeToDM(context context.Context, id ID, authToken string) error {
id = ID(strings.Replace(id.String(), "+", "_", 1))
return r.subscribeWithPrefix(dmChannel, context, id, authToken)
2021-01-29 04:47:48 +00:00
}
2021-05-08 19:35:08 +00:00
func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID, authToken string) error {
2021-01-29 04:47:48 +00:00
concur.Lock()
defer concur.Unlock()
if r.fayeClient == nil {
2021-05-08 19:35:08 +00:00
return ErrListenerNotStarted
2021-01-29 04:47:48 +00:00
}
token = authToken
2021-05-08 19:35:08 +00:00
r.fayeClient.WaitSubscribe(prefix+groupID.String(), r.channel)
2021-01-29 04:47:48 +00:00
return nil
}
// Connected reports whether LastConnected (set each time a message is
// received) lies no more than 30 seconds in the past.
func (r *PushSubscription) Connected() bool {
	const graceSeconds = 30

	now := time.Now().Unix()
	return now <= r.LastConnected+graceSeconds
}
2021-01-29 04:47:48 +00:00
// Out adds the authentication token to the messages ext field
2021-05-08 19:35:08 +00:00
func OutMsgProc(msg PushMessage) {
2021-01-29 04:47:48 +00:00
if msg.Channel() == subscribeChannel {
ext := msg.Ext()
ext["access_token"] = token
ext["timestamp"] = time.Now().Unix()
}
}