fixup! treewide: upgrading to latest mautrix standards

Signed-off-by: Sumner Evans <sumner@beeper.com>
This commit is contained in:
Sumner Evans
2022-10-21 17:04:21 -05:00
parent 853fad9576
commit 5b3be968d2
8 changed files with 442 additions and 472 deletions

284
user.go
View File

@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
@@ -200,9 +199,8 @@ func (br *GMBridge) NewUser(dbUser *database.User) *User {
chatListReceived: make(chan struct{}, 1),
syncPortalsDone: make(chan struct{}, 1),
syncStart: make(chan struct{}, 1),
messageInput: make(chan PortalMessage),
messageOutput: make(chan PortalMessage, br.Config.Bridge.UserMessageBuffer),
messageOutput: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer),
}
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
@@ -349,7 +347,7 @@ func (user *User) Connect() bool {
return false
}
user.log.Debugln("Connecting to WhatsApp")
user.log.Debugln("Connecting to GroupMe")
timeout := time.Duration(user.bridge.Config.GroupMe.ConnectionTimeout)
if timeout == 0 {
timeout = 20
@@ -365,7 +363,7 @@ func (user *User) Connect() bool {
func (user *User) RestoreSession() bool {
if len(user.Token) > 0 {
err := user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.GMID), user.Token.String())
err := user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.GMID), user.Token)
if err != nil {
fmt.Println(err)
}
@@ -447,19 +445,15 @@ func (user *User) PostLogin() {
user.bridge.Metrics.TrackConnectionState(user.GMID, true)
user.bridge.Metrics.TrackLoginState(user.GMID, true)
user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
go user.intPostLogin()
// go user.intPostLogin()
}
func (user *User) tryAutomaticDoublePuppeting() {
if len(user.bridge.Config.Bridge.LoginSharedSecret) == 0 {
// Automatic login not enabled
return
} else if _, homeserver, _ := user.MXID.Parse(); homeserver != user.bridge.Config.Homeserver.Domain {
// user is on another homeserver
if !user.bridge.Config.CanAutoDoublePuppet(user.MXID) {
return
}
user.log.Debugln("Checking if double puppeting needs to be enabled")
puppet := user.bridge.GetPuppetByGMID(user.JID)
puppet := user.bridge.GetPuppetByGMID(user.GMID)
if len(puppet.CustomMXID) > 0 {
user.log.Debugln("User already has double-puppeting enabled")
// Custom puppet already enabled
@@ -519,85 +513,84 @@ func (user *User) postConnPing() bool {
return true
}
func (user *User) intPostLogin() {
defer user.syncWait.Done()
user.lastReconnection = time.Now().Unix()
user.Client = groupmeext.NewClient(user.Token)
if len(user.JID) == 0 {
myuser, err := user.Client.MyUser(context.TODO())
if err != nil {
log.Fatal(err) //TODO
}
user.JID = myuser.ID.String()
}
user.Update()
// func (user *User) intPostLogin() {
// defer user.syncWait.Done()
// user.lastReconnection = time.Now().Unix()
// user.Client = groupmeext.NewClient(user.Token)
// if len(user.JID) == 0 {
// myuser, err := user.Client.MyUser(context.TODO())
// if err != nil {
// log.Fatal(err) //TODO
// }
// user.JID = myuser.ID.String()
// }
// user.Update()
user.createCommunity()
user.tryAutomaticDoublePuppeting()
// user.tryAutomaticDoublePuppeting()
user.log.Debugln("Waiting for chat list receive confirmation")
user.HandleChatList()
select {
case <-user.chatListReceived:
user.log.Debugln("Chat list receive confirmation received in PostLogin")
case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
user.log.Warnln("Timed out waiting for chat list to arrive!")
user.postConnPing()
return
}
// user.log.Debugln("Waiting for chat list receive confirmation")
// user.HandleChatList()
// select {
// case <-user.chatListReceived:
// user.log.Debugln("Chat list receive confirmation received in PostLogin")
// case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second):
// user.log.Warnln("Timed out waiting for chat list to arrive!")
// user.postConnPing()
// return
// }
if !user.postConnPing() {
user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
return
}
// if !user.postConnPing() {
// user.log.Debugln("Post-connection ping failed, unlocking processing of incoming messages.")
// return
// }
user.log.Debugln("Waiting for portal sync complete confirmation")
select {
case <-user.syncPortalsDone:
user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
// TODO this is too short, maybe a per-portal duration?
case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
}
}
// user.log.Debugln("Waiting for portal sync complete confirmation")
// select {
// case <-user.syncPortalsDone:
// user.log.Debugln("Post-connection portal sync complete, unlocking processing of incoming messages.")
// // TODO this is too short, maybe a per-portal duration?
// case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second):
// user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.")
// }
// }
func (user *User) HandleChatList() {
chatMap := make(map[string]groupme.Group)
chatMap := map[groupme.ID]groupme.Group{}
chats, err := user.Client.IndexAllGroups()
if err != nil {
user.log.Errorln("chat sync error", err) //TODO: handle
return
}
for _, chat := range chats {
chatMap[chat.ID.String()] = *chat
chatMap[chat.ID] = *chat
}
user.GroupList = chatMap
dmMap := make(map[string]groupme.Chat)
dmMap := map[groupme.ID]groupme.Chat{}
dms, err := user.Client.IndexAllChats()
if err != nil {
user.log.Errorln("chat sync error", err) //TODO: handle
return
}
for _, dm := range dms {
dmMap[dm.OtherUser.ID.String()] = *dm
dmMap[dm.OtherUser.ID] = *dm
}
user.ChatList = dmMap
userMap := make(map[string]groupme.User)
userMap := map[groupme.ID]groupme.User{}
users, err := user.Client.IndexAllRelations()
if err != nil {
user.log.Errorln("Error syncing user list, continuing sync", err)
}
for _, u := range users {
puppet := user.bridge.GetPuppetByJID(u.ID.String())
puppet := user.bridge.GetPuppetByGMID(u.ID)
// "" for overall user not related to one group
puppet.Sync(nil, "", groupme.Member{
puppet.Sync(nil, &groupme.Member{
UserID: u.ID,
Nickname: u.Name,
ImageURL: u.AvatarURL,
})
userMap[u.ID.String()] = *u
}, false, false)
userMap[u.ID] = *u
}
user.RelationList = userMap
@@ -607,90 +600,88 @@ func (user *User) HandleChatList() {
}
func (user *User) syncPortals(createAll bool) {
// user.log.Infoln("Reading chat list")
user.log.Infoln("Reading chat list")
// chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList))
// portalKeys := make([]database.PortalKeyWithMeta, 0, cap(chats))
chats := make(ChatList, 0, len(user.GroupList)+len(user.ChatList))
existingKeys := user.GetInCommunityMap()
portalKeys := make([]database.PortalKeyWithMeta, 0, cap(chats))
// for _, group := range user.GroupList {
for _, group := range user.GroupList {
// portal := user.bridge.GetPortalByJID(database.GroupPortalKey(group.ID.String()))
portal := user.bridge.GetPortalByJID(database.GroupPortalKey(group.ID.String()))
// chats = append(chats, Chat{
// Portal: portal,
// LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()),
// Group: &group,
// })
// }
// for _, dm := range user.ChatList {
// portal := user.bridge.GetPortalByJID(database.NewPortalKey(dm.OtherUser.ID.String(), user.JID))
// chats = append(chats, Chat{
// Portal: portal,
// LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()),
// DM: &dm,
// })
// }
chats = append(chats, Chat{
Portal: portal,
LastMessageTime: uint64(group.UpdatedAt.ToTime().Unix()),
Group: &group,
})
}
for _, dm := range user.ChatList {
portal := user.bridge.GetPortalByJID(database.NewPortalKey(dm.OtherUser.ID.String(), user.JID))
chats = append(chats, Chat{
Portal: portal,
LastMessageTime: uint64(dm.UpdatedAt.ToTime().Unix()),
DM: &dm,
})
}
// for _, chat := range chats {
// var inCommunity, ok bool
// if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity {
// inCommunity = user.addPortalToCommunity(chat.Portal)
// if chat.Portal.IsPrivateChat() {
// puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.GMID)
// user.addPuppetToCommunity(puppet)
// }
// }
// portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: chat.Portal.Key, InCommunity: inCommunity})
// }
// user.log.Infoln("Read chat list, updating user-portal mapping")
for _, chat := range chats {
var inCommunity, ok bool
if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity {
inCommunity = user.addPortalToCommunity(chat.Portal)
if chat.Portal.IsPrivateChat() {
puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.GMID)
user.addPuppetToCommunity(puppet)
}
}
portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: chat.Portal.Key, InCommunity: inCommunity})
}
user.log.Infoln("Read chat list, updating user-portal mapping")
// err := user.SetPortalKeys(portalKeys)
// if err != nil {
// user.log.Warnln("Failed to update user-portal mapping:", err)
// }
// sort.Sort(chats)
// limit := user.bridge.Config.Bridge.InitialChatSync
// if limit < 0 {
// limit = len(chats)
// }
// now := uint64(time.Now().Unix())
// user.log.Infoln("Syncing portals")
err := user.SetPortalKeys(portalKeys)
if err != nil {
user.log.Warnln("Failed to update user-portal mapping:", err)
}
sort.Sort(chats)
limit := user.bridge.Config.Bridge.InitialChatSync
if limit < 0 {
limit = len(chats)
}
now := uint64(time.Now().Unix())
user.log.Infoln("Syncing portals")
// wg := sync.WaitGroup{}
// for i, chat := range chats {
// if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
// break
// }
// wg.Add(1)
// go func(chat Chat, i int) {
// create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
// if len(chat.Portal.MXID) > 0 || create || createAll {
// chat.Portal.Sync(user, chat.Group)
// err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
// if err != nil {
// chat.Portal.log.Errorln("Error backfilling history:", err)
// }
// }
wg := sync.WaitGroup{}
for i, chat := range chats {
if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
break
}
wg.Add(1)
go func(chat Chat, i int) {
create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
if len(chat.Portal.MXID) > 0 || create || createAll {
chat.Portal.Sync(user, chat.Group)
err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
if err != nil {
chat.Portal.log.Errorln("Error backfilling history:", err)
}
}
// wg.Done()
// }(chat, i)
wg.Done()
}(chat, i)
}
wg.Wait()
//TODO: handle leave from groupme side
user.UpdateDirectChats(nil)
user.log.Infoln("Finished syncing portals")
select {
case user.syncPortalsDone <- struct{}{}:
default:
}
// }
// wg.Wait()
// //TODO: handle leave from groupme side
// user.UpdateDirectChats(nil)
// user.log.Infoln("Finished syncing portals")
// select {
// case user.syncPortalsDone <- struct{}{}:
// default:
// }
}
func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
res := make(map[id.UserID][]id.RoomID)
privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
privateChats := user.bridge.DB.Portal.FindPrivateChats(user.GMID)
for _, portal := range privateChats {
if len(portal.MXID) > 0 {
res[user.bridge.FormatPuppetMXID(portal.Key.GMID)] = []id.RoomID{portal.MXID}
@@ -715,11 +706,8 @@ func (user *User) UpdateDirectChats(chats map[id.UserID][]id.RoomID) {
}
user.log.Debugln("Updating m.direct list on homeserver")
var err error
if user.bridge.Config.Homeserver.Asmux {
urlPath := intent.BuildBaseURL("_matrix", "client", "unstable", "net.maunium.asmux", "dms")
// _, err = intent.MakeFullRequest(method, urlPath, http.Header{
// "X-Asmux-Auth": {user.bridge.AS.Registration.AppToken},
// }, chats, nil)
if user.bridge.Config.Homeserver.Software == bridgeconfig.SoftwareAsmux {
urlPath := intent.BuildClientURL("unstable", "com.beeper.asmux", "dms")
_, err = intent.MakeFullRequest(mautrix.FullRequest{
Method: method,
URL: urlPath,
@@ -772,9 +760,9 @@ func (user *User) syncPuppets(contacts map[string]whatsapp.Contact) {
}
func (user *User) updateLastConnectionIfNecessary() {
if user.LastConnection+60 < uint64(time.Now().Unix()) {
user.UpdateLastConnection()
}
// if user.LastConnection+60 < uint64(time.Now().Unix()) {
// user.UpdateLastConnection()
// }
}
func (user *User) HandleError(err error) {
@@ -784,7 +772,7 @@ func (user *User) HandleError(err error) {
if closed, ok := err.(*whatsapp.ErrConnectionClosed); ok {
user.bridge.Metrics.TrackDisconnection(user.MXID)
if closed.Code == 1000 && user.cleanDisconnection {
user.bridge.Metrics.TrackConnectionState(user.JID, false)
user.bridge.Metrics.TrackConnectionState(user.GMID, false)
user.cleanDisconnection = false
user.log.Infoln("Clean disconnection by server")
return
@@ -869,7 +857,7 @@ func (user *User) PortalKey(gmid groupme.ID) database.PortalKey {
return database.NewPortalKey(gmid, user.GMID)
}
func (user *User) GetPortalByJID(gmid groupme.ID) *Portal {
func (user *User) GetPortalByGMID(gmid groupme.ID) *Portal {
return user.bridge.GetPortalByGMID(user.PortalKey(gmid))
}
@@ -894,11 +882,11 @@ func (user *User) handleMessageLoop() {
puppet := user.bridge.GetPuppetByGMID(msg.data.UserID)
portal := user.bridge.GetPortalByGMID(msg.chat)
if puppet != nil {
puppet.Sync(user, portal.MXID, groupme.Member{
puppet.Sync(user, &groupme.Member{
UserID: msg.data.UserID,
Nickname: msg.data.Name,
ImageURL: msg.data.AvatarURL,
}) //TODO: add params or docs?
}, false, false)
}
portal.messages <- msg
}
@@ -984,19 +972,19 @@ func (user *User) HandleLikeIcon(_ groupme.ID, _, _ int, _ string) {
}
func (user *User) HandleNewNickname(groupID, userID groupme.ID, name string) {
puppet := user.bridge.GetPuppetByJID(userID.String())
puppet := user.bridge.GetPuppetByGMID(userID)
if puppet != nil {
puppet.UpdateName(user, user.GetPortalByJID(groupID.String()).MXID, groupme.Member{
puppet.UpdateName(groupme.Member{
Nickname: name,
UserID: userID,
})
}, false)
}
}
func (user *User) HandleNewAvatarInGroup(groupID, userID groupme.ID, url string) {
puppet := user.bridge.GetPuppetByJID(userID.String())
puppet := user.bridge.GetPuppetByGMID(userID)
if puppet != nil {
puppet.UpdateAvatar(user, user.GetPortalByJID(groupID.String()).MXID, url)
puppet.UpdateAvatar(user, false)
}
}
@@ -1262,7 +1250,3 @@ type FakeMessage struct {
//func (user *User) HandleUnknownBinaryNode(node *waBinary.Node) {
// user.log.Debugfln("Unknown binary message: %+v", node)
//}
func (user *User) NeedsRelaybot(portal *Portal) bool {
return !user.HasSession() || !user.IsInPortal(portal.Key)
}