Use ring buffer for message handling

This can sometimes avoid dropping the wrong messages if syncing chats
takes too long.
This commit is contained in:
Tulir Asokan 2020-11-06 15:52:16 +02:00
parent 221326bcbf
commit 4eacece8ba

47
user.go
View File

@ -67,7 +67,8 @@ type User struct {
chatListReceived chan struct{} chatListReceived chan struct{}
syncPortalsDone chan struct{} syncPortalsDone chan struct{}
messages chan PortalMessage messageInput chan PortalMessage
messageOutput chan PortalMessage
syncStart chan struct{} syncStart chan struct{}
syncWait sync.WaitGroup syncWait sync.WaitGroup
@ -177,12 +178,14 @@ func (bridge *Bridge) NewUser(dbUser *database.User) *User {
chatListReceived: make(chan struct{}, 1), chatListReceived: make(chan struct{}, 1),
syncPortalsDone: make(chan struct{}, 1), syncPortalsDone: make(chan struct{}, 1),
syncStart: make(chan struct{}, 1), syncStart: make(chan struct{}, 1),
messages: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer), messageInput: make(chan PortalMessage),
messageOutput: make(chan PortalMessage, bridge.Config.Bridge.UserMessageBuffer),
} }
user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID) user.RelaybotWhitelisted = user.bridge.Config.Bridge.Permissions.IsRelaybotWhitelisted(user.MXID)
user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID) user.Whitelisted = user.bridge.Config.Bridge.Permissions.IsWhitelisted(user.MXID)
user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID) user.Admin = user.bridge.Config.Bridge.Permissions.IsAdmin(user.MXID)
go user.handleMessageLoop() go user.handleMessageLoop()
go user.runMessageRingBuffer()
return user return user
} }
@ -791,10 +794,22 @@ func (user *User) GetPortalByJID(jid types.WhatsAppID) *Portal {
return user.bridge.GetPortalByJID(user.PortalKey(jid)) return user.bridge.GetPortalByJID(user.PortalKey(jid))
} }
func (user *User) runMessageRingBuffer() {
for msg := range user.messageInput {
select {
case user.messageOutput <- msg:
default:
user.log.Warnln("Buffer is full, dropping message in", msg.chat)
<-user.messageOutput
user.messageOutput<-msg
}
}
}
func (user *User) handleMessageLoop() { func (user *User) handleMessageLoop() {
for { for {
select { select {
case msg := <-user.messages: case msg := <-user.messageOutput:
user.GetPortalByJID(msg.chat).messages <- msg user.GetPortalByJID(msg.chat).messages <- msg
case <-user.syncStart: case <-user.syncStart:
user.log.Debugln("Processing of incoming messages is locked") user.log.Debugln("Processing of incoming messages is locked")
@ -806,14 +821,6 @@ func (user *User) handleMessageLoop() {
} }
} }
func (user *User) putMessage(message PortalMessage) {
select {
case user.messages <- message:
default:
user.log.Warnln("Buffer is full, dropping message in", message.chat)
}
}
func (user *User) HandleNewContact(contact whatsapp.Contact) { func (user *User) HandleNewContact(contact whatsapp.Contact) {
user.log.Debugfln("Contact message: %+v", contact) user.log.Debugfln("Contact message: %+v", contact)
go func() { go func() {
@ -843,39 +850,39 @@ func (user *User) HandleBatteryMessage(battery whatsapp.BatteryMessage) {
} }
func (user *User) HandleTextMessage(message whatsapp.TextMessage) { func (user *User) HandleTextMessage(message whatsapp.TextMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleImageMessage(message whatsapp.ImageMessage) { func (user *User) HandleImageMessage(message whatsapp.ImageMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleStickerMessage(message whatsapp.StickerMessage) { func (user *User) HandleStickerMessage(message whatsapp.StickerMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleVideoMessage(message whatsapp.VideoMessage) { func (user *User) HandleVideoMessage(message whatsapp.VideoMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleAudioMessage(message whatsapp.AudioMessage) { func (user *User) HandleAudioMessage(message whatsapp.AudioMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleDocumentMessage(message whatsapp.DocumentMessage) { func (user *User) HandleDocumentMessage(message whatsapp.DocumentMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleContactMessage(message whatsapp.ContactMessage) { func (user *User) HandleContactMessage(message whatsapp.ContactMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleLocationMessage(message whatsapp.LocationMessage) { func (user *User) HandleLocationMessage(message whatsapp.LocationMessage) {
user.putMessage(PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}) user.messageInput <- PortalMessage{message.Info.RemoteJid, user, message, message.Info.Timestamp}
} }
func (user *User) HandleMessageRevoke(message whatsappExt.MessageRevocation) { func (user *User) HandleMessageRevoke(message whatsappExt.MessageRevocation) {
user.putMessage(PortalMessage{message.RemoteJid, user, message, 0}) user.messageInput <- PortalMessage{message.RemoteJid, user, message, 0}
} }
type FakeMessage struct { type FakeMessage struct {