Reformat real_time

This commit is contained in:
Karmanyaah Malhotra 2021-05-08 15:35:08 -04:00
parent 28b2d85660
commit aa216ed4af
2 changed files with 321 additions and 276 deletions

View File

@ -2,11 +2,8 @@ package groupme
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"strconv"
"strings"
"sync"
"time"
@ -15,42 +12,43 @@ import (
)
const (
pushServer = "https://push.groupme.com/faye"
PushServer = "https://push.groupme.com/faye"
userChannel = "/user/"
groupChannel = "/group/"
dmChannel = "/direct_message/"
subscribeChannel = "/meta/subscribe"
)
var (
ErrHandlerNotFound = errors.New("Handler not found")
ErrListenerNotStarted = errors.New("GroupMe listener not started")
)
var concur = sync.Mutex{}
var token string
type fayeLogger struct{}
func (l fayeLogger) Infof(f string, a ...interface{}) {
log.Printf("[INFO] : "+f, a...)
}
func (l fayeLogger) Errorf(f string, a ...interface{}) {
log.Printf("[ERROR] : "+f, a...)
}
func (l fayeLogger) Debugf(f string, a ...interface{}) {
log.Printf("[DEBUG] : "+f, a...)
}
func (l fayeLogger) Warnf(f string, a ...interface{}) {
log.Printf("[WARN] : "+f, a...)
}
func init() {
wray.RegisterTransports([]wray.Transport{&wray.HTTPTransport{}})
}
type HandlerAll interface {
Handler
//of self
HandlerText
HandlerLike
HandlerMembership
HandleGroupMembership
HandleGroupMetadata
//of group
HandleGroupTopic
HandleGroupAvatar
HandleGroupName
HandleGroupLikeIcon
//of group members
HandleMemberNewNickname
HandleMemberNewAvatar
HandleMembers
}
type Handler interface {
HandleError(error)
@ -59,31 +57,58 @@ type HandlerText interface {
HandleTextMessage(Message)
}
type HandlerLike interface {
HandleLike(messageID ID, favBy []string)
HandleLike(Message)
}
type HandlerMembership interface {
HandleJoin(ID)
}
type HandleGroupMetadata interface {
//Group Handlers
type HandleGroupTopic interface {
HandleGroupTopic(group ID, newTopic string)
}
type HandleGroupName interface {
HandleGroupName(group ID, newName string)
}
type HandleGroupAvatar interface {
HandleGroupAvatar(group ID, newAvatar string)
}
type HandleGroupLikeIcon interface {
HandleLikeIcon(group ID, PackID, PackIndex int, Type string)
}
type HandleGroupMembership interface {
//Group member handlers
type HandleMemberNewNickname interface {
HandleNewNickname(group ID, user ID, newName string)
HandleNewAvatarInGroup(group ID, user ID, avatarURL string)
}
type HandleMemberNewAvatar interface {
HandleNewAvatarInGroup(group ID, user ID, avatarURL string)
}
type HandleMembers interface {
//HandleNewMembers returns only partial member with id and nickname; added is false if removing
HandleMembers(group ID, members []Member, added bool)
}
type PushMessage interface {
Channel() string
Data() map[string]interface{}
Ext() map[string]interface{}
Error() string
}
type FayeClient interface {
//Listen starts a blocking listen loop
Listen()
//WaitSubscribe is a blocking/synchronous subscribe method
WaitSubscribe(channel string, msgChannel chan PushMessage)
}
//PushSubscription manages real time subscription
type PushSubscription struct {
channel chan wray.Message
fayeClient *wray.FayeClient
channel chan PushMessage
fayeClient FayeClient
handlers []Handler
LastConnected int64
}
@ -92,7 +117,7 @@ type PushSubscription struct {
func NewPushSubscription(context context.Context) PushSubscription {
r := PushSubscription{
channel: make(chan wray.Message),
channel: make(chan PushMessage),
}
return r
@ -102,26 +127,17 @@ func (r *PushSubscription) AddHandler(h Handler) {
r.handlers = append(r.handlers, h)
}
//AddFullHandler is the same as AddHandler except to ensure interface implements everything
//AddFullHandler is the same as AddHandler except it ensures the interface implements everything
func (r *PushSubscription) AddFullHandler(h HandlerAll) {
r.handlers = append(r.handlers, h)
}
type systemMessage struct {
Event struct {
Kind string `json:"type"`
Data interface{}
}
}
var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{})
var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte)
//Listen connects to GroupMe. Runs in Goroutine.
func (r *PushSubscription) StartListening(context context.Context) {
r.fayeClient = wray.NewFayeClient(pushServer)
r.fayeClient.SetLogger(fayeLogger{})
r.fayeClient.AddExtension(&authExtension{})
//r.fayeClient.AddExtension(r.fayeClient) //verbose output
func (r *PushSubscription) StartListening(context context.Context, client FayeClient) {
r.fayeClient = client
go r.fayeClient.Listen()
@ -129,241 +145,50 @@ func (r *PushSubscription) StartListening(context context.Context) {
for msg := range r.channel {
r.LastConnected = time.Now().Unix()
data := msg.Data()
content := data["subject"] //TODO ok
content := data["subject"]
contentType := data["type"].(string)
channel := msg.Channel()
if strings.HasPrefix(channel, groupChannel) || strings.HasPrefix(channel, dmChannel) {
c, ok := content.(map[string]interface{})
if !ok {
fmt.Println(content, data, "err")
handler, ok := RealTimeHandlers[contentType]
if !ok {
if contentType == "ping" ||
len(contentType) == 0 ||
content == nil {
continue
}
e, ok := c["line"]
if !ok {
fmt.Println(content, data, "err")
continue
}
d, _ := json.Marshal(e)
r.chatEvent(contentType, d)
continue
}
switch contentType {
case "line.create":
b, _ := json.Marshal(content)
out := Message{}
_ = json.Unmarshal(b, &out)
if out.UserID.String() == "system" {
event := systemMessage{}
err := json.Unmarshal(b, &event)
if err != nil {
fmt.Println(err)
}
r.systemEvent(out.GroupID, event)
break
}
for _, h := range r.handlers {
if h, ok := h.(HandlerText); ok {
h.HandleTextMessage(out)
}
}
case "like.create":
//should be an associated chatEvent
break
case "membership.create":
c, _ := content.(map[string]interface{})
id, _ := c["id"].(string)
for _, h := range r.handlers {
if h, ok := h.(HandlerMembership); ok {
h.HandleJoin(ID(id))
}
}
case "ping":
break
default: //TODO: see if any other types are returned
if len(contentType) == 0 || content == nil {
break
}
log.Println(contentType)
b, _ := json.Marshal(content)
log.Fatalln(string(b))
log.Println("Unable to handle GroupMe message type", contentType)
}
handler(r, channel, content)
}
}()
}
func (r *PushSubscription) chatEvent(contentType string, b []byte) {
switch contentType {
case "favorite":
data := Message{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandlerLike); ok {
h.HandleLike(data.ID, data.FavoritedBy)
}
}
default: //TODO: see if any other types are returned
log.Println(contentType)
log.Fatalln(string(b))
}
}
func (r *PushSubscription) systemEvent(groupID ID, msg systemMessage) {
kind := msg.Event.Kind
b, _ := json.Marshal(msg.Event.Data)
switch kind {
case "membership.nickname_changed":
data := struct {
Name string
User struct {
ID int
}
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMembership); ok {
h.HandleNewNickname(groupID, ID(strconv.Itoa(data.User.ID)), data.Name)
}
}
case "membership.avatar_changed":
data := struct {
AvatarURL string `json:"avatar_url"`
User struct {
ID int
}
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMembership); ok {
h.HandleNewAvatarInGroup(groupID, ID(strconv.Itoa(data.User.ID)), data.AvatarURL)
}
}
case "membership.announce.added":
data := struct {
Added []Member `json:"added_users"`
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMembership); ok {
h.HandleMembers(groupID, data.Added, true)
}
}
case "membership.notifications.removed":
data := struct {
Added Member `json:"removed_user"`
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMembership); ok {
h.HandleMembers(groupID, []Member{data.Added}, false)
}
}
case "group.role_change_admin":
//TODO
break
case "group.name_change":
data := struct {
Name string
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMetadata); ok {
h.HandleGroupName(groupID, data.Name)
}
}
case "group.topic_change":
data := struct {
Topic string
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMetadata); ok {
h.HandleGroupTopic(groupID, data.Topic)
}
}
case "group.avatar_change":
data := struct {
AvatarURL string `json:"avatar_url"`
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMetadata); ok {
h.HandleGroupAvatar(groupID, data.AvatarURL)
}
}
case "group.like_icon_set":
data := struct {
LikeIcon struct {
PackID int `json:"pack_id"`
PackIndex int `json:"pack_index"`
Type string
} `json:"like_icon"`
}{}
_ = json.Unmarshal(b, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMetadata); ok {
h.HandleLikeIcon(groupID, data.LikeIcon.PackID, data.LikeIcon.PackIndex, data.LikeIcon.Type)
}
}
case "group.like_icon_removed":
for _, h := range r.handlers {
if h, ok := h.(HandleGroupMetadata); ok {
h.HandleLikeIcon(groupID, 0, 0, "")
}
}
case "group.type_change", "group.required_approval_enabled", "group.required_approval_disabled":
//TODO: group joining
break
case "group.shared", "group.unshared":
//TODO
break
default:
log.Println(kind)
log.Fatalln(string(b))
}
}
//SubscribeToUser to users
func (r *PushSubscription) SubscribeToUser(context context.Context, userID ID, authToken string) error {
concur.Lock()
defer concur.Unlock()
if r.fayeClient == nil {
return errors.New("Not Listening") //TODO: Proper error
}
token = authToken
r.fayeClient.WaitSubscribe(userChannel+userID.String(), r.channel)
return nil
func (r *PushSubscription) SubscribeToUser(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(userChannel, context, id, authToken)
}
//SubscribeToGroup to groups for typing notification
func (r *PushSubscription) SubscribeToGroup(context context.Context, groupID ID, authToken string) error {
func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID, authToken string) error {
return r.subscribeWithPrefix(groupChannel, context, id, authToken)
}
//SubscribeToDM to users
func (r *PushSubscription) SubscribeToDM(context context.Context, id ID, authToken string) error {
id = ID(strings.Replace(id.String(), "+", "_", 1))
return r.subscribeWithPrefix(dmChannel, context, id, authToken)
}
func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID, authToken string) error {
concur.Lock()
defer concur.Unlock()
if r.fayeClient == nil {
return errors.New("Not Listening") //TODO: Proper error
return ErrListenerNotStarted
}
token = authToken
r.fayeClient.WaitSubscribe(groupChannel+groupID.String(), r.channel)
r.fayeClient.WaitSubscribe(prefix+groupID.String(), r.channel)
return nil
}
@ -373,27 +198,8 @@ func (r *PushSubscription) Connected() bool {
return r.LastConnected+30 >= time.Now().Unix()
}
// Stop listening to GroupMe after completing all other actions scheduled first
func (r *PushSubscription) Stop(context context.Context) {
concur.Lock()
defer concur.Unlock()
//TODO: stop listening
}
type authExtension struct {
}
// In does nothing in this extension, but is needed to satisy the interface
func (e *authExtension) In(msg wray.Message) {
println(msg.Channel())
if len(msg.Error()) > 0 {
log.Fatalln(msg.Error())
}
}
// Out adds the authentication token to the messages ext field
func (e *authExtension) Out(msg wray.Message) {
func OutMsgProc(msg PushMessage) {
if msg.Channel() == subscribeChannel {
ext := msg.Ext()
ext["access_token"] = token

239
real_time_handler.go Normal file
View File

@ -0,0 +1,239 @@
package groupme
import (
"encoding/json"
"fmt"
"log"
"strconv"
)
func init() {
RealTimeHandlers = make(map[string]func(r *PushSubscription, channel string, data ...interface{}))
//Base Handlers on user channel
RealTimeHandlers["direct_message.create"] = func(r *PushSubscription, channel string, data ...interface{}) {
b, _ := json.Marshal(data[0])
out := Message{}
_ = json.Unmarshal(b, &out)
//maybe something with API versioning
out.ConversationID = out.ChatID
if out.UserID.String() == "system" {
event := struct {
Event struct {
Kind string `json:"type"`
Data interface{}
}
}{}
err := json.Unmarshal(b, &event)
if err != nil {
fmt.Println(err)
}
rawData, _ := json.Marshal(event.Event.Data)
handler, ok := RealTimeSystemHandlers[event.Event.Kind]
if !ok {
log.Println("Unable to handle system message of type", event.Event.Kind)
return
}
id := out.GroupID
if len(id) == 0 {
id = out.ConversationID
}
handler(r, channel, id, rawData)
return
}
for _, h := range r.handlers {
if h, ok := h.(HandlerText); ok {
h.HandleTextMessage(out)
}
}
}
RealTimeHandlers["line.create"] = RealTimeHandlers["direct_message.create"]
RealTimeHandlers["like.create"] = func(r *PushSubscription, channel string, data ...interface{}) { //should be an associated chatEvent
}
RealTimeHandlers["membership.create"] = func(r *PushSubscription, channel string, data ...interface{}) {
c, _ := data[0].(map[string]interface{})
id, _ := c["id"].(string)
for _, h := range r.handlers {
if h, ok := h.(HandlerMembership); ok {
h.HandleJoin(ID(id))
}
}
}
//following are for each chat
RealTimeHandlers["favorite"] = func(r *PushSubscription, channel string, data ...interface{}) {
c, ok := data[0].(map[string]interface{})
if !ok {
fmt.Println(data, "err")
return
}
e, ok := c["line"]
if !ok {
fmt.Println(data, "err")
return
}
d, _ := json.Marshal(e)
msg := Message{}
_ = json.Unmarshal(d, &msg)
for _, h := range r.handlers {
if h, ok := h.(HandlerLike); ok {
h.HandleLike(msg)
}
}
}
//following are for messages from system (administrative/settings changes)
RealTimeSystemHandlers = make(map[string]func(r *PushSubscription, channel string, id ID, rawData []byte))
RealTimeSystemHandlers["membership.nickname_changed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
thing := struct {
Name string
User struct {
ID int
}
}{}
_ = json.Unmarshal(rawData, &thing)
for _, h := range r.handlers {
if h, ok := h.(HandleMemberNewNickname); ok {
h.HandleNewNickname(id, ID(strconv.Itoa(thing.User.ID)), thing.Name)
}
}
}
RealTimeSystemHandlers["membership.avatar_changed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
content := struct {
AvatarURL string `json:"avatar_url"`
User struct {
ID int
}
}{}
_ = json.Unmarshal(rawData, &content)
for _, h := range r.handlers {
if h, ok := h.(HandleMemberNewAvatar); ok {
h.HandleNewAvatarInGroup(id, ID(strconv.Itoa(content.User.ID)), content.AvatarURL)
}
}
}
RealTimeSystemHandlers["membership.announce.added"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
Added []Member `json:"added_users"`
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleMembers); ok {
h.HandleMembers(id, data.Added, true)
}
}
}
RealTimeSystemHandlers["membership.notifications.removed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
Added Member `json:"removed_user"`
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleMembers); ok {
h.HandleMembers(id, []Member{data.Added}, false)
}
}
}
RealTimeSystemHandlers["membership.name_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
Name string
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupName); ok {
h.HandleGroupName(id, data.Name)
}
}
}
RealTimeSystemHandlers["group.name_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
Name string
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupName); ok {
h.HandleGroupName(id, data.Name)
}
}
}
RealTimeSystemHandlers["group.topic_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
Topic string
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupTopic); ok {
h.HandleGroupTopic(id, data.Topic)
}
}
}
RealTimeSystemHandlers["group.avatar_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
AvatarURL string `json:"avatar_url"`
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupAvatar); ok {
h.HandleGroupAvatar(id, data.AvatarURL)
}
}
}
RealTimeSystemHandlers["group.like_icon_set"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
data := struct {
LikeIcon struct {
PackID int `json:"pack_id"`
PackIndex int `json:"pack_index"`
Type string
} `json:"like_icon"`
}{}
_ = json.Unmarshal(rawData, &data)
for _, h := range r.handlers {
if h, ok := h.(HandleGroupLikeIcon); ok {
h.HandleLikeIcon(id, data.LikeIcon.PackID, data.LikeIcon.PackIndex, data.LikeIcon.Type)
}
}
}
RealTimeSystemHandlers["group.like_icon_removed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) {
for _, h := range r.handlers {
if h, ok := h.(HandleGroupLikeIcon); ok {
h.HandleLikeIcon(id, 0, 0, "")
}
}
}
}