diff --git a/real_time.go b/real_time.go index 37ef60d..0f59ca5 100644 --- a/real_time.go +++ b/real_time.go @@ -2,11 +2,8 @@ package groupme import ( "context" - "encoding/json" "errors" - "fmt" "log" - "strconv" "strings" "sync" "time" @@ -15,42 +12,43 @@ import ( ) const ( - pushServer = "https://push.groupme.com/faye" + PushServer = "https://push.groupme.com/faye" userChannel = "/user/" groupChannel = "/group/" dmChannel = "/direct_message/" subscribeChannel = "/meta/subscribe" ) +var ( + ErrHandlerNotFound = errors.New("Handler not found") + ErrListenerNotStarted = errors.New("GroupMe listener not started") +) + var concur = sync.Mutex{} var token string -type fayeLogger struct{} - -func (l fayeLogger) Infof(f string, a ...interface{}) { - log.Printf("[INFO] : "+f, a...) -} -func (l fayeLogger) Errorf(f string, a ...interface{}) { - log.Printf("[ERROR] : "+f, a...) -} -func (l fayeLogger) Debugf(f string, a ...interface{}) { - log.Printf("[DEBUG] : "+f, a...) -} -func (l fayeLogger) Warnf(f string, a ...interface{}) { - log.Printf("[WARN] : "+f, a...) -} - func init() { wray.RegisterTransports([]wray.Transport{&wray.HTTPTransport{}}) } type HandlerAll interface { Handler + + //of self HandlerText HandlerLike HandlerMembership - HandleGroupMembership - HandleGroupMetadata + + //of group + HandleGroupTopic + HandleGroupAvatar + HandleGroupName + HandleGroupLikeIcon + + //of group members + HandleMemberNewNickname + HandleMemberNewAvatar + HandleMembers } type Handler interface { HandleError(error) @@ -59,31 +57,58 @@ type HandlerText interface { HandleTextMessage(Message) } type HandlerLike interface { - HandleLike(messageID ID, favBy []string) + HandleLike(Message) } type HandlerMembership interface { HandleJoin(ID) } -type HandleGroupMetadata interface { +//Group Handlers +type HandleGroupTopic interface { HandleGroupTopic(group ID, newTopic string) +} + +type HandleGroupName interface { HandleGroupName(group ID, newName string) +} +type HandleGroupAvatar interface { HandleGroupAvatar(group ID, newAvatar string) +} +type HandleGroupLikeIcon interface { HandleLikeIcon(group ID, PackID, PackIndex int, Type string) } -type HandleGroupMembership interface { +//Group member handlers +type HandleMemberNewNickname interface { HandleNewNickname(group ID, user ID, newName string) - HandleNewAvatarInGroup(group ID, user ID, avatarURL string) +} +type HandleMemberNewAvatar interface { + HandleNewAvatarInGroup(group ID, user ID, avatarURL string) +} +type HandleMembers interface { //HandleNewMembers returns only partial member with id and nickname; added is false if removing HandleMembers(group ID, members []Member, added bool) } +type PushMessage interface { + Channel() string + Data() map[string]interface{} + Ext() map[string]interface{} + Error() string +} + +type FayeClient interface { + //Listen starts a blocking listen loop + Listen() + //WaitSubscribe is a blocking/synchronous subscribe method + WaitSubscribe(channel string, msgChannel chan PushMessage) +} + //PushSubscription manages real time subscription type PushSubscription struct { - channel chan wray.Message - fayeClient *wray.FayeClient + channel chan PushMessage + fayeClient FayeClient handlers []Handler LastConnected int64 } @@ -92,7 +117,7 @@ type PushSubscription struct { func NewPushSubscription(context context.Context) PushSubscription { r := PushSubscription{ - channel: make(chan wray.Message), + channel: make(chan PushMessage), } return r @@ -102,26 +127,17 @@ func (r *PushSubscription) AddHandler(h Handler) { r.handlers = append(r.handlers, h) } -//AddFullHandler is the same as AddHandler except to ensure interface implements everything +//AddFullHandler is the same as AddHandler except it ensures the interface implements everything func (r *PushSubscription) AddFullHandler(h HandlerAll) { r.handlers = append(r.handlers, h) } -type systemMessage struct { - Event struct { - Kind string `json:"type"` - Data interface{} - } -} +var RealTimeHandlers map[string]func(r *PushSubscription, channel string, data ...interface{}) +var RealTimeSystemHandlers map[string]func(r *PushSubscription, channel string, id ID, rawData []byte) //Listen connects to GroupMe. Runs in Goroutine. -func (r *PushSubscription) StartListening(context context.Context) { - r.fayeClient = wray.NewFayeClient(pushServer) - - r.fayeClient.SetLogger(fayeLogger{}) - - r.fayeClient.AddExtension(&authExtension{}) - //r.fayeClient.AddExtension(r.fayeClient) //verbose output +func (r *PushSubscription) StartListening(context context.Context, client FayeClient) { + r.fayeClient = client go r.fayeClient.Listen() @@ -129,241 +145,50 @@ func (r *PushSubscription) StartListening(context context.Context) { for msg := range r.channel { r.LastConnected = time.Now().Unix() data := msg.Data() - content := data["subject"] //TODO ok + content := data["subject"] contentType := data["type"].(string) channel := msg.Channel() - if strings.HasPrefix(channel, groupChannel) || strings.HasPrefix(channel, dmChannel) { - c, ok := content.(map[string]interface{}) - if !ok { - fmt.Println(content, data, "err") + handler, ok := RealTimeHandlers[contentType] + if !ok { + if contentType == "ping" || + len(contentType) == 0 || + content == nil { continue } - e, ok := c["line"] - if !ok { - fmt.Println(content, data, "err") - continue - } - d, _ := json.Marshal(e) - r.chatEvent(contentType, d) - continue - } - - switch contentType { - case "line.create": - b, _ := json.Marshal(content) - out := Message{} - _ = json.Unmarshal(b, &out) - - if out.UserID.String() == "system" { - event := systemMessage{} - err := json.Unmarshal(b, &event) - if err != nil { - fmt.Println(err) - } - - r.systemEvent(out.GroupID, event) - break - } - - for _, h := range r.handlers { - if h, ok := h.(HandlerText); ok { - h.HandleTextMessage(out) - } - } - - case "like.create": - //should be an associated chatEvent - break - case "membership.create": - c, _ := content.(map[string]interface{}) - id, _ := c["id"].(string) - - for _, h := range r.handlers { - if h, ok := h.(HandlerMembership); ok { - h.HandleJoin(ID(id)) - } - } - - case "ping": - break - default: //TODO: see if any other types are returned - if len(contentType) == 0 || content == nil { - break - } - log.Println(contentType) - b, _ := json.Marshal(content) - log.Fatalln(string(b)) - + log.Println("Unable to handle GroupMe message type", contentType) } + handler(r, channel, content) } }() } -func (r *PushSubscription) chatEvent(contentType string, b []byte) { - switch contentType { - case "favorite": - data := Message{} - _ = json.Unmarshal(b, &data) - for _, h := range r.handlers { - if h, ok := h.(HandlerLike); ok { - h.HandleLike(data.ID, data.FavoritedBy) - } - } - default: //TODO: see if any other types are returned - log.Println(contentType) - log.Fatalln(string(b)) - } - -} - -func (r *PushSubscription) systemEvent(groupID ID, msg systemMessage) { - kind := msg.Event.Kind - b, _ := json.Marshal(msg.Event.Data) - switch kind { - case "membership.nickname_changed": - data := struct { - Name string - User struct { - ID int - } - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMembership); ok { - h.HandleNewNickname(groupID, ID(strconv.Itoa(data.User.ID)), data.Name) - } - } - case "membership.avatar_changed": - data := struct { - AvatarURL string `json:"avatar_url"` - User struct { - ID int - } - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMembership); ok { - h.HandleNewAvatarInGroup(groupID, ID(strconv.Itoa(data.User.ID)), data.AvatarURL) - } - } - case "membership.announce.added": - data := struct { - Added []Member `json:"added_users"` - }{} - _ = json.Unmarshal(b, &data) - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMembership); ok { - h.HandleMembers(groupID, data.Added, true) - } - } - case "membership.notifications.removed": - data := struct { - Added Member `json:"removed_user"` - }{} - _ = json.Unmarshal(b, &data) - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMembership); ok { - h.HandleMembers(groupID, []Member{data.Added}, false) - } - } - case "group.role_change_admin": - //TODO - break - case "group.name_change": - data := struct { - Name string - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMetadata); ok { - h.HandleGroupName(groupID, data.Name) - } - } - case "group.topic_change": - data := struct { - Topic string - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMetadata); ok { - h.HandleGroupTopic(groupID, data.Topic) - } - } - case "group.avatar_change": - data := struct { - AvatarURL string `json:"avatar_url"` - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMetadata); ok { - h.HandleGroupAvatar(groupID, data.AvatarURL) - } - } - case "group.like_icon_set": - data := struct { - LikeIcon struct { - PackID int `json:"pack_id"` - PackIndex int `json:"pack_index"` - Type string - } `json:"like_icon"` - }{} - _ = json.Unmarshal(b, &data) - - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMetadata); ok { - h.HandleLikeIcon(groupID, data.LikeIcon.PackID, data.LikeIcon.PackIndex, data.LikeIcon.Type) - } - } - case "group.like_icon_removed": - for _, h := range r.handlers { - if h, ok := h.(HandleGroupMetadata); ok { - h.HandleLikeIcon(groupID, 0, 0, "") - } - } - case "group.type_change", "group.required_approval_enabled", "group.required_approval_disabled": - //TODO: group joining - break - case "group.shared", "group.unshared": - //TODO - break - default: - log.Println(kind) - log.Fatalln(string(b)) - } -} - //SubscribeToUser to users -func (r *PushSubscription) SubscribeToUser(context context.Context, userID ID, authToken string) error { - concur.Lock() - defer concur.Unlock() - - if r.fayeClient == nil { - return errors.New("Not Listening") //TODO: Proper error - } - - token = authToken - r.fayeClient.WaitSubscribe(userChannel+userID.String(), r.channel) - - return nil +func (r *PushSubscription) SubscribeToUser(context context.Context, id ID, authToken string) error { + return r.subscribeWithPrefix(userChannel, context, id, authToken) } //SubscribeToGroup to groups for typing notification -func (r *PushSubscription) SubscribeToGroup(context context.Context, groupID ID, authToken string) error { +func (r *PushSubscription) SubscribeToGroup(context context.Context, id ID, authToken string) error { + return r.subscribeWithPrefix(groupChannel, context, id, authToken) +} + +//SubscribeToDM to users +func (r *PushSubscription) SubscribeToDM(context context.Context, id ID, authToken string) error { + id = ID(strings.Replace(id.String(), "+", "_", 1)) + return r.subscribeWithPrefix(dmChannel, context, id, authToken) +} + +func (r *PushSubscription) subscribeWithPrefix(prefix string, context context.Context, groupID ID, authToken string) error { concur.Lock() defer concur.Unlock() if r.fayeClient == nil { - return errors.New("Not Listening") //TODO: Proper error + return ErrListenerNotStarted } token = authToken - r.fayeClient.WaitSubscribe(groupChannel+groupID.String(), r.channel) + r.fayeClient.WaitSubscribe(prefix+groupID.String(), r.channel) return nil } @@ -373,27 +198,8 @@ func (r *PushSubscription) Connected() bool { return r.LastConnected+30 >= time.Now().Unix() } -// Stop listening to GroupMe after completing all other actions scheduled first -func (r *PushSubscription) Stop(context context.Context) { - concur.Lock() - defer concur.Unlock() - - //TODO: stop listening -} - -type authExtension struct { -} - -// In does nothing in this extension, but is needed to satisy the interface -func (e *authExtension) In(msg wray.Message) { - println(msg.Channel()) - if len(msg.Error()) > 0 { - log.Fatalln(msg.Error()) - } -} - // Out adds the authentication token to the messages ext field -func (e *authExtension) Out(msg wray.Message) { +func OutMsgProc(msg PushMessage) { if msg.Channel() == subscribeChannel { ext := msg.Ext() ext["access_token"] = token diff --git a/real_time_handler.go b/real_time_handler.go new file mode 100644 index 0000000..f608a53 --- /dev/null +++ b/real_time_handler.go @@ -0,0 +1,239 @@ +package groupme + +import ( + "encoding/json" + "fmt" + "log" + "strconv" +) + +func init() { + + RealTimeHandlers = make(map[string]func(r *PushSubscription, channel string, data ...interface{})) + + //Base Handlers on user channel + RealTimeHandlers["direct_message.create"] = func(r *PushSubscription, channel string, data ...interface{}) { + b, _ := json.Marshal(data[0]) + out := Message{} + _ = json.Unmarshal(b, &out) + + //maybe something with API versioning + out.ConversationID = out.ChatID + + if out.UserID.String() == "system" { + event := struct { + Event struct { + Kind string `json:"type"` + Data interface{} + } + }{} + + err := json.Unmarshal(b, &event) + if err != nil { + fmt.Println(err) + } + rawData, _ := json.Marshal(event.Event.Data) + handler, ok := RealTimeSystemHandlers[event.Event.Kind] + if !ok { + log.Println("Unable to handle system message of type", event.Event.Kind) + return + } + + id := out.GroupID + if len(id) == 0 { + id = out.ConversationID + } + + handler(r, channel, id, rawData) + return + } + + for _, h := range r.handlers { + if h, ok := h.(HandlerText); ok { + h.HandleTextMessage(out) + } + } + } + + RealTimeHandlers["line.create"] = RealTimeHandlers["direct_message.create"] + + RealTimeHandlers["like.create"] = func(r *PushSubscription, channel string, data ...interface{}) { //should be an associated chatEvent + } + + RealTimeHandlers["membership.create"] = func(r *PushSubscription, channel string, data ...interface{}) { + c, _ := data[0].(map[string]interface{}) + id, _ := c["id"].(string) + + for _, h := range r.handlers { + if h, ok := h.(HandlerMembership); ok { + h.HandleJoin(ID(id)) + } + } + + } + + //following are for each chat + RealTimeHandlers["favorite"] = func(r *PushSubscription, channel string, data ...interface{}) { + c, ok := data[0].(map[string]interface{}) + if !ok { + fmt.Println(data, "err") + return + } + e, ok := c["line"] + if !ok { + fmt.Println(data, "err") + return + } + d, _ := json.Marshal(e) + msg := Message{} + _ = json.Unmarshal(d, &msg) + for _, h := range r.handlers { + if h, ok := h.(HandlerLike); ok { + h.HandleLike(msg) + } + } + } + + //following are for messages from system (administrative/settings changes) + RealTimeSystemHandlers = make(map[string]func(r *PushSubscription, channel string, id ID, rawData []byte)) + + RealTimeSystemHandlers["membership.nickname_changed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + thing := struct { + Name string + User struct { + ID int + } + }{} + _ = json.Unmarshal(rawData, &thing) + + for _, h := range r.handlers { + if h, ok := h.(HandleMemberNewNickname); ok { + h.HandleNewNickname(id, ID(strconv.Itoa(thing.User.ID)), thing.Name) + } + } + + } + + RealTimeSystemHandlers["membership.avatar_changed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + content := struct { + AvatarURL string `json:"avatar_url"` + User struct { + ID int + } + }{} + _ = json.Unmarshal(rawData, &content) + + for _, h := range r.handlers { + if h, ok := h.(HandleMemberNewAvatar); ok { + h.HandleNewAvatarInGroup(id, ID(strconv.Itoa(content.User.ID)), content.AvatarURL) + } + } + + } + + RealTimeSystemHandlers["membership.announce.added"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + data := struct { + Added []Member `json:"added_users"` + }{} + _ = json.Unmarshal(rawData, &data) + for _, h := range r.handlers { + if h, ok := h.(HandleMembers); ok { + h.HandleMembers(id, data.Added, true) + } + } + } + + RealTimeSystemHandlers["membership.notifications.removed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + data := struct { + Added Member `json:"removed_user"` + }{} + _ = json.Unmarshal(rawData, &data) + for _, h := range r.handlers { + if h, ok := h.(HandleMembers); ok { + h.HandleMembers(id, []Member{data.Added}, false) + } + } + + } + + RealTimeSystemHandlers["membership.name_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + + data := struct { + Name string + }{} + _ = json.Unmarshal(rawData, &data) + + for _, h := range r.handlers { + if h, ok := h.(HandleGroupName); ok { + h.HandleGroupName(id, data.Name) + } + } + } + + RealTimeSystemHandlers["group.name_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + + data := struct { + Name string + }{} + _ = json.Unmarshal(rawData, &data) + + for _, h := range r.handlers { + if h, ok := h.(HandleGroupName); ok { + h.HandleGroupName(id, data.Name) + } + } + } + + RealTimeSystemHandlers["group.topic_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + + data := struct { + Topic string + }{} + _ = json.Unmarshal(rawData, &data) + + for _, h := range r.handlers { + if h, ok := h.(HandleGroupTopic); ok { + h.HandleGroupTopic(id, data.Topic) + } + } + } + + RealTimeSystemHandlers["group.avatar_change"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + data := struct { + AvatarURL string `json:"avatar_url"` + }{} + _ = json.Unmarshal(rawData, &data) + + for _, h := range r.handlers { + if h, ok := h.(HandleGroupAvatar); ok { + h.HandleGroupAvatar(id, data.AvatarURL) + } + } + } + + RealTimeSystemHandlers["group.like_icon_set"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + data := struct { + LikeIcon struct { + PackID int `json:"pack_id"` + PackIndex int `json:"pack_index"` + Type string + } `json:"like_icon"` + }{} + _ = json.Unmarshal(rawData, &data) + + for _, h := range r.handlers { + if h, ok := h.(HandleGroupLikeIcon); ok { + h.HandleLikeIcon(id, data.LikeIcon.PackID, data.LikeIcon.PackIndex, data.LikeIcon.Type) + } + } + } + + RealTimeSystemHandlers["group.like_icon_removed"] = func(r *PushSubscription, channel string, id ID, rawData []byte) { + for _, h := range r.handlers { + if h, ok := h.(HandleGroupLikeIcon); ok { + h.HandleLikeIcon(id, 0, 0, "") + } + } + } + +}