diff --git a/user.go b/user.go index e726bbb..d8607cf 100644 --- a/user.go +++ b/user.go @@ -67,7 +67,8 @@ type User struct { chatListReceived chan struct{} syncPortalsDone chan struct{} - messages chan PortalMessage + messageInput chan PortalMessage + messageOutput chan PortalMessage syncStart chan struct{} syncWait sync.WaitGroup @@ -177,12 +178,14 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User { chatListReceived: make(chan struct{}, 1), syncPortalsDone: make(chan struct{}, 1), syncStart: make(chan struct{}, 1), - messages: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer), + messageInput: make(chan PortalMessage), + messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer), } user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID) user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID) user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID) go user.handleMessageLoop() + go user.runMessageRingBuffer() return user } @@ -791,10 +794,22 @@ func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal { return user.bridge.GetPortalByJID(user.PortalKey(jid)) } +func (user *User) runMessageRingBuffer() { + for msg := range user.messageInput { + select { + case user.messageOutput <- msg: + default: + user.log.Warnln("Buffer is full, dropping message in", msg.chat) + <-user.messageOutput + user.messageOutput<-msg + } + } +} + func (user *User) handleMessageLoop() { for { select { - case msg := <-user.messages: + case msg := <-user.messageOutput: user.GetPortalByJID(msg.chat).messages <- msg case <-user.syncStart: user.log.Debugln("Processing of incoming messages is locked") @@ -806,14 +821,6 @@ func (user *User) handleMessageLoop() { } } -func (user *User) putMessage(message PortalMessage) { - select { - case user.messages <- message: - default: - user.log.Warnln("Buffer is full, dropping message in", message.chat) - } -} - func (user *User) HandleNewContact(contact whatsapp.Contact) { user.log.Debugfln("Contact message: %+v", contact) go func() { @@ -843,39 +850,39 @@ func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) { } func (user *User) HandleTextMessage(message whatsapp.TextMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleImageMessage(message whatsapp.ImageMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleStickerMessage(message whatsapp.StickerMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleVideoMessage(message whatsapp.VideoMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleAudioMessage(message whatsapp.AudioMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleDocumentMessage(message whatsapp.DocumentMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleContactMessage(message whatsapp.ContactMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleLocationMessage(message whatsapp.LocationMessage) { - user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) + user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp} } func (user *User) HandleMessageRevoke(message whatsappExt.MessageRevocation) { - user.putMessage(PortalMessage{message.RemoteJid, user, message, 0}) + user.messageInput <- PortalMessage{message.RemoteJid, user, message, 0} } type FakeMessage struct {