Add debug logs and workaround for forward backfill getting stuck
This commit is contained in:
@@ -711,6 +711,22 @@ class AbstractUser(ABC):
|
|||||||
self.log.debug("Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log)
|
self.log.debug("Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
task = self._call_portal_message_handler(update, original_update, portal, sender)
|
||||||
|
if portal.backfill_lock.locked:
|
||||||
|
self.log.debug(
|
||||||
|
f"{portal.tgid_log} is backfill locked, moving incoming message to async task"
|
||||||
|
)
|
||||||
|
background_task.create(task)
|
||||||
|
else:
|
||||||
|
await task
|
||||||
|
|
||||||
|
async def _call_portal_message_handler(
|
||||||
|
self,
|
||||||
|
update: UpdateMessageContent,
|
||||||
|
original_update: UpdateMessage,
|
||||||
|
portal: po.Portal,
|
||||||
|
sender: pu.Puppet,
|
||||||
|
) -> None:
|
||||||
await portal.backfill_lock.wait(f"update {update.id}")
|
await portal.backfill_lock.wait(f"update {update.id}")
|
||||||
|
|
||||||
if isinstance(update, MessageService):
|
if isinstance(update, MessageService):
|
||||||
|
|||||||
@@ -2863,8 +2863,11 @@ class Portal(DBPortal, BasePortal):
|
|||||||
if limit == 0:
|
if limit == 0:
|
||||||
return "Limit is zero, not backfilling"
|
return "Limit is zero, not backfilling"
|
||||||
with self.backfill_lock:
|
with self.backfill_lock:
|
||||||
output = await self.backfill(
|
output = await asyncio.wait_for(
|
||||||
source, client, forward=True, forward_limit=limit, last_tgid=last_tgid
|
self.backfill(
|
||||||
|
source, client, forward=True, forward_limit=limit, last_tgid=last_tgid
|
||||||
|
),
|
||||||
|
timeout=15 * 60,
|
||||||
)
|
)
|
||||||
self.log.debug(f"Forward backfill complete, status: {output}")
|
self.log.debug(f"Forward backfill complete, status: {output}")
|
||||||
return output
|
return output
|
||||||
@@ -3129,9 +3132,16 @@ class Portal(DBPortal, BasePortal):
|
|||||||
anchor_id = 2**31 - 1
|
anchor_id = 2**31 - 1
|
||||||
minmax = {}
|
minmax = {}
|
||||||
self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}")
|
self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}")
|
||||||
|
delay_warn_handle = self.loop.call_later(
|
||||||
|
5 * 60, lambda: self.log.warning("Iterating messages is taking long")
|
||||||
|
)
|
||||||
# Iterate messages newest to oldest and collect the results
|
# Iterate messages newest to oldest and collect the results
|
||||||
async for msg in client.iter_messages(entity, limit=limit, **minmax):
|
async for msg in client.iter_messages(entity, limit=limit, **minmax):
|
||||||
message_count += 1
|
message_count += 1
|
||||||
|
if message_count == 1:
|
||||||
|
self.log.debug(f"Backfill iter: got first message {msg.id}")
|
||||||
|
elif message_count % 50 == 0:
|
||||||
|
self.log.debug(f"Backfill iter: got {message_count} messages so far (at {msg.id})")
|
||||||
if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id):
|
if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id):
|
||||||
continue
|
continue
|
||||||
elif isinstance(msg, MessageService):
|
elif isinstance(msg, MessageService):
|
||||||
@@ -3156,6 +3166,7 @@ class Portal(DBPortal, BasePortal):
|
|||||||
events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True))
|
events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True))
|
||||||
intents.append(intent)
|
intents.append(intent)
|
||||||
metas.append(None)
|
metas.append(None)
|
||||||
|
delay_warn_handle.cancel()
|
||||||
if len(events) == 0:
|
if len(events) == 0:
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
f"Didn't get any events to send out of {message_count} messages fetched "
|
f"Didn't get any events to send out of {message_count} messages fetched "
|
||||||
|
|||||||
Reference in New Issue
Block a user