treewide: upgrading to latest mautrix standards

Signed-off-by: Sumner Evans <sumner@beeper.com>
This commit is contained in:
Sumner Evans
2022-10-21 14:02:33 -05:00
parent e843faf9b4
commit 715107f5a2
52 changed files with 1887 additions and 6449 deletions

412
user.go
View File

@@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
@@ -30,6 +31,9 @@ import (
"github.com/Rhymen/go-whatsapp"
"maunium.net/go/mautrix"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/bridge"
"maunium.net/go/mautrix/bridge/bridgeconfig"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/format"
"maunium.net/go/mautrix/id"
@@ -37,31 +41,29 @@ import (
"github.com/karmanyaahm/groupme"
"github.com/beeper/groupme/database"
"github.com/beeper/groupme/groupmeExt"
"github.com/beeper/groupme/types"
whatsappExt "github.com/beeper/groupme/whatsapp-ext"
"github.com/beeper/groupme/groupmeext"
)
type User struct {
*database.User
Conn *groupme.PushSubscription
bridge *Bridge
bridge *GMBridge
log log.Logger
Admin bool
Whitelisted bool
RelaybotWhitelisted bool
Admin bool
Whitelisted bool
PermissionLevel bridgeconfig.PermissionLevel
IsRelaybot bool
BridgeState *bridge.BridgeStateQueue
Client *groupmeExt.Client
Client *groupmeext.Client
ConnectionErrors int
CommunityID string
ChatList map[types.GroupMeID]groupme.Chat
GroupList map[types.GroupMeID]groupme.Group
RelationList map[types.GroupMeID]groupme.User
ChatList map[groupme.ID]groupme.Chat
GroupList map[groupme.ID]groupme.Group
RelationList map[groupme.ID]groupme.User
cleanDisconnection bool
batteryWarningsSent int
@@ -73,125 +75,228 @@ type User struct {
messageInput chan PortalMessage
messageOutput chan PortalMessage
syncStart chan struct{}
syncWait sync.WaitGroup
mgmtCreateLock sync.Mutex
spaceCreateLock sync.Mutex
spaceMembershipChecked bool
}
func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
_, isPuppet := bridge.ParsePuppetMXID(userID)
if isPuppet || userID == bridge.Bot.UserID {
func (br *GMBridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
_, isPuppet := br.ParsePuppetMXID(userID)
if isPuppet || userID == br.Bot.UserID {
return nil
}
bridge.usersLock.Lock()
defer bridge.usersLock.Unlock()
user, ok := bridge.usersByMXID[userID]
br.usersLock.Lock()
defer br.usersLock.Unlock()
user, ok := br.usersByMXID[userID]
if !ok {
return bridge.loadDBUser(bridge.DB.User.GetByMXID(userID), &userID)
userIDPtr := &userID
if onlyIfExists {
userIDPtr = nil
}
return br.loadDBUser(br.DB.User.GetByMXID(userID), userIDPtr)
}
return user
}
func (bridge *Bridge) GetUserByJID(userID types.GroupMeID) *User {
func (br *GMBridge) GetUserByMXID(userID id.UserID) *User {
return br.getUserByMXID(userID, false)
}
func (br *GMBridge) GetIUser(userID id.UserID, create bool) bridge.User {
u := br.getUserByMXID(userID, !create)
if u == nil {
return nil
}
return u
}
func (user *User) GetPermissionLevel() bridgeconfig.PermissionLevel {
return user.PermissionLevel
}
func (user *User) GetManagementRoomID() id.RoomID {
return user.ManagementRoom
}
func (user *User) GetMXID() id.UserID {
return user.MXID
}
func (user *User) GetCommandState() map[string]interface{} {
return nil
}
func (br *GMBridge) GetUserByMXIDIfExists(userID id.UserID) *User {
return br.getUserByMXID(userID, true)
}
func (bridge *GMBridge) GetUserByGMID(gmid groupme.ID) *User {
bridge.usersLock.Lock()
defer bridge.usersLock.Unlock()
user, ok := bridge.usersByJID[userID]
user, ok := bridge.usersByGMID[gmid]
if !ok {
return bridge.loadDBUser(bridge.DB.User.GetByJID(userID), nil)
return bridge.loadDBUser(bridge.DB.User.GetByGMID(gmid), nil)
}
return user
}
func (user *User) addToJIDMap() {
func (user *User) addToGMIDMap() {
user.bridge.usersLock.Lock()
user.bridge.usersByJID[user.JID] = user
user.bridge.usersByGMID[user.GMID] = user
user.bridge.usersLock.Unlock()
}
func (user *User) removeFromJIDMap() {
func (user *User) removeFromGMIDMap() {
user.bridge.usersLock.Lock()
jidUser, ok := user.bridge.usersByJID[user.JID]
jidUser, ok := user.bridge.usersByGMID[user.GMID]
if ok && user == jidUser {
delete(user.bridge.usersByJID, user.JID)
delete(user.bridge.usersByGMID, user.GMID)
}
user.bridge.usersLock.Unlock()
user.bridge.Metrics.TrackLoginState(user.JID, false)
user.bridge.Metrics.TrackLoginState(user.GMID, false)
}
func (bridge *Bridge) GetAllUsers() []*User {
bridge.usersLock.Lock()
defer bridge.usersLock.Unlock()
dbUsers := bridge.DB.User.GetAll()
func (br *GMBridge) GetAllUsers() []*User {
br.usersLock.Lock()
defer br.usersLock.Unlock()
dbUsers := br.DB.User.GetAll()
output := make([]*User, len(dbUsers))
for index, dbUser := range dbUsers {
user, ok := bridge.usersByMXID[dbUser.MXID]
user, ok := br.usersByMXID[dbUser.MXID]
if !ok {
user = bridge.loadDBUser(dbUser, nil)
user = br.loadDBUser(dbUser, nil)
}
output[index] = user
}
return output
}
func (bridge *Bridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
func (br *GMBridge) loadDBUser(dbUser *database.User, mxid *id.UserID) *User {
if dbUser == nil {
if mxid == nil {
return nil
}
dbUser = bridge.DB.User.New()
dbUser = br.DB.User.New()
dbUser.MXID = *mxid
dbUser.Insert()
}
user := bridge.NewUser(dbUser)
bridge.usersByMXID[user.MXID] = user
if len(user.JID) > 0 {
bridge.usersByJID[user.JID] = user
user := br.NewUser(dbUser)
br.usersByMXID[user.MXID] = user
if len(user.GMID) > 0 {
br.usersByGMID[user.GMID] = user
}
if len(user.ManagementRoom) > 0 {
bridge.managementRooms[user.ManagementRoom] = user
br.managementRooms[user.ManagementRoom] = user
}
return user
}
func (user *User) GetPortals() []*Portal {
user.bridge.portalsLock.Lock()
keys := user.User.GetPortalKeys()
portals := make([]*Portal, len(keys))
for i, key := range keys {
portal, ok := user.bridge.portalsByJID[key]
if !ok {
portal = user.bridge.loadDBPortal(user.bridge.DB.Portal.GetByJID(key), &key)
}
portals[i] = portal
}
user.bridge.portalsLock.Unlock()
return portals
}
func (bridge *Bridge) NewUser(dbUser *database.User) *User {
func (br *GMBridge) NewUser(dbUser *database.User) *User {
user := &User{
User: dbUser,
bridge: bridge,
log: bridge.Log.Sub("User").Sub(string(dbUser.MXID)),
IsRelaybot: false,
bridge: br,
log: br.Log.Sub("User").Sub(string(dbUser.MXID)),
chatListReceived: make(chan struct{}, 1),
syncPortalsDone: make(chan struct{}, 1),
syncStart: make(chan struct{}, 1),
messageInput: make(chan PortalMessage),
messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
messageOutput: make(chan PortalMessage, br.Config.Bridge.UserMessageBuffer),
}
user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
user.PermissionLevel = user.bridge.Config.Bridge.Permissions.Get(user.MXID)
user.Whitelisted = user.PermissionLevel >= bridgeconfig.PermissionLevelUser
user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin
user.BridgeState = br.NewBridgeStateQueue(user, user.log)
go user.handleMessageLoop()
go user.runMessageRingBuffer()
return user
}
func (user *User) ensureInvited(intent *appservice.IntentAPI, roomID id.RoomID, isDirect bool) (ok bool) {
extraContent := make(map[string]interface{})
if isDirect {
extraContent["is_direct"] = true
}
customPuppet := user.bridge.GetPuppetByCustomMXID(user.MXID)
if customPuppet != nil && customPuppet.CustomIntent() != nil {
extraContent["fi.mau.will_auto_accept"] = true
}
_, err := intent.InviteUser(roomID, &mautrix.ReqInviteUser{UserID: user.MXID}, extraContent)
var httpErr mautrix.HTTPError
if err != nil && errors.As(err, &httpErr) && httpErr.RespError != nil && strings.Contains(httpErr.RespError.Err, "is already in the room") {
user.bridge.StateStore.SetMembership(roomID, user.MXID, event.MembershipJoin)
ok = true
return
} else if err != nil {
user.log.Warnfln("Failed to invite user to %s: %v", roomID, err)
} else {
ok = true
}
if customPuppet != nil && customPuppet.CustomIntent() != nil {
err = customPuppet.CustomIntent().EnsureJoined(roomID, appservice.EnsureJoinedParams{IgnoreCache: true})
if err != nil {
user.log.Warnfln("Failed to auto-join %s: %v", roomID, err)
ok = false
} else {
ok = true
}
}
return
}
func (user *User) GetSpaceRoom() id.RoomID {
if !user.bridge.Config.Bridge.PersonalFilteringSpaces {
return ""
}
if len(user.SpaceRoom) == 0 {
user.spaceCreateLock.Lock()
defer user.spaceCreateLock.Unlock()
if len(user.SpaceRoom) > 0 {
return user.SpaceRoom
}
resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
Visibility: "private",
Name: "GroupMe",
Topic: "Your GroupMe bridged chats",
InitialState: []*event.Event{{
Type: event.StateRoomAvatar,
Content: event.Content{
Parsed: &event.RoomAvatarEventContent{
URL: user.bridge.Config.AppService.Bot.ParsedAvatar,
},
},
}},
CreationContent: map[string]interface{}{
"type": event.RoomTypeSpace,
},
PowerLevelOverride: &event.PowerLevelsEventContent{
Users: map[id.UserID]int{
user.bridge.Bot.UserID: 9001,
user.MXID: 50,
},
},
})
if err != nil {
user.log.Errorln("Failed to auto-create space room:", err)
} else {
user.SpaceRoom = resp.RoomID
user.Update()
user.ensureInvited(user.bridge.Bot, user.SpaceRoom, false)
}
} else if !user.spaceMembershipChecked && !user.bridge.StateStore.IsInRoom(user.SpaceRoom, user.MXID) {
user.ensureInvited(user.bridge.Bot, user.SpaceRoom, false)
}
user.spaceMembershipChecked = true
return user.SpaceRoom
}
func (user *User) GetManagementRoom() id.RoomID {
if len(user.ManagementRoom) == 0 {
user.mgmtCreateLock.Lock()
@@ -199,9 +304,14 @@ func (user *User) GetManagementRoom() id.RoomID {
if len(user.ManagementRoom) > 0 {
return user.ManagementRoom
}
creationContent := make(map[string]interface{})
if !user.bridge.Config.Bridge.FederateRooms {
creationContent["m.federate"] = false
}
resp, err := user.bridge.Bot.CreateRoom(&mautrix.ReqCreateRoom{
Topic: "WhatsApp bridge notices",
IsDirect: true,
Topic: "GroupMe bridge notices",
IsDirect: true,
CreationContent: creationContent,
})
if err != nil {
user.log.Errorln("Failed to auto-create management room:", err)
@@ -240,22 +350,13 @@ func (user *User) Connect() bool {
}
user.log.Debugln("Connecting to WhatsApp")
timeout := time.Duration(user.bridge.Config.Bridge.ConnectionTimeout)
timeout := time.Duration(user.bridge.Config.GroupMe.ConnectionTimeout)
if timeout == 0 {
timeout = 20
}
conn := groupme.NewPushSubscription(context.Background())
user.Conn = &conn
user.Conn.StartListening(context.TODO(), groupmeExt.NewFayeClient(user.log))
// if err != nil {
// user.log.Errorln("Failed to connect to WhatsApp:", err)
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
// "This indicates a network problem on the bridge server. See bridge logs for more info.")
// return false
// }
// user.Conn = whatsappExt.ExtendConn(conn)
// _ = user.Conn.SetClientName(user.bridge.Config.WhatsApp.OSName, user.bridge.Config.WhatsApp.BrowserName, WAVersion)
// user.log.Debugln("WhatsApp connection successful")
user.Conn.StartListening(context.TODO(), groupmeext.NewFayeClient(user.log))
user.Conn.AddFullHandler(user)
//TODO: typing notification?
@@ -264,26 +365,7 @@ func (user *User) Connect() bool {
func (user *User) RestoreSession() bool {
if len(user.Token) > 0 {
// if err == whatsapp.ErrAlreadyLoggedIn {
// return true
// } else if err != nil {
// user.log.Errorln("Failed to restore session:", err)
// if errors.Is(err, whatsapp.ErrUnpaired) {
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
// "To re-pair your phone, use `delete-session` and then `login`.")
// } else {
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
// "on your phone is reachable and use `reconnect` to try connecting again.")
// }
// user.log.Debugln("Disconnecting due to failed session restore...")
// _, err :=
// if err != nil {
// user.log.Errorln("Failed to disconnect after failed session restore:", err)
// }
// return false
// }
err := user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.JID), user.Token)
err := user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.GMID), user.Token.String())
if err != nil {
fmt.Println(err)
}
@@ -304,7 +386,7 @@ func (user *User) HasSession() bool {
}
func (user *User) IsConnected() bool {
println("better connectoin check TODO")
// TODO: better connection check
return user.Conn != nil
}
@@ -317,65 +399,27 @@ func (user *User) IsLoginInProgress() bool {
return false
}
func (user *User) GetJID() types.GroupMeID {
if len(user.JID) == 0 {
u, _ := user.Client.MyUser(context.TODO())
user.JID = u.ID.String()
func (user *User) GetGMID() groupme.ID {
if len(user.GMID) == 0 {
u, err := user.Client.MyUser(context.TODO())
if err != nil {
user.log.Errorln("Failed to get own GroupMe ID:", err)
return ""
}
user.GMID = u.ID
}
return user.JID
return user.GMID
}
func (user *User) Login(ce *CommandEvent) {
// qrChan := make(chan string, 3)
// eventIDChan := make(chan id.EventID, 1)
// go user.loginQrChannel(ce, qrChan, eventIDChan)
// session, err := user.Conn.LoginWithRetry(qrChan, user.bridge.Config.Bridge.LoginQRRegenCount)
// qrChan <- "stop"
// if err != nil {
// var eventID id.EventID
// select {
// case eventID = <-eventIDChan:
// default:
// }
// reply := event.MessageEventContent{
// MsgType: event.MsgText,
// }
// if err == whatsapp.ErrAlreadyLoggedIn {
// reply.Body = "You're already logged in"
// } else if err == whatsapp.ErrLoginInProgress {
// reply.Body = "You have a login in progress already."
// } else if err == whatsapp.ErrLoginTimedOut {
// reply.Body = "QR code scan timed out. Please try again."
// } else {
// user.log.Warnln("Failed to log in:", err)
// reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
// }
// msg := reply
// if eventID != "" {
// msg.NewContent = &reply
// msg.RelatesTo = &event.RelatesTo{
// Type: event.RelReplace,
// EventID: eventID,
// }
// }
// _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
// return
// }
// // TODO there's a bit of duplication between this and the provisioning API login method
// // Also between the two logout methods (commands.go and provisioning.go)
// user.ConnectionErrors = 0
// user.JID = strings.Replace(user.Conn.Info.Wid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, 1)
if len(ce.Args) < 1 {
ce.Reply(`Get your access token from https://dev.groupme.com/ which should be the first argument to login`)
return
}
user.Token = ce.Args[0]
func (user *User) Login(token string) error {
user.Token = token
user.addToJIDMap()
//user.SetSession(&session)
ce.Reply("Successfully logged in, synchronizing chats...")
user.addToGMIDMap()
user.PostLogin()
user.Connect()
if user.Connect() {
return nil
}
return errors.New("failed to connect")
}
type Chat struct {
@@ -385,11 +429,6 @@ type Chat struct {
DM *groupme.Chat
}
////returns private chat assuming one of group or dm have been initialized properly
//func (c Chat) IsPrivate() bool {
// return c.Group == nil
//}
type ChatList []Chat
func (cl ChatList) Len() int {
@@ -405,12 +444,9 @@ func (cl ChatList) Swap(i, j int) {
}
func (user *User) PostLogin() {
user.bridge.Metrics.TrackConnectionState(user.JID, true)
user.bridge.Metrics.TrackLoginState(user.JID, true)
user.bridge.Metrics.TrackConnectionState(user.GMID, true)
user.bridge.Metrics.TrackLoginState(user.GMID, true)
user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
user.syncWait.Add(1)
user.syncStart <- struct{}{}
go user.intPostLogin()
}
@@ -423,7 +459,7 @@ func (user *User) tryAutomaticDoublePuppeting() {
return
}
user.log.Debugln("Checking if double puppeting needs to be enabled")
puppet := user.bridge.GetPuppetByJID(user.JID)
puppet := user.bridge.GetPuppetByGMID(user.JID)
if len(puppet.CustomMXID) > 0 {
user.log.Debugln("User already has double-puppeting enabled")
// Custom puppet already enabled
@@ -486,7 +522,7 @@ func (user *User) postConnPing() bool {
func (user *User) intPostLogin() {
defer user.syncWait.Done()
user.lastReconnection = time.Now().Unix()
user.Client = groupmeExt.NewClient(user.Token)
user.Client = groupmeext.NewClient(user.Token)
if len(user.JID) == 0 {
myuser, err := user.Client.MyUser(context.TODO())
if err != nil {
@@ -525,22 +561,6 @@ func (user *User) intPostLogin() {
}
}
func (user *User) HandleStreamEvent(evt whatsappExt.StreamEvent) {
if evt.Type == whatsappExt.StreamSleep {
if user.lastReconnection+60 > time.Now().Unix() {
user.lastReconnection = 0
user.log.Infoln("Stream went to sleep soon after reconnection, making new post-connection ping in 20 seconds")
go func() {
time.Sleep(20 * time.Second)
// TODO if this happens during the post-login sync, it can get stuck forever
user.postConnPing()
}()
}
} else {
user.log.Infofln("Stream event: %+v", evt)
}
}
func (user *User) HandleChatList() {
chatMap := make(map[string]groupme.Group)
chats, err := user.Client.IndexAllGroups()
@@ -618,7 +638,7 @@ func (user *User) syncPortals(createAll bool) {
if inCommunity, ok = existingKeys[chat.Portal.Key]; !ok || !inCommunity {
inCommunity = user.addPortalToCommunity(chat.Portal)
if chat.Portal.IsPrivateChat() {
puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.JID)
puppet := user.bridge.GetPuppetByJID(chat.Portal.Key.GMID)
user.addPuppetToCommunity(puppet)
}
}
@@ -673,7 +693,7 @@ func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
privateChats := user.bridge.DB.Portal.FindPrivateChats(user.JID)
for _, portal := range privateChats {
if len(portal.MXID) > 0 {
res[user.bridge.FormatPuppetMXID(portal.Key.JID)] = []id.RoomID{portal.MXID}
res[user.bridge.FormatPuppetMXID(portal.Key.GMID)] = []id.RoomID{portal.MXID}
}
}
return res
@@ -845,12 +865,12 @@ func (user *User) HandleJSONParseError(err error) {
user.log.Errorln("WhatsApp JSON parse error:", err)
}
func (user *User) PortalKey(jid types.GroupMeID) database.PortalKey {
return database.NewPortalKey(jid, user.JID)
func (user *User) PortalKey(gmid groupme.ID) database.PortalKey {
return database.NewPortalKey(gmid, user.GMID)
}
func (user *User) GetPortalByJID(jid types.GroupMeID) *Portal {
return user.bridge.GetPortalByJID(user.PortalKey(jid))
func (user *User) GetPortalByJID(gmid groupme.ID) *Portal {
return user.bridge.GetPortalByGMID(user.PortalKey(gmid))
}
func (user *User) runMessageRingBuffer() {
@@ -871,8 +891,8 @@ func (user *User) handleMessageLoop() {
select {
case msg := <-user.messageOutput:
user.bridge.Metrics.TrackBufferLength(user.MXID, len(user.messageOutput))
puppet := user.bridge.GetPuppetByJID(msg.data.UserID.String())
portal := user.bridge.GetPortalByJID(msg.chat)
puppet := user.bridge.GetPuppetByGMID(msg.data.UserID)
portal := user.bridge.GetPortalByGMID(msg.chat)
if puppet != nil {
puppet.Sync(user, portal.MXID, groupme.Member{
UserID: msg.data.UserID,
@@ -881,12 +901,6 @@ func (user *User) handleMessageLoop() {
}) //TODO: add params or docs?
}
portal.messages <- msg
case <-user.syncStart:
user.log.Debugln("Processing of incoming messages is locked")
user.bridge.Metrics.TrackSyncLock(user.JID, true)
user.syncWait.Wait()
user.bridge.Metrics.TrackSyncLock(user.JID, false)
user.log.Debugln("Processing of incoming messages unlocked")
}
}
}