Add missed message backfilling

This commit is contained in:
Tulir Asokan
2020-07-28 18:32:34 +03:00
parent ea017467fd
commit 59eb7376c9
5 changed files with 30 additions and 19 deletions
+1 -1
View File
@@ -31,7 +31,7 @@
* [x] Message history * [x] Message history
* [x] Manually (`!tg backfill`) * [x] Manually (`!tg backfill`)
* [x] Automatically when creating portal * [x] Automatically when creating portal
* [ ] Automatically for missed messages * [x] Automatically for missed messages
* [x] Avatars * [x] Avatars
* [x] Presence * [x] Presence
* [x] Typing notifications * [x] Typing notifications
+3 -1
View File
@@ -185,8 +185,10 @@ async def sync(evt: CommandEvent) -> EventID:
sync_only = None sync_only = None
if not sync_only or sync_only == "chats": if not sync_only or sync_only == "chats":
await evt.sender.sync_dialogs(synchronous_create=True) await evt.reply("Synchronizing chats...")
await evt.sender.sync_dialogs()
if not sync_only or sync_only == "contacts": if not sync_only or sync_only == "contacts":
await evt.reply("Synchronizing contacts...")
await evt.sender.sync_contacts() await evt.sender.sync_contacts()
if not sync_only or sync_only == "me": if not sync_only or sync_only == "me":
await evt.sender.update_info() await evt.sender.update_info()
+3 -6
View File
@@ -223,8 +223,8 @@ class PortalMetadata(BasePortal, ABC):
await self.main_intent.get_joined_members(self.mxid) await self.main_intent.get_joined_members(self.mxid)
async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User] = None, async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User] = None,
invites: InviteList = None, update_if_exists: bool = True, invites: InviteList = None, update_if_exists: bool = True
synchronous: bool = False) -> Optional[str]: ) -> Optional[RoomID]:
if self.mxid: if self.mxid:
if update_if_exists: if update_if_exists:
if not entity: if not entity:
@@ -234,10 +234,7 @@ class PortalMetadata(BasePortal, ABC):
self.log.exception(f"Failed to get entity through {user.tgid} for update") self.log.exception(f"Failed to get entity through {user.tgid} for update")
return self.mxid return self.mxid
update = self.update_matrix_room(user, entity, self.peer_type == "user") update = self.update_matrix_room(user, entity, self.peer_type == "user")
if synchronous: self.loop.create_task(update)
await update
else:
asyncio.ensure_future(update, loop=self.loop)
await self.invite_to_matrix(invites or []) await self.invite_to_matrix(invites or [])
return self.mxid return self.mxid
async with self._room_create_lock: async with self._room_create_lock:
+10 -5
View File
@@ -421,7 +421,7 @@ class PortalTelegram(BasePortal, ABC):
} }
async def backfill(self, source: 'AbstractUser', is_initial: bool = False, async def backfill(self, source: 'AbstractUser', is_initial: bool = False,
limit: Optional[int] = None) -> None: limit: Optional[int] = None, last_id: Optional[int] = None) -> None:
limit = limit or (config["bridge.backfill.initial_limit"] if is_initial limit = limit or (config["bridge.backfill.initial_limit"] if is_initial
else config["bridge.backfill.missed_limit"]) else config["bridge.backfill.missed_limit"])
if limit == 0: if limit == 0:
@@ -429,18 +429,23 @@ class PortalTelegram(BasePortal, ABC):
last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel" last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel"
else self.tgid)) else self.tgid))
min_id = last.tgid if last else 0 min_id = last.tgid if last else 0
message = (await source.client.get_messages(self.peer, limit=1))[0] if last_id is None:
message = (await source.client.get_messages(self.peer, limit=1))[0]
last_id = message.id
if last_id <= min_id:
# Nothing to backfill
return
if limit < 0: if limit < 0:
limit = None limit = None
self.log.debug(f"Backfilling approximately {message.id - min_id} messages " self.log.debug(f"Backfilling approximately {last_id - min_id} messages "
f"through {source.mxid}") f"through {source.mxid}")
elif self.peer_type == "channel": elif self.peer_type == "channel":
# This is a channel or supergroup, so we'll backfill messages based on the ID. # This is a channel or supergroup, so we'll backfill messages based on the ID.
# There are some cases, such as deleted messages, where this may backfill less # There are some cases, such as deleted messages, where this may backfill less
# messages than the limit. # messages than the limit.
min_id = max(message.id - limit, min_id) min_id = max(last_id - limit, min_id)
limit = None limit = None
self.log.debug(f"Backfilling messages after ID {min_id} (last message: {message.id}) " self.log.debug(f"Backfilling messages after ID {min_id} (last message: {last_id}) "
f"through {source.mxid}") f"through {source.mxid}")
else: else:
# Private chats and normal groups don't have their own message ID namespace, # Private chats and normal groups don't have their own message ID namespace,
+13 -6
View File
@@ -21,6 +21,7 @@ import asyncio
from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser, from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser,
UpdateShortChatMessage, UpdateShortMessage, User as TLUser, Chat, UpdateShortChatMessage, UpdateShortMessage, User as TLUser, Chat,
ChatForbidden) ChatForbidden)
from telethon.tl.custom import Dialog
from telethon.tl.types.contacts import ContactsNotModified from telethon.tl.types.contacts import ContactsNotModified
from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest
from telethon.tl.functions.account import UpdateStatusRequest from telethon.tl.functions.account import UpdateStatusRequest
@@ -333,12 +334,13 @@ class User(AbstractUser, BaseUser):
return await self._search_remote(query), True return await self._search_remote(query), True
async def sync_dialogs(self, synchronous_create: bool = False) -> None: async def sync_dialogs(self) -> None:
if self.is_bot: if self.is_bot:
return return
creators = [] creators = []
limit = config["bridge.sync_dialog_limit"] or None limit = config["bridge.sync_dialog_limit"] or None
self.log.debug(f"Syncing dialogs (limit={limit}, synchronous_create={synchronous_create})") self.log.debug(f"Syncing dialogs (limit={limit})")
dialog: Dialog
async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True, async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True,
archived=False): archived=False):
entity = dialog.entity entity = dialog.entity
@@ -353,11 +355,16 @@ class User(AbstractUser, BaseUser):
continue continue
portal = po.Portal.get_by_entity(entity, receiver_id=self.tgid) portal = po.Portal.get_by_entity(entity, receiver_id=self.tgid)
self.portals[portal.tgid_full] = portal self.portals[portal.tgid_full] = portal
creators.append( if portal.mxid:
portal.create_matrix_room(self, entity, invites=[self.mxid], update_task = portal.update_matrix_room(self, entity)
synchronous=synchronous_create)) backfill_task = portal.backfill(self, last_known_id=dialog.message.id)
creators.append(self.loop.create_task(update_task))
creators.append(self.loop.create_task(backfill_task))
else:
create_task = portal.create_matrix_room(self, entity, invites=[self.mxid])
creators.append(self.loop.create_task(create_task))
self.save(portals=True) self.save(portals=True)
await asyncio.gather(*creators, loop=self.loop) await asyncio.gather(*creators)
self.log.debug("Dialog syncing complete") self.log.debug("Dialog syncing complete")
def register_portal(self, portal: po.Portal) -> None: def register_portal(self, portal: po.Portal) -> None: