This commit is contained in:
Brandon Watson 2023-09-03 19:26:53 -05:00
parent 47656ca0bb
commit 48e426dd67

239
user.go
View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
@ -345,6 +346,7 @@ func (user *User) Connect() bool {
user.Conn = &conn
user.Conn.StartListening(context.Background(), groupmeext.NewFayeClient(user.log))
user.Conn.AddFullHandler(user)
user.intPostLogin()
//TODO: typing notification?
return user.RestoreSession()
@ -502,46 +504,46 @@ func (user *User) postConnPing() bool {
return true
}
// func (user *User) intPostLogin() {
// defer user.syncWait.Done()
// user.lastReconnection = time.Now().Unix()
// user.Client = groupmeext.NewClient(user.Token)
// if len(user.JID) == 0 {
// myuser, err := user.Client.MyUser(context.TODO())
// if err != nil {
// log.Fatal(err) //TODO
// }
// user.JID = myuser.ID.String()
// }
// user.Update()
func (user *User) intPostLogin() {
//defer user.syncWait.Done()
user.lastReconnection = time.Now().Unix()
user.Client = groupmeext.NewClient(user.Token)
if len(user.GMID) == 0 {
myuser, err := user.Client.MyUser(context.TODO())
if err != nil {
log.Fatal(err) //TODO
}
user.GMID = myuser.ID
}
user.Update()
// user.tryAutomaticDoublePuppeting()
user.tryAutomaticDoublePuppeting()
// user.log.Debugln("Waiting for chat list receive confirmation")
// user.HandleChatList()
// select {
// case <-user.chatListReceived:
// user.log.Debugln("Chat list receive confirmation received in PostLogin")
// case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
// user.log.Warnln("Timed out waiting for chat list to arrive!")
// user.postConnPing()
// return
// }
user.log.Debugln("Waiting for chat list receive confirmation")
user.HandleChatList()
select {
case <-user.chatListReceived:
user.log.Debugln("Chat list receive confirmation received in PostLogin")
case <-time.After(time.Duration(1000 /**user.bridge.Config.Bridge.ChatListWait**/) * time.Second):
user.log.Warnln("Timed out waiting for chat list to arrive!")
user.postConnPing()
return
}
// if !user.postConnPing() {
// user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
// return
// }
if !user.postConnPing() {
user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
return
}
// user.log.Debugln("Waiting for portal sync complete confirmation")
// select {
// case <-user.syncPortalsDone:
// user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
// // TODO this is too short, maybe a per-portal duration?
// case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
// user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
// }
// }
user.log.Debugln("Waiting for portal sync complete confirmation")
select {
case <-user.syncPortalsDone:
user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
// TODO this is too short, maybe a per-portal duration?
case <-time.After(time.Duration(1000 /**user.bridge.Config.Bridge.PortalSyncWai**/) * time.Second):
user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
}
}
func (user *User) HandleChatList() {
chatMap := map[groupme.ID]groupme.Group{}
@ -588,84 +590,107 @@ func (user *User) HandleChatList() {
go user.syncPortals(false)
}
// Syncs chats & group messages
func (user *User) syncPortals(createAll bool) {
// user.log.Infoln("Reading chat list")
user.log.Infoln("Reading chat list")
// chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList))
// portalKeys := make([]database.PortalKeyWithMeta, 0, cap(chats))
chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList))
portalKeys := make([]database.PortalKey, 0, cap(chats))
// for _, group := range user.GroupList {
for _, group := range user.GroupList {
// portal := user.bridge.GetPortalByJID(database.GroupPortalKey(group.ID.String()))
portal := user.bridge.GetPortalByGMID(database.GroupPortalKey(group.ID))
// chats = append(chats, Chat{
// Portal: portal,
// LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()),
// Group: &group,
// })
// }
// for _, dm := range user.ChatList {
// portal := user.bridge.GetPortalByJID(database.NewPortalKey(dm.OtherUser.ID.String(), user.JID))
// chats = append(chats, Chat{
// Portal: portal,
// LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()),
// DM: &dm,
// })
chats = append(chats, Chat{
Portal: portal,
LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()),
Group: &group,
})
}
for _, dm := range user.ChatList {
portal := user.bridge.GetPortalByGMID(database.NewPortalKey(dm.OtherUser.ID, user.GMID))
chats = append(chats, Chat{
Portal: portal,
LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()),
DM: &dm,
})
}
for _, chat := range chats {
//var inCommunity, ok bool
//if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity {
// inCommunity = user.addPortalToCommunity(chat.Portal)
// if chat.Portal.IsPrivateChat() {
// puppet := user.bridge.GetPuppetByGMID(chat.Portal.Key.GMID)
// user.addPuppetToCommunity(puppet)
// }
//}
portalKeys = append(portalKeys, chat.Portal.Key)
}
user.log.Infoln("Read chat list, updating user-portal mapping")
err := user.SetPortalKeys(portalKeys)
if err != nil {
user.log.Warnln("Failed to update user-portal mapping:", err)
}
sort.Sort(chats)
_ = user.bridge.Config.Bridge.SyncDirectChatList
user.log.Infoln("Syncing portals")
wg := sync.WaitGroup{}
for i, chat := range chats {
wg.Add(1)
go func(chat Chat, i int) {
create := (int64(chat.LastMessageTime) >= user.lastReconnection && user.lastReconnection > 0) || i < len(chats)
if len(chat.Portal.MXID) > 0 || create || createAll {
chat.Portal.Sync(user, chat.Group)
//err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
if err != nil {
chat.Portal.log.Errorln("Error backfilling history:", err)
}
}
wg.Done()
}(chat, i)
}
wg.Wait()
//TODO: handle leave from groupme side
user.UpdateDirectChats(nil)
user.log.Infoln("Finished syncing portals")
select {
case user.syncPortalsDone <- struct{}{}:
default:
}
}
func (user *User) SetPortalKeys(newKeys []database.PortalKey) error {
//database
//tx := user.db.Begin()
//ans := tx.Where("user_jid = ?", *user.jidPtr()).Delete(UserPortal{})
//
//if ans.Error != nil {
// _ = tx.Rollback()
// return ans.Error
//}
//
//for _, key := range newKeys {
// ans = tx.Create(&UserPortal{
// UserJID: *user.jidPtr(),
// PortalJID: key.JID,
// PortalReceiver: key.Receiver,
// InCommunity: key.InCommunity,
// })
// if ans.Error != nil {
// _ = tx.Rollback()
// return ans.Error
// }
//}
// for _, chat := range chats {
// var inCommunity, ok bool
// if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity {
// inCommunity = user.addPortalToCommunity(chat.Portal)
// if chat.Portal.IsPrivateChat() {
// puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.GMID)
// user.addPuppetToCommunity(puppet)
// }
// }
// portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: chat.Portal.Key, InCommunity: inCommunity})
// }
// user.log.Infoln("Read chat list, updating user-portal mapping")
// err := user.SetPortalKeys(portalKeys)
// if err != nil {
// user.log.Warnln("Failed to update user-portal mapping:", err)
// }
// sort.Sort(chats)
// limit := user.bridge.Config.Bridge.InitialChatSync
// if limit < 0 {
// limit = len(chats)
// }
// now := uint64(time.Now().Unix())
// user.log.Infoln("Syncing portals")
// wg := sync.WaitGroup{}
// for i, chat := range chats {
// if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
// break
// }
// wg.Add(1)
// go func(chat Chat, i int) {
// create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
// if len(chat.Portal.MXID) > 0 || create || createAll {
// chat.Portal.Sync(user, chat.Group)
// err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
// if err != nil {
// chat.Portal.log.Errorln("Error backfilling history:", err)
// }
// }
// wg.Done()
// }(chat, i)
// }
// wg.Wait()
// //TODO: handle leave from groupme side
// user.UpdateDirectChats(nil)
// user.log.Infoln("Finished syncing portals")
// select {
// case user.syncPortalsDone <- struct{}{}:
// default:
// }
println("portalkey transaction complete")
return nil
//return tx.Commit().Error
}
func (user *User) getDirectChats() map[id.UserID][]id.RoomID {