This commit is contained in:
Brandon Watson 2023-09-05 20:10:24 -05:00
parent 9f65ad172c
commit 3db8e1da05
3 changed files with 44 additions and 30 deletions

View File

@ -53,7 +53,7 @@ func NewFayeClient(logger log.Logger) *FayeClient {
fc := &FayeClient{wray.NewFayeClient(groupme.PushServer)} fc := &FayeClient{wray.NewFayeClient(groupme.PushServer)}
fc.SetLogger(fayeLogger{logger.Sub("FayeClient")}) fc.SetLogger(fayeLogger{logger.Sub("FayeClient")})
fc.AddExtension(&AuthExt{}) fc.AddExtension(&AuthExt{})
//fc.AddExtension(fc.FayeClient) fc.AddExtension(fc.FayeClient)
return fc return fc
} }

View File

@ -1531,7 +1531,9 @@ func (portal *Portal) HandleMatrixLeave(sender *User) {
} }
func (portal *Portal) HandleMatrixKick(sender *User, evt *event.Event) { func (portal *Portal) HandleMatrixKick(sender *User, evt *event.Event) {
portal.log.Debug("HandleMatrixKick")
} }
func (portal *Portal) HandleMatrixInvite(sender *User, evt *event.Event) { func (portal *Portal) HandleMatrixInvite(sender *User, evt *event.Event) {
portal.log.Debug("HandleMatrixInvite")
} }

40
user.go
View File

@ -76,6 +76,9 @@ type User struct {
spaceCreateLock sync.Mutex spaceCreateLock sync.Mutex
spaceMembershipChecked bool spaceMembershipChecked bool
syncStart chan struct{}
syncWait sync.WaitGroup
} }
func (br *GMBridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User { func (br *GMBridge) getUserByMXID(userID id.UserID, onlyIfExists bool) *User {
@ -197,6 +200,7 @@ func (br *GMBridge) NewUser(dbUser *database.User) *User {
chatListReceived: make(chan struct{}, 1), chatListReceived: make(chan struct{}, 1),
syncPortalsDone: make(chan struct{}, 1), syncPortalsDone: make(chan struct{}, 1),
syncStart: make(chan struct{}, 1),
messageInput: make(chan PortalMessage), messageInput: make(chan PortalMessage),
messageOutput: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer), messageOutput: make(chan PortalMessage, br.Config.Bridge.PortalMessageBuffer),
} }
@ -409,7 +413,6 @@ func (user *User) Login(token string) error {
if user.Connect() { if user.Connect() {
return nil return nil
} }
user.intPostLogin()
return errors.New("failed to connect") return errors.New("failed to connect")
} }
@ -438,7 +441,10 @@ func (user *User) PostLogin() {
user.bridge.Metrics.TrackConnectionState(user.GMID, true) user.bridge.Metrics.TrackConnectionState(user.GMID, true)
user.bridge.Metrics.TrackLoginState(user.GMID, true) user.bridge.Metrics.TrackLoginState(user.GMID, true)
user.bridge.Metrics.TrackBufferLength(user.MXID, 0) user.bridge.Metrics.TrackBufferLength(user.MXID, 0)
// go user.intPostLogin() user.log.Debugln("Locking processing of incoming messages and starting post-login sync")
user.syncWait.Add(1)
user.syncStart <- struct{}{}
go user.intPostLogin()
} }
func (user *User) tryAutomaticDoublePuppeting() { func (user *User) tryAutomaticDoublePuppeting() {
@ -483,9 +489,9 @@ func (user *User) sendMarkdownBridgeAlert(formatString string, args ...interface
} }
func (user *User) postConnPing() bool { func (user *User) postConnPing() bool {
// user.log.Debugln("Making post-connection ping") //user.log.Debugln("Making post-connection ping")
// err := user.Conn.AdminTest() //err := user.Conn.AdminTest()
// if err != nil { //if err != nil {
// user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err) // user.log.Errorfln("Post-connection ping failed: %v. Disconnecting and then reconnecting after a second", err)
// sess, disconnectErr := user.Conn.Disconnect() // sess, disconnectErr := user.Conn.Disconnect()
// if disconnectErr != nil { // if disconnectErr != nil {
@ -499,15 +505,15 @@ func (user *User) postConnPing() bool {
// user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err)) // user.tryReconnect(fmt.Sprintf("Post-connection ping failed: %v", err))
// }() // }()
// return false // return false
// } else { //} else {
// user.log.Debugln("Post-connection ping OK") // user.log.Debugln("Post-connection ping OK")
// return true // return true
// } //}
return true return true
} }
func (user *User) intPostLogin() { func (user *User) intPostLogin() {
//defer user.syncWait.Done() defer user.syncWait.Done()
user.lastReconnection = time.Now().Unix() user.lastReconnection = time.Now().Unix()
user.Client = groupmeext.NewClient(user.Token) user.Client = groupmeext.NewClient(user.Token)
if len(user.GMID) == 0 { if len(user.GMID) == 0 {
@ -578,7 +584,7 @@ func (user *User) HandleChatList() {
for _, u := range users { for _, u := range users {
puppet := user.bridge.GetPuppetByGMID(u.ID) puppet := user.bridge.GetPuppetByGMID(u.ID)
// "" for overall user not related to one group // "" for overall user not related to one group
puppet.Sync(nil, &groupme.Member{ puppet.Sync(user, &groupme.Member{
UserID: u.ID, UserID: u.ID,
Nickname: u.Name, Nickname: u.Name,
ImageURL: u.AvatarURL, ImageURL: u.AvatarURL,
@ -775,6 +781,12 @@ func (user *User) handleMessageLoop() {
}, false, false) }, false, false)
} }
portal.messages <- msg portal.messages <- msg
case <-user.syncStart:
user.log.Debugln("Processing of incoming messages is locked")
user.bridge.Metrics.TrackSyncLock(user.GMID, true)
user.syncWait.Wait()
user.bridge.Metrics.TrackSyncLock(user.GMID, false)
user.log.Debugln("Processing of incoming messages unlocked")
} }
} }
} }
@ -803,11 +815,11 @@ func (user *User) HandleJoin(id groupme.ID) {
} }
func (user *User) HandleGroupName(group groupme.ID, newName string) { func (user *User) HandleGroupName(group groupme.ID, newName string) {
//p := user.GetPortalByJID(group.String()) p := user.GetPortalByGMID(group)
//if p != nil { if p != nil {
// p.UpdateName(newName, "", false) p.UpdateName(newName, "", false)
// get more info abt actual user TODO //get more info abt actual user TODO
//} }
//bugs atm with above? //bugs atm with above?
user.HandleChatList() user.HandleChatList()