BIG ASS COMMIT

This commit is contained in:
Karmanyaah Malhotra
2021-02-13 00:53:35 -05:00
parent 95f6487912
commit eafc18099d
47 changed files with 3412 additions and 3240 deletions

527
user.go
View File

@@ -17,12 +17,11 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -39,14 +38,15 @@ import (
"maunium.net/go/mautrix/format"
"maunium.net/go/mautrix/id"
"github.com/karmanyaahm/groupme"
"maunium.net/go/mautrix-whatsapp/database"
"maunium.net/go/mautrix-whatsapp/types"
"maunium.net/go/mautrix-whatsapp/whatsapp-ext"
whatsappExt "maunium.net/go/mautrix-whatsapp/whatsapp-ext"
)
type User struct {
*database.User
Conn *whatsappExt.ExtendedConn
Conn *groupme.PushSubscription
bridge *Bridge
log log.Logger
@@ -57,6 +57,7 @@ type User struct {
IsRelaybot bool
Client *groupme.Client
ConnectionErrors int
CommunityID string
@@ -90,7 +91,7 @@ func (bridge *Bridge) GetUserByMXID(userID id.UserID) *User {
return user
}
func (bridge *Bridge) GetUserByJID(userID types.WhatsAppID) *User {
func (bridge *Bridge) GetUserByJID(userID types.GroupMeID) *User {
bridge.usersLock.Lock()
defer bridge.usersLock.Unlock()
user, ok := bridge.usersByJID[userID]
@@ -222,17 +223,17 @@ func (user *User) SetManagementRoom(roomID id.RoomID) {
}
func (user *User) SetSession(session *whatsapp.Session) {
user.Session = session
if session == nil {
user.LastConnection = 0
}
user.Update()
// user.Session = session
// if session == nil {
// user.LastConnection = 0
// }
// user.Update()
}
func (user *User) Connect(evenIfNoSession bool) bool {
func (user *User) Connect(evenIfNoLogin bool) bool {
if user.Conn != nil {
return true
} else if !evenIfNoSession && user.Session == nil {
} else if !evenIfNoLogin && len(user.Token) < 0 {
return false
}
user.log.Debugln("Connecting to WhatsApp")
@@ -240,43 +241,51 @@ func (user *User) Connect(evenIfNoSession bool) bool {
if timeout == 0 {
timeout = 20
}
conn, err := whatsapp.NewConn(timeout * time.Second)
if err != nil {
user.log.Errorln("Failed to connect to WhatsApp:", err)
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
"This indicates a network problem on the bridge server. See bridge logs for more info.")
return false
}
user.Conn = whatsappExt.ExtendConn(conn)
_ = user.Conn.SetClientName(user.bridge.Config.WhatsApp.OSName, user.bridge.Config.WhatsApp.BrowserName, WAVersion)
user.log.Debugln("WhatsApp connection successful")
user.Conn.AddHandler(user)
conn := groupme.NewPushSubscription(context.TODO())
user.Conn = &conn
// if err != nil {
// user.log.Errorln("Failed to connect to WhatsApp:", err)
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp server. " +
// "This indicates a network problem on the bridge server. See bridge logs for more info.")
// return false
// }
// user.Conn = whatsappExt.ExtendConn(conn)
// _ = user.Conn.SetClientName(user.bridge.Config.WhatsApp.OSName, user.bridge.Config.WhatsApp.BrowserName, WAVersion)
// user.log.Debugln("WhatsApp connection successful")
// user.Conn.AddHandler(user)
//TODO: typing notification?
return user.RestoreSession()
}
func (user *User) RestoreSession() bool {
if user.Session != nil {
sess, err := user.Conn.RestoreWithSession(*user.Session)
if err == whatsapp.ErrAlreadyLoggedIn {
return true
} else if err != nil {
user.log.Errorln("Failed to restore session:", err)
if errors.Is(err, whatsapp.ErrUnpaired) {
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
"To re-pair your phone, use `delete-session` and then `login`.")
} else {
user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
"on your phone is reachable and use `reconnect` to try connecting again.")
}
user.log.Debugln("Disconnecting due to failed session restore...")
_, err := user.Conn.Disconnect()
if err != nil {
user.log.Errorln("Failed to disconnect after failed session restore:", err)
}
return false
}
if len(user.Token) > 0 {
//sess, err :=
user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.GetJID()), user.Token)
// if err == whatsapp.ErrAlreadyLoggedIn {
// return true
// } else if err != nil {
// user.log.Errorln("Failed to restore session:", err)
// if errors.Is(err, whatsapp.ErrUnpaired) {
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp: unpaired from phone. " +
// "To re-pair your phone, use `delete-session` and then `login`.")
// } else {
// user.sendMarkdownBridgeAlert("\u26a0 Failed to connect to WhatsApp. Make sure WhatsApp " +
// "on your phone is reachable and use `reconnect` to try connecting again.")
// }
// user.log.Debugln("Disconnecting due to failed session restore...")
// _, err :=
user.Conn.Stop(context.TODO())
// if err != nil {
// user.log.Errorln("Failed to disconnect after failed session restore:", err)
// }
// return false
// }
user.Conn.SubscribeToUser(context.TODO(), groupme.ID(user.JID), user.Token)
user.ConnectionErrors = 0
user.SetSession(&sess)
//user.SetSession(&sess)
user.log.Debugln("Session restored successfully")
user.PostLogin()
}
@@ -284,15 +293,30 @@ func (user *User) RestoreSession() bool {
}
func (user *User) HasSession() bool {
return user.Session != nil
return user.Conn != nil && len(user.Token) > 0
}
func (user *User) IsConnected() bool {
return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
// return user.Conn != nil && user.Conn.IsConnected() && user.Conn.IsLoggedIn()
return true
}
func (user *User) IsLoggedIn() bool {
return true
}
func (user *User) IsLoginInProgress() bool {
return user.Conn != nil && user.Conn.IsLoginInProgress()
// return user.Conn != nil && user.Conn.IsLoginInProgress()
return false
}
func (user *User) GetJID() types.GroupMeID {
if len(user.JID) == 0 {
u, _ := user.Client.MyUser(context.TODO())
user.JID = u.ID.String()
}
return user.JID
}
func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventIDChan chan<- id.EventID) {
@@ -348,49 +372,51 @@ func (user *User) loginQrChannel(ce *CommandEvent, qrChan <-chan string, eventID
}
func (user *User) Login(ce *CommandEvent) {
qrChan := make(chan string, 3)
eventIDChan := make(chan id.EventID, 1)
go user.loginQrChannel(ce, qrChan, eventIDChan)
session, err := user.Conn.LoginWithRetry(qrChan, user.bridge.Config.Bridge.LoginQRRegenCount)
qrChan <- "stop"
if err != nil {
var eventID id.EventID
select {
case eventID = <-eventIDChan:
default:
}
reply := event.MessageEventContent{
MsgType: event.MsgText,
}
if err == whatsapp.ErrAlreadyLoggedIn {
reply.Body = "You're already logged in"
} else if err == whatsapp.ErrLoginInProgress {
reply.Body = "You have a login in progress already."
} else if err == whatsapp.ErrLoginTimedOut {
reply.Body = "QR code scan timed out. Please try again."
} else {
user.log.Warnln("Failed to log in:", err)
reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
}
msg := reply
if eventID != "" {
msg.NewContent = &reply
msg.RelatesTo = &event.RelatesTo{
Type: event.RelReplace,
EventID: eventID,
}
}
_, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
return
}
// TODO there's a bit of duplication between this and the provisioning API login method
// Also between the two logout methods (commands.go and provisioning.go)
user.ConnectionErrors = 0
user.JID = strings.Replace(user.Conn.Info.Wid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, 1)
// qrChan := make(chan string, 3)
// eventIDChan := make(chan id.EventID, 1)
// go user.loginQrChannel(ce, qrChan, eventIDChan)
// session, err := user.Conn.LoginWithRetry(qrChan, user.bridge.Config.Bridge.LoginQRRegenCount)
// qrChan <- "stop"
// if err != nil {
// var eventID id.EventID
// select {
// case eventID = <-eventIDChan:
// default:
// }
// reply := event.MessageEventContent{
// MsgType: event.MsgText,
// }
// if err == whatsapp.ErrAlreadyLoggedIn {
// reply.Body = "You're already logged in"
// } else if err == whatsapp.ErrLoginInProgress {
// reply.Body = "You have a login in progress already."
// } else if err == whatsapp.ErrLoginTimedOut {
// reply.Body = "QR code scan timed out. Please try again."
// } else {
// user.log.Warnln("Failed to log in:", err)
// reply.Body = fmt.Sprintf("Unknown error while logging in: %v", err)
// }
// msg := reply
// if eventID != "" {
// msg.NewContent = &reply
// msg.RelatesTo = &event.RelatesTo{
// Type: event.RelReplace,
// EventID: eventID,
// }
// }
// _, _ = ce.Bot.SendMessageEvent(ce.RoomID, event.EventMessage, &msg)
// return
// }
// // TODO there's a bit of duplication between this and the provisioning API login method
// // Also between the two logout methods (commands.go and provisioning.go)
// user.ConnectionErrors = 0
// user.JID = strings.Replace(user.Conn.Info.Wid, whatsappExt.OldUserSuffix, whatsappExt.NewUserSuffix, 1)
user.Token = ce.Args[0]
user.addToJIDMap()
user.SetSession(&session)
//user.SetSession(&session)
ce.Reply("Successfully logged in, synchronizing chats...")
user.PostLogin()
// user.PostLogin()
}
type Chat struct {
@@ -469,26 +495,27 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface
}
func (user *User) postConnPing() bool {
user.log.Debugln("Making post-connection ping")
err := user.Conn.AdminTest()
if err != nil {
user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
sess, disconnectErr := user.Conn.Disconnect()
if disconnectErr != nil {
user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
} else {
user.Session = &sess
}
user.bridge.Metrics.TrackDisconnection(user.MXID)
go func() {
time.Sleep(1 * time.Second)
user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
}()
return false
} else {
user.log.Debugln("Post-connection ping OK")
return true
}
// user.log.Debugln("Making post-connection ping")
// err := user.Conn.AdminTest()
// if err != nil {
// user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
// sess, disconnectErr := user.Conn.Disconnect()
// if disconnectErr != nil {
// user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr)
// } else {
// user.Session = &sess
// }
// user.bridge.Metrics.TrackDisconnection(user.MXID)
// go func() {
// time.Sleep(1 * time.Second)
// user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
// }()
// return false
// } else {
// user.log.Debugln("Post-connection ping OK")
// return true
// }
return true
}
func (user *User) intPostLogin() {
@@ -539,83 +566,83 @@ func (user *User) HandleStreamEvent(evt whatsappExt.StreamEvent) {
}
func (user *User) HandleChatList(chats []whatsapp.Chat) {
user.log.Infoln("Chat list received")
chatMap := make(map[string]whatsapp.Chat)
for _, chat := range user.Conn.Store.Chats {
chatMap[chat.Jid] = chat
}
for _, chat := range chats {
chatMap[chat.Jid] = chat
}
select {
case user.chatListReceived <- struct{}{}:
default:
}
go user.syncPortals(chatMap, false)
// user.log.Infoln("Chat list received")
// chatMap := make(map[string]whatsapp.Chat)
// for _, chat := range user.Conn.Store.Chats {
// chatMap[chat.Jid] = chat
// }
// for _, chat := range chats {
// chatMap[chat.Jid] = chat
// }
// select {
// case user.chatListReceived <- struct{}{}:
// default:
// }
// go user.syncPortals(chatMap, false)
}
func (user *User) syncPortals(chatMap map[string]whatsapp.Chat, createAll bool) {
if chatMap == nil {
chatMap = user.Conn.Store.Chats
}
user.log.Infoln("Reading chat list")
chats := make(ChatList, 0, len(chatMap))
existingKeys := user.GetInCommunityMap()
portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
for _, chat := range chatMap {
ts, err := strconv.ParseUint(chat.LastMessageTime, 10, 64)
if err != nil {
user.log.Warnfln("Non-integer last message time in %s: %s", chat.Jid, chat.LastMessageTime)
continue
}
portal := user.GetPortalByJID(chat.Jid)
// if chatMap == nil {
// chatMap = user.Conn.Store.Chats
// }
// user.log.Infoln("Reading chat list")
// chats := make(ChatList, 0, len(chatMap))
// existingKeys := user.GetInCommunityMap()
// portalKeys := make([]database.PortalKeyWithMeta, 0, len(chatMap))
// for _, chat := range chatMap {
// ts, err := strconv.ParseUint(chat.LastMessageTime, 10, 64)
// if err != nil {
// user.log.Warnfln("Non-integer last message time in %s: %s", chat.Jid, chat.LastMessageTime)
// continue
// }
// portal := user.GetPortalByJID(chat.Jid)
chats = append(chats, Chat{
Portal: portal,
Contact: user.Conn.Store.Contacts[chat.Jid],
LastMessageTime: ts,
})
var inCommunity, ok bool
if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
inCommunity = user.addPortalToCommunity(portal)
if portal.IsPrivateChat() {
puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
user.addPuppetToCommunity(puppet)
}
}
portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
}
user.log.Infoln("Read chat list, updating user-portal mapping")
err := user.SetPortalKeys(portalKeys)
if err != nil {
user.log.Warnln("Failed to update user-portal mapping:", err)
}
sort.Sort(chats)
limit := user.bridge.Config.Bridge.InitialChatSync
if limit < 0 {
limit = len(chats)
}
now := uint64(time.Now().Unix())
user.log.Infoln("Syncing portals")
for i, chat := range chats {
if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
break
}
create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
if len(chat.Portal.MXID) > 0 || create || createAll {
chat.Portal.Sync(user, chat.Contact)
err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
if err != nil {
chat.Portal.log.Errorln("Error backfilling history:", err)
}
}
}
user.UpdateDirectChats(nil)
user.log.Infoln("Finished syncing portals")
select {
case user.syncPortalsDone <- struct{}{}:
default:
}
// chats = append(chats, Chat{
// Portal: portal,
// Contact: user.Conn.Store.Contacts[chat.Jid],
// LastMessageTime: ts,
// })
// var inCommunity, ok bool
// if inCommunity, ok = existingKeys[portal.Key]; !ok || !inCommunity {
// inCommunity = user.addPortalToCommunity(portal)
// if portal.IsPrivateChat() {
// puppet := user.bridge.GetPuppetByJID(portal.Key.JID)
// user.addPuppetToCommunity(puppet)
// }
// }
// portalKeys = append(portalKeys, database.PortalKeyWithMeta{PortalKey: portal.Key, InCommunity: inCommunity})
// }
// user.log.Infoln("Read chat list, updating user-portal mapping")
// err := user.SetPortalKeys(portalKeys)
// if err != nil {
// user.log.Warnln("Failed to update user-portal mapping:", err)
// }
// sort.Sort(chats)
// limit := user.bridge.Config.Bridge.InitialChatSync
// if limit < 0 {
// limit = len(chats)
// }
// now := uint64(time.Now().Unix())
// user.log.Infoln("Syncing portals")
// for i, chat := range chats {
// if chat.LastMessageTime+user.bridge.Config.Bridge.SyncChatMaxAge < now {
// break
// }
// create := (chat.LastMessageTime >= user.LastConnection && user.LastConnection > 0) || i < limit
// if len(chat.Portal.MXID) > 0 || create || createAll {
// chat.Portal.Sync(user, chat.Contact)
// err := chat.Portal.BackfillHistory(user, chat.LastMessageTime)
// if err != nil {
// chat.Portal.log.Errorln("Error backfilling history:", err)
// }
// }
// }
// user.UpdateDirectChats(nil)
// user.log.Infoln("Finished syncing portals")
// select {
// case user.syncPortalsDone <- struct{}{}:
// default:
// }
}
func (user *User) getDirectChats() map[id.UserID][]id.RoomID {
@@ -682,17 +709,17 @@ func (user *User) HandleContactList(contacts []whatsapp.Contact) {
}
func (user *User) syncPuppets(contacts map[string]whatsapp.Contact) {
if contacts == nil {
contacts = user.Conn.Store.Contacts
}
user.log.Infoln("Syncing puppet info from contacts")
for jid, contact := range contacts {
if strings.HasSuffix(jid, whatsappExt.NewUserSuffix) {
puppet := user.bridge.GetPuppetByJID(contact.Jid)
puppet.Sync(user, contact)
}
}
user.log.Infoln("Finished syncing puppet info from contacts")
// if contacts == nil {
// contacts = user.Conn.Store.Contacts
// }
// user.log.Infoln("Syncing puppet info from contacts")
// for jid, contact := range contacts {
// if strings.HasSuffix(jid, whatsappExt.NewUserSuffix) {
// puppet := user.bridge.GetPuppetByJID(contact.Jid)
// puppet.Sync(user, contact)
// }
// }
// user.log.Infoln("Finished syncing puppet info from contacts")
}
func (user *User) updateLastConnectionIfNecessary() {
@@ -723,62 +750,62 @@ func (user *User) HandleError(err error) {
}
func (user *User) tryReconnect(msg string) {
user.bridge.Metrics.TrackConnectionState(user.JID, false)
if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
return
}
if user.bridge.Config.Bridge.ReportConnectionRetry {
user.sendBridgeNotice("%s. Reconnecting...", msg)
// Don't want the same error to be repeated
msg = ""
}
var tries uint
var exponentialBackoff bool
baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
if baseDelay < 0 {
exponentialBackoff = true
baseDelay = -baseDelay + 1
}
delay := baseDelay
for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
err := user.Conn.Restore()
if err == nil {
user.ConnectionErrors = 0
if user.bridge.Config.Bridge.ReportConnectionRetry {
user.sendBridgeNotice("Reconnected successfully")
}
user.PostLogin()
return
} else if err.Error() == "init responded with 400" {
user.log.Infoln("Got init 400 error when trying to reconnect, resetting connection...")
sess, err := user.Conn.Disconnect()
if err != nil {
user.log.Debugln("Error while disconnecting for connection reset:", err)
}
if len(sess.Wid) > 0 {
user.SetSession(&sess)
}
}
user.log.Errorln("Error while trying to reconnect after disconnection:", err)
tries++
user.ConnectionErrors++
if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
if exponentialBackoff {
delay = (1 << tries) + baseDelay
}
if user.bridge.Config.Bridge.ReportConnectionRetry {
user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
}
time.Sleep(delay * time.Second)
}
}
// user.bridge.Metrics.TrackConnectionState(user.JID, false)
// if user.ConnectionErrors > user.bridge.Config.Bridge.MaxConnectionAttempts {
// user.sendMarkdownBridgeAlert("%s. Use the `reconnect` command to reconnect.", msg)
// return
// }
// if user.bridge.Config.Bridge.ReportConnectionRetry {
// user.sendBridgeNotice("%s. Reconnecting...", msg)
// // Don't want the same error to be repeated
// msg = ""
// }
// var tries uint
// var exponentialBackoff bool
// baseDelay := time.Duration(user.bridge.Config.Bridge.ConnectionRetryDelay)
// if baseDelay < 0 {
// exponentialBackoff = true
// baseDelay = -baseDelay + 1
// }
// delay := baseDelay
// for user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
// err := user.Conn.Restore()
// if err == nil {
// user.ConnectionErrors = 0
// if user.bridge.Config.Bridge.ReportConnectionRetry {
// user.sendBridgeNotice("Reconnected successfully")
// }
// user.PostLogin()
// return
// } else if err.Error() == "init responded with 400" {
// user.log.Infoln("Got init 400 error when trying to reconnect, resetting connection...")
// sess, err := user.Conn.Disconnect()
// if err != nil {
// user.log.Debugln("Error while disconnecting for connection reset:", err)
// }
// if len(sess.Wid) > 0 {
// user.SetSession(&sess)
// }
// }
// user.log.Errorln("Error while trying to reconnect after disconnection:", err)
// tries++
// user.ConnectionErrors++
// if user.ConnectionErrors <= user.bridge.Config.Bridge.MaxConnectionAttempts {
// if exponentialBackoff {
// delay = (1 << tries) + baseDelay
// }
// if user.bridge.Config.Bridge.ReportConnectionRetry {
// user.sendBridgeNotice("Reconnection attempt failed: %v. Retrying in %d seconds...", err, delay)
// }
// time.Sleep(delay * time.Second)
// }
// }
if user.bridge.Config.Bridge.ReportConnectionRetry {
user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
} else {
user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
}
// if user.bridge.Config.Bridge.ReportConnectionRetry {
// user.sendMarkdownBridgeAlert("%d reconnection attempts failed. Use the `reconnect` command to try to reconnect manually.", tries)
// } else {
// user.sendMarkdownBridgeAlert("\u26a0 %s. Additionally, %d reconnection attempts failed. Use the `reconnect` command to try to reconnect.", msg, tries)
// }
}
func (user *User) ShouldCallSynchronously() bool {
@@ -789,11 +816,11 @@ func (user *User) HandleJSONParseError(err error) {
user.log.Errorln("WhatsApp JSON parse error:", err)
}
func (user *User) PortalKey(jid types.WhatsAppID) database.PortalKey {
func (user *User) PortalKey(jid types.GroupMeID) database.PortalKey {
return database.NewPortalKey(jid, user.JID)
}
func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal {
func (user *User) GetPortalByJID(jid types.GroupMeID) *Portal {
return user.bridge.GetPortalByJID(user.PortalKey(jid))
}
@@ -805,7 +832,7 @@ func (user *User) runMessageRingBuffer() {
default:
dropped := <-user.messageOutput
user.log.Warnln("Buffer is full, dropping message in", dropped.chat)
user.messageOutput<-msg
user.messageOutput <- msg
}
}
}