From 3db8e1da0533c341174208a81cd3b71c7db5b086 Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Tue, 5 Sep 2023 20:10:24 -0500 Subject: [PATCH] WIP --- groupmeext/subscription.go | 2 +- portal.go | 2 ++ user.go | 70 ++++++++++++++++++++++---------------- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/groupmeext/subscription.go b/groupmeext/subscription.go index 4bcf5df..2b4ad61 100644 --- a/groupmeext/subscription.go +++ b/groupmeext/subscription.go @@ -53,7 +53,7 @@ func NewFayeClient(logger log.Logger) *FayeClient { fc := &FayeClient{wray.NewFayeClient(groupme.PushServer)} fc.SetLogger(fayeLogger{logger.Sub("FayeClient")}) fc.AddExtension(&AuthExt{}) - //fc.AddExtension(fc.FayeClient) + fc.AddExtension(fc.FayeClient) return fc } diff --git a/portal.go b/portal.go index d4fd60d..250cf0c 100644 --- a/portal.go +++ b/portal.go @@ -1531,7 +1531,9 @@ func (portal *Portal) HandleMatrixLeave(sender *User) { } func (portal *Portal) HandleMatrixKick(sender *User, evt *event.Event) { + portal.log.Debug("HandleMatrixKick") } func (portal *Portal) HandleMatrixInvite(sender *User, evt *event.Event) { + portal.log.Debug("HandleMatrixInvite") } diff --git a/user.go b/user.go index 444cffc..22ad9da 100644 --- a/user.go +++ b/user.go @@ -76,6 +76,9 @@ type User struct { spaceCreateLock sync.Mutex spaceMembershipChecked bool + + syncStart chan struct{} + syncWait sync.WaitGroup } func (br *GMBridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { @@ -197,6 +200,7 @@ func (br *GMBridge) NewUser(dbUser *database.User) *User { chatListReceived: make(chan struct{}, 1), syncPortalsDone: make(chan struct{}, 1), + syncStart: make(chan struct{}, 1), messageInput: make(chan PortalMessage), messageOutput: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer), } @@ -409,7 +413,6 @@ func (user *User) Login(token string) error { if user.Connect() { return nil } - user.intPostLogin() return errors.New("failed to connect") } @@ -438,7 +441,10 @@ func (user *User) PostLogin() { user.bridge.Metrics.TrackConnectionState(user.GMID, true) user.bridge.Metrics.TrackLoginState(user.GMID, true) user.bridge.Metrics.TrackBufferLength(user.MXID, 0) - // go user.intPostLogin() + user.log.Debugln("Locking processing of incoming messages and starting post-login sync") + user.syncWait.Add(1) + user.syncStart <- struct{}{} + go user.intPostLogin() } func (user *User) tryAutomaticDoublePuppeting() { @@ -483,31 +489,31 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface } func (user *User) postConnPing() bool { - // user.log.Debugln("Making post-connection ping") - // err := user.Conn.AdminTest() - // if err != nil { - // user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err) - // sess, disconnectErr := user.Conn.Disconnect() - // if disconnectErr != nil { - // user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr) - // } else { - // user.Session = &sess - // } - // user.bridge.Metrics.TrackDisconnection(user.MXID) - // go func() { - // time.Sleep(1 * time.Second) - // user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err)) - // }() - // return false - // } else { - // user.log.Debugln("Post-connection ping OK") - // return true - // } + //user.log.Debugln("Making post-connection ping") + //err := user.Conn.AdminTest() + //if err != nil { + // user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err) + // sess, disconnectErr := user.Conn.Disconnect() + // if disconnectErr != nil { + // user.log.Warnln("Error while disconnecting after failed post-connection ping:", disconnectErr) + // } else { + // user.Session = &sess + // } + // user.bridge.Metrics.TrackDisconnection(user.MXID) + // go func() { + // time.Sleep(1 * time.Second) + // user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err)) + // }() + // return false + //} else { + // user.log.Debugln("Post-connection ping OK") + // return true + //} return true } func (user *User) intPostLogin() { - //defer user.syncWait.Done() + defer user.syncWait.Done() user.lastReconnection = time.Now().Unix() user.Client = groupmeext.NewClient(user.Token) if len(user.GMID) == 0 { @@ -578,7 +584,7 @@ func (user *User) HandleChatList() { for _, u := range users { puppet := user.bridge.GetPuppetByGMID(u.ID) // "" for overall user not related to one group - puppet.Sync(nil, &groupme.Member{ + puppet.Sync(user, &groupme.Member{ UserID: u.ID, Nickname: u.Name, ImageURL: u.AvatarURL, @@ -775,6 +781,12 @@ func (user *User) handleMessageLoop() { }, false, false) } portal.messages <- msg + case <-user.syncStart: + user.log.Debugln("Processing of incoming messages is locked") + user.bridge.Metrics.TrackSyncLock(user.GMID, true) + user.syncWait.Wait() + user.bridge.Metrics.TrackSyncLock(user.GMID, false) + user.log.Debugln("Processing of incoming messages unlocked") } } } @@ -803,11 +815,11 @@ func (user *User) HandleJoin(id groupme.ID) { } func (user *User) HandleGroupName(group groupme.ID, newName string) { - //p := user.GetPortalByJID(group.String()) - //if p != nil { - // p.UpdateName(newName, "", false) - // get more info abt actual user TODO - //} + p := user.GetPortalByGMID(group) + if p != nil { + p.UpdateName(newName, "", false) + //get more info abt actual user TODO + } //bugs atm with above? user.HandleChatList()