diff --git a/config/bridge.go b/config/bridge.go index a56545e..4621f02 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -44,6 +44,8 @@ type BridgeConfig struct { ReportConnectionRetry bool `yaml:"report_connection_retry"` ChatListWait int `yaml:"chat_list_wait"` PortalSyncWait int `yaml:"portal_sync_wait"` + UserMessageBuffer int `yaml:"user_message_buffer"` + PortalMessageBuffer int `yaml:"portal_message_buffer"` CallNotices struct { Start bool `yaml:"start"` @@ -96,6 +98,8 @@ func (bc *BridgeConfig) setDefaults() { bc.ReportConnectionRetry = true bc.ChatListWait = 30 bc.PortalSyncWait = 600 + bc.UserMessageBuffer = 1024 + bc.PortalMessageBuffer = 128 bc.CallNotices.Start = true bc.CallNotices.End = true diff --git a/example-config.yaml b/example-config.yaml index 7e5559e..039eff1 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -109,6 +109,8 @@ bridge: # Maximum number of seconds to wait to sync portals before force unlocking message processing. # If this is too low and you have lots of chats, it could cause backfilling to fail. portal_sync_wait: 600 + user_message_buffer: 1024 + portal_message_buffer: 128 # Whether or not to send call start/end notices to Matrix. call_notices: diff --git a/portal.go b/portal.go index 07cb12c..53a495c 100644 --- a/portal.go +++ b/portal.go @@ -133,7 +133,7 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal { recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, - messages: make(chan PortalMessage, 128), + messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), } portal.Key = key go portal.handleMessageLoop() @@ -148,7 +148,7 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal { recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{}, - messages: make(chan PortalMessage, 128), + messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer), } go portal.handleMessageLoop() return portal diff --git a/user.go b/user.go index 28f80b6..90d6a1b 100644 --- a/user.go +++ b/user.go @@ -65,7 +65,9 @@ type User struct { syncPortalsDone chan struct{} messages chan PortalMessage - syncLock sync.Mutex + + syncStart chan struct{} + syncWait sync.WaitGroup mgmtCreateLock sync.Mutex } @@ -167,7 +169,8 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User { chatListReceived: make(chan struct{}, 1), syncPortalsDone: make(chan struct{}, 1), - messages: make(chan PortalMessage, 256), + syncStart: make(chan struct{}, 1), + messages: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer), } user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID) user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID) @@ -399,7 +402,8 @@ func (cl ChatList) Swap(i, j int) { func (user *User) PostLogin() { user.log.Debugln("Locking processing of incoming messages and starting post-login sync") - user.syncLock.Lock() + user.syncWait.Add(1) + user.syncStart <- struct{}{} go user.intPostLogin() } @@ -431,7 +435,7 @@ func (user *User) tryAutomaticDoublePuppeting() { } func (user *User) intPostLogin() { - defer user.syncLock.Unlock() + defer user.syncWait.Done() user.createCommunity() user.tryAutomaticDoublePuppeting() @@ -439,14 +443,14 @@ func (user *User) intPostLogin() { case <-user.chatListReceived: user.log.Debugln("Chat list receive confirmation received in PostLogin") case <-time.After(time.Duration(user.bridge.Config.Bridge.ChatListWait) * time.Second): - user.log.Warnln("Timed out waiting for chat list to arrive! Unlocking processing of incoming messages.") + user.log.Warnln("Timed out waiting for chat list to arrive!") return } select { case <-user.syncPortalsDone: user.log.Debugln("Post-login portal sync complete, unlocking processing of incoming messages.") case <-time.After(time.Duration(user.bridge.Config.Bridge.PortalSyncWait) * time.Second): - user.log.Warnln("Timed out waiting for chat list to arrive! Unlocking processing of incoming messages.") + user.log.Warnln("Timed out waiting for portal sync to complete! Unlocking processing of incoming messages.") } } @@ -658,10 +662,13 @@ func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal { } func (user *User) handleMessageLoop() { - for msg := range user.messages { - user.syncLock.Lock() - user.GetPortalByJID(msg.chat).messages <- msg - user.syncLock.Unlock() + for { + select { + case msg := <-user.messages: + user.GetPortalByJID(msg.chat).messages <- msg + case <-user.syncStart: + user.syncWait.Wait() + } } }