diff --git a/user.go b/user.go index 469579d..427695e 100644 --- a/user.go +++ b/user.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "net/http" + "sort" "strings" "sync" "time" @@ -345,6 +346,7 @@ func (user *User) Connect() bool { user.Conn = &conn user.Conn.StartListening(context.Background(), groupmeext.NewFayeClient(user.log)) user.Conn.AddFullHandler(user) + user.intPostLogin() //TODO: typing notification? return user.RestoreSession() @@ -502,46 +504,46 @@ func (user *User) postConnPing() bool { return true } -// func (user *User) intPostLogin() { -// defer user.syncWait.Done() -// user.lastReconnection = time.Now().Unix() -// user.Client = groupmeext.NewClient(user.Token) -// if len(user.JID) == 0 { -// myuser, err := user.Client.MyUser(context.TODO()) -// if err != nil { -// log.Fatal(err) //TODO -// } -// user.JID = myuser.ID.String() -// } -// user.Update() +func (user *User) intPostLogin() { + //defer user.syncWait.Done() + user.lastReconnection = time.Now().Unix() + user.Client = groupmeext.NewClient(user.Token) + if len(user.GMID) == 0 { + myuser, err := user.Client.MyUser(context.TODO()) + if err != nil { + log.Fatal(err) //TODO + } + user.GMID = myuser.ID + } + user.Update() -// user.tryAutomaticDoublePuppeting() + user.tryAutomaticDoublePuppeting() -// user.log.Debugln("Waiting for chat list receive confirmation") -// user.HandleChatList() -// select { -// case <-user.chatListReceived: -// user.log.Debugln("Chat list receive confirmation received in PostLogin") -// case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second): -// user.log.Warnln("Timed out waiting for chat list to arrive!") -// user.postConnPing() -// return -// } + user.log.Debugln("Waiting for chat list receive confirmation") + user.HandleChatList() + select { + case <-user.chatListReceived: + user.log.Debugln("Chat list receive confirmation received in PostLogin") + case <-time.After(time.Duration(1000 /**user.bridge.Config.Bridge.ChatListWait**/) * time.Second): + user.log.Warnln("Timed out waiting for chat list to arrive!") + user.postConnPing() + return + } -// if !user.postConnPing() { -// user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.") -// return -// } + if !user.postConnPing() { + user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.") + return + } -// user.log.Debugln("Waiting for portal sync complete confirmation") -// select { -// case <-user.syncPortalsDone: -// user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.") -// // TODO this is too short, maybe a per-portal duration? -// case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second): -// user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.") -// } -// } + user.log.Debugln("Waiting for portal sync complete confirmation") + select { + case <-user.syncPortalsDone: + user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.") + // TODO this is too short, maybe a per-portal duration? + case <-time.After(time.Duration(1000 /**user.bridge.Config.Bridge.PortalSyncWai**/) * time.Second): + user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.") + } +} func (user *User) HandleChatList() { chatMap := map[groupme.ID]groupme.Group{} @@ -588,84 +590,107 @@ func (user *User) HandleChatList() { go user.syncPortals(false) } +// Syncs chats & group messages func (user *User) syncPortals(createAll bool) { - // user.log.Infoln("Reading chat list") + user.log.Infoln("Reading chat list") - // chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList)) - // portalKeys := make([]database.PortalKeyWithMeta, 0, cap(chats)) + chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList)) + portalKeys := make([]database.PortalKey, 0, cap(chats)) - // for _, group := range user.GroupList { + for _, group := range user.GroupList { - // portal := user.bridge.GetPortalByJID(database.GroupPortalKey(group.ID.String())) + portal := user.bridge.GetPortalByGMID(database.GroupPortalKey(group.ID)) - // chats = append(chats, Chat{ - // Portal: portal, - // LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()), - // Group: &group, - // }) - // } - // for _, dm := range user.ChatList { - // portal := user.bridge.GetPortalByJID(database.NewPortalKey(dm.OtherUser.ID.String(), user.JID)) - // chats = append(chats, Chat{ - // Portal: portal, - // LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()), - // DM: &dm, - // }) + chats = append(chats, Chat{ + Portal: portal, + LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()), + Group: &group, + }) + } + for _, dm := range user.ChatList { + portal := user.bridge.GetPortalByGMID(database.NewPortalKey(dm.OtherUser.ID, user.GMID)) + chats = append(chats, Chat{ + Portal: portal, + LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()), + DM: &dm, + }) + } + + for _, chat := range chats { + //var inCommunity, ok bool + //if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity { + // inCommunity = user.addPortalToCommunity(chat.Portal) + // if chat.Portal.IsPrivateChat() { + // puppet := user.bridge.GetPuppetByGMID(chat.Portal.Key.GMID) + // user.addPuppetToCommunity(puppet) + // } + //} + portalKeys = append(portalKeys, chat.Portal.Key) + } + user.log.Infoln("Read chat list, updating user-portal mapping") + + err := user.SetPortalKeys(portalKeys) + if err != nil { + user.log.Warnln("Failed to update user-portal mapping:", err) + } + sort.Sort(chats) + _ = user.bridge.Config.Bridge.SyncDirectChatList + + user.log.Infoln("Syncing portals") + + wg := sync.WaitGroup{} + for i, chat := range chats { + wg.Add(1) + go func(chat Chat, i int) { + create := (int64(chat.LastMessageTime) >= user.lastReconnection && user.lastReconnection > 0) || i < len(chats) + if len(chat.Portal.MXID) > 0 || create || createAll { + chat.Portal.Sync(user, chat.Group) + //err := chat.Portal.BackfillHistory(user, chat.LastMessageTime) + if err != nil { + chat.Portal.log.Errorln("Error backfilling history:", err) + } + } + + wg.Done() + }(chat, i) + + } + wg.Wait() + //TODO: handle leave from groupme side + user.UpdateDirectChats(nil) + user.log.Infoln("Finished syncing portals") + select { + case user.syncPortalsDone <- struct{}{}: + default: + } +} + +func (user *User) SetPortalKeys(newKeys []database.PortalKey) error { + //database + //tx := user.db.Begin() + //ans := tx.Where("user_jid = ?", *user.jidPtr()).Delete(UserPortal{}) + // + //if ans.Error != nil { + // _ = tx.Rollback() + // return ans.Error + //} + // + //for _, key := range newKeys { + // ans = tx.Create(&UserPortal{ + // UserJID: *user.jidPtr(), + // PortalJID: key.JID, + // PortalReceiver: key.Receiver, + // InCommunity: key.InCommunity, + // }) + // if ans.Error != nil { + // _ = tx.Rollback() + // return ans.Error // } + //} - // for _, chat := range chats { - // var inCommunity, ok bool - // if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity { - // inCommunity = user.addPortalToCommunity(chat.Portal) - // if chat.Portal.IsPrivateChat() { - // puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.GMID) - // user.addPuppetToCommunity(puppet) - // } - // } - // portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: chat.Portal.Key, InCommunity: inCommunity}) - // } - // user.log.Infoln("Read chat list, updating user-portal mapping") - - // err := user.SetPortalKeys(portalKeys) - // if err != nil { - // user.log.Warnln("Failed to update user-portal mapping:", err) - // } - // sort.Sort(chats) - // limit := user.bridge.Config.Bridge.InitialChatSync - // if limit < 0 { - // limit = len(chats) - // } - // now := uint64(time.Now().Unix()) - // user.log.Infoln("Syncing portals") - - // wg := sync.WaitGroup{} - // for i, chat := range chats { - // if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now { - // break - // } - // wg.Add(1) - // go func(chat Chat, i int) { - // create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit - // if len(chat.Portal.MXID) > 0 || create || createAll { - // chat.Portal.Sync(user, chat.Group) - // err := chat.Portal.BackfillHistory(user, chat.LastMessageTime) - // if err != nil { - // chat.Portal.log.Errorln("Error backfilling history:", err) - // } - // } - - // wg.Done() - // }(chat, i) - - // } - // wg.Wait() - // //TODO: handle leave from groupme side - // user.UpdateDirectChats(nil) - // user.log.Infoln("Finished syncing portals") - // select { - // case user.syncPortalsDone <- struct{}{}: - // default: - // } + println("portalkey transaction complete") + return nil + //return tx.Commit().Error } func (user *User) getDirectChats() map[id.UserID][]id.RoomID {