Revert portal backfill lock changes and update mautrix-go

This commit is contained in:
Tulir Asokan
2020-11-18 13:29:47 +02:00
parent 83779c6970
commit e6ccdb83b7
3 changed files with 23 additions and 57 deletions

View File

@@ -133,8 +133,7 @@ func (bridge *Bridge) NewManualPortal(key database.PortalKey) *Portal {
recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
backfillStart: make(chan struct{}, 1),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
}
portal.Key = key
go portal.handleMessageLoop()
@@ -149,8 +148,7 @@ func (bridge *Bridge) NewPortal(dbPortal *database.Portal) *Portal {
recentlyHandled: [recentlyHandledLength]types.WhatsAppMessageID{},
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
backfillStart: make(chan struct{}, 1),
messages: make(chan PortalMessage, bridge.Config.Bridge.PortalMessageBuffer),
}
go portal.handleMessageLoop()
return portal
@@ -177,8 +175,7 @@ type Portal struct {
recentlyHandledLock sync.Mutex
recentlyHandledIndex uint8
backfillStart chan struct{}
backfillWait sync.WaitGroup
backfillLock sync.Mutex
backfilling bool
lastMessageTs uint64
@@ -193,34 +190,25 @@ type Portal struct {
const MaxMessageAgeToCreatePortal = 5 * 60 // 5 minutes
func (portal *Portal) handleMessageLoop() {
for {
select {
case msg := <-portal.messages:
portal.handleNewMessage(msg)
case <-portal.backfillStart:
portal.log.Debugln("Processing of incoming messages is locked for backfilling")
portal.backfillWait.Wait()
portal.log.Debugln("Processing of incoming messages unlocked after backfilling")
for msg := range portal.messages {
if len(portal.MXID) == 0 {
if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) {
portal.log.Debugln("Not creating portal room for incoming message as the message is too old.")
continue
}
portal.log.Debugln("Creating Matrix room from incoming message")
err := portal.CreateMatrixRoom(msg.source)
if err != nil {
portal.log.Errorln("Failed to create portal room:", err)
return
}
}
portal.backfillLock.Lock()
portal.handleMessage(msg)
portal.backfillLock.Unlock()
}
}
func (portal *Portal) handleNewMessage(msg PortalMessage) {
if len(portal.MXID) == 0 {
if msg.timestamp+MaxMessageAgeToCreatePortal < uint64(time.Now().Unix()) {
portal.log.Debugln("Not creating portal room for incoming message as the message is too old.")
return
}
portal.log.Debugln("Creating Matrix room from incoming message")
err := portal.CreateMatrixRoom(msg.source)
if err != nil {
portal.log.Errorln("Failed to create portal room:", err)
return
}
}
portal.handleMessage(msg)
}
func (portal *Portal) handleMessage(msg PortalMessage) {
if len(portal.MXID) == 0 {
portal.log.Warnln("handleMessage called even though portal.MXID is empty")
@@ -746,11 +734,7 @@ func (portal *Portal) BackfillHistory(user *User, lastMessageTime uint64) error
}
func (portal *Portal) beginBackfill() func() {
portal.backfillWait.Add(1)
select {
case portal.backfillStart <- struct{}{}:
default:
}
portal.backfillLock.Lock()
portal.backfilling = true
var privateChatPuppetInvited bool
var privateChatPuppet *Puppet
@@ -768,7 +752,7 @@ func (portal *Portal) beginBackfill() func() {
return func() {
portal.backfilling = false
portal.privateChatBackfillInvitePuppet = nil
portal.backfillWait.Done()
portal.backfillLock.Unlock()
if privateChatPuppet != nil && privateChatPuppetInvited {
_, _ = privateChatPuppet.DefaultIntent().LeaveRoom(portal.MXID)
}