Add command to backfill room history from Telegram

Currently supports backfilling one room at a time and backfills
everything after the last bridged message.
This commit is contained in:
Tulir Asokan
2020-02-04 22:41:49 +02:00
parent 07edcc4867
commit 2fbee75453
6 changed files with 85 additions and 20 deletions
+16 -2
View File
@@ -20,7 +20,8 @@ import base64
import re import re
from telethon.errors import (InviteHashInvalidError, InviteHashExpiredError, OptionsTooMuchError, from telethon.errors import (InviteHashInvalidError, InviteHashExpiredError, OptionsTooMuchError,
UserAlreadyParticipantError, ChatIdInvalidError) UserAlreadyParticipantError, ChatIdInvalidError,
TakeoutInitDelayError)
from telethon.tl.patched import Message from telethon.tl.patched import Message
from telethon.tl.types import (User as TLUser, TypeUpdates, MessageMediaGame, MessageMediaPoll, from telethon.tl.types import (User as TLUser, TypeUpdates, MessageMediaGame, MessageMediaPoll,
TypeInputPeer) TypeInputPeer)
@@ -35,7 +36,8 @@ from ... import puppet as pu, portal as po
from ...abstract_user import AbstractUser from ...abstract_user import AbstractUser
from ...db import Message as DBMessage from ...db import Message as DBMessage
from ...types import TelegramID from ...types import TelegramID
from ...commands import command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS from ...commands import (command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS,
SECTION_PORTAL_MANAGEMENT)
@command_handler(needs_auth=False, @command_handler(needs_auth=False,
@@ -303,3 +305,15 @@ async def vote(evt: CommandEvent) -> EventID:
return await evt.reply("You passed too many options.") return await evt.reply("You passed too many options.")
# TODO use response # TODO use response
return await evt.mark_read() return await evt.mark_read()
# NOTE(review): help_args advertises "<number of messages> [--takeout]", but this handler
# does not read any arguments from the event; it always backfills everything the portal
# decides to fetch.
@command_handler(help_section=SECTION_PORTAL_MANAGEMENT,
                 help_args="<_number of messages_> [--takeout]",
                 help_text="Backfill messages from Telegram history.")
async def backfill(evt: CommandEvent) -> None:
    """Backfill the portal of the room the command was sent in.

    Looks up the portal by the event's room ID and asks it to backfill using the
    command sender as the source user.

    Args:
        evt: The command event; ``evt.room_id`` selects the portal and ``evt.sender``
            is passed on as the backfill source.
    """
    portal = po.Portal.get_by_mxid(evt.room_id)
    if not portal:
        # Without this guard a non-portal room would fail on ``portal.backfill`` below
        # instead of telling the user what went wrong.
        await evt.reply("This is not a portal room.")
        return
    try:
        await portal.backfill(evt.sender)
    except TakeoutInitDelayError:
        # Telegram wants the takeout (data export) request confirmed before it can start.
        await evt.reply("Please accept the data export request from a mobile device, "
                        "then re-run the backfill command.")
+10
View File
@@ -61,6 +61,16 @@ class Message(Base):
except StopIteration: except StopIteration:
return 0 return 0
@classmethod
def find_last(cls, mx_room: RoomID, tg_space: TelegramID) -> Optional['Message']:
    """Look up the stored message with the highest ``tgid`` for a room and space.

    Args:
        mx_room: Value matched against the ``mx_room`` column.
        tg_space: Value matched against the ``tg_space`` column.

    Returns:
        The result of ``_one_or_none`` for the single-row query (presumably ``None``
        when nothing matches, as the return annotation suggests).
    """
    in_room = cls.c.mx_room == mx_room
    in_space = cls.c.tg_space == tg_space
    # Highest Telegram message ID first, so the one row kept is the newest.
    newest_first = cls._make_simple_select(in_room, in_space).order_by(desc(cls.c.tgid))
    rows = cls.db.execute(newest_first.limit(1))
    return cls._one_or_none(rows)
@classmethod
def delete_all(cls, mx_room: RoomID) -> None:
    """Delete every row of this table whose ``mx_room`` equals the given room ID.

    Args:
        mx_room: Value matched against the ``mx_room`` column.
    """
    statement = cls.t.delete().where(cls.c.mx_room == mx_room)
    cls.db.execute(statement)
@classmethod @classmethod
def get_by_mxid(cls, mxid: EventID, mx_room: RoomID, tg_space: TelegramID def get_by_mxid(cls, mxid: EventID, mx_room: RoomID, tg_space: TelegramID
) -> Optional['Message']: ) -> Optional['Message']:
+3 -3
View File
@@ -1,8 +1,8 @@
from typing import Union from typing import Union
from .base import BasePortal from .base import BasePortal
from .portal_matrix import PortalMatrix from .matrix import PortalMatrix
from .portal_metadata import PortalMetadata from .metadata import PortalMetadata
from .portal_telegram import PortalTelegram from .telegram import PortalTelegram
from ..context import Context from ..context import Context
Portal = Union[BasePortal, PortalMatrix, PortalMetadata, PortalTelegram] Portal = Union[BasePortal, PortalMatrix, PortalMetadata, PortalTelegram]
+14 -5
View File
@@ -13,7 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, TYPE_CHECKING from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, Set, TYPE_CHECKING
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import asyncio import asyncio
import logging import logging
@@ -35,7 +35,7 @@ from mautrix.util.simple_template import SimpleTemplate
from ..types import TelegramID from ..types import TelegramID
from ..context import Context from ..context import Context
from ..db import Portal as DBPortal from ..db import Portal as DBPortal, Message as DBMessage
from .. import puppet as p, user as u, util from .. import puppet as p, user as u, util
from .deduplication import PortalDedup from .deduplication import PortalDedup
from .send_lock import PortalSendLock from .send_lock import PortalSendLock
@@ -86,6 +86,8 @@ class BasePortal(ABC):
photo_id: Optional[str] photo_id: Optional[str]
local_config: Dict[str, Any] local_config: Dict[str, Any]
deleted: bool deleted: bool
backfilling: bool
backfill_leave: Optional[Set[IntentAPI]]
log: logging.Logger log: logging.Logger
alias: Optional[RoomAlias] alias: Optional[RoomAlias]
@@ -115,6 +117,8 @@ class BasePortal(ABC):
self._main_intent = None self._main_intent = None
self.deleted = False self.deleted = False
self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid) self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid)
self.backfilling = True
self.backfill_leave = None
self.dedup = PortalDedup(self) self.dedup = PortalDedup(self)
self.send_lock = PortalSendLock() self.send_lock = PortalSendLock()
@@ -273,8 +277,8 @@ class BasePortal(ABC):
authenticated.append(user) authenticated.append(user)
return authenticated return authenticated
@staticmethod @classmethod
async def cleanup_room(intent: IntentAPI, room_id: RoomID, message: str, async def cleanup_room(cls, intent: IntentAPI, room_id: RoomID, message: str,
puppets_only: bool = False) -> None: puppets_only: bool = False) -> None:
try: try:
members = await intent.get_room_members(room_id) members = await intent.get_room_members(room_id)
@@ -293,7 +297,7 @@ class BasePortal(ABC):
try: try:
await intent.leave_room(room_id) await intent.leave_room(room_id)
except (MatrixRequestError, IntentError): except (MatrixRequestError, IntentError):
self.log.warning("Failed to leave room when cleaning up room", exc_info=True) cls.log.warning(f"Failed to leave room {room_id} when cleaning up room", exc_info=True)
async def cleanup_portal(self, message: str, puppets_only: bool = False) -> None: async def cleanup_portal(self, message: str, puppets_only: bool = False) -> None:
if self.username: if self.username:
@@ -342,6 +346,7 @@ class BasePortal(ABC):
pass pass
if self._db_instance: if self._db_instance:
self._db_instance.delete() self._db_instance.delete()
DBMessage.delete_all(self.mxid)
self.deleted = True self.deleted = True
@classmethod @classmethod
@@ -491,6 +496,10 @@ class BasePortal(ABC):
old_levels: Dict[UserID, int]) -> Awaitable[None]: old_levels: Dict[UserID, int]) -> Awaitable[None]:
pass pass
@abstractmethod
def backfill(self, source: 'AbstractUser') -> Awaitable[None]:
    """Backfill this portal using ``source``; implemented by a concrete subclass.

    Args:
        source: The user on whose behalf the backfill is performed.
    """
# endregion # endregion
+35 -5
View File
@@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Awaitable, Dict, List, Optional, Tuple, Union, NamedTuple, TYPE_CHECKING from typing import Awaitable, Dict, List, Optional, Tuple, Union, NamedTuple, TYPE_CHECKING
from html import escape as escape_html
from abc import ABC from abc import ABC
import random import random
import mimetypes import mimetypes
@@ -30,7 +29,7 @@ from telethon.tl.types import (
MessageMediaPoll, MessageActionChannelCreate, MessageActionChatAddUser, MessageMediaPoll, MessageActionChannelCreate, MessageActionChatAddUser,
MessageActionChatCreate, MessageActionChatDeletePhoto, MessageActionChatDeleteUser, MessageActionChatCreate, MessageActionChatDeletePhoto, MessageActionChatDeleteUser,
MessageActionChatEditPhoto, MessageActionChatEditTitle, MessageActionChatJoinedByLink, MessageActionChatEditPhoto, MessageActionChatEditTitle, MessageActionChatJoinedByLink,
MessageActionChatMigrateTo, MessageActionPinMessage, MessageActionGameScore, MessageActionChatMigrateTo, MessageActionChannelMigrateFrom, MessageActionGameScore,
MessageMediaDocument, MessageMediaGeo, MessageMediaPhoto, MessageMediaUnsupported, MessageMediaDocument, MessageMediaGeo, MessageMediaPhoto, MessageMediaUnsupported,
MessageMediaGame, PeerUser, PhotoCachedSize, TypeChannelParticipant, TypeChatParticipant, MessageMediaGame, PeerUser, PhotoCachedSize, TypeChannelParticipant, TypeChatParticipant,
TypeDocumentAttribute, TypeMessageAction, TypePhotoSize, PhotoSize, UpdateChatUserTyping, TypeDocumentAttribute, TypeMessageAction, TypePhotoSize, PhotoSize, UpdateChatUserTyping,
@@ -356,6 +355,31 @@ class PortalTelegram(BasePortal, ABC):
edit_index=prev_edit_msg.edit_index + 1).insert() edit_index=prev_edit_msg.edit_index + 1).insert()
DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id) DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id)
async def backfill(self, source: 'AbstractUser') -> None:
    """Backfill this portal with Telegram messages newer than the last bridged one.

    The newest stored message for the room is looked up with ``DBMessage.find_last`` and
    its ID is used as ``min_id``. Everything after it is iterated oldest-first through a
    Telethon takeout session and handed to ``handle_telegram_message``.

    Args:
        source: The user whose Telegram client opens the takeout session.

    Raises:
        Any exception raised while opening the takeout session or handling messages
        (the ``backfill`` command handler expects ``TakeoutInitDelayError``). The
        backfill state is always reset before the exception propagates.
    """
    # The source user's ID is the space unless this portal is a channel.
    tg_space = source.tgid if self.peer_type != "channel" else self.tgid
    last = DBMessage.find_last(self.mxid, tg_space)
    min_id = last.tgid if last else 0
    # Capped at 1500, then scaled by 1024 * 1024
    # (assumes the config value is in MiB — TODO confirm).
    max_file_size = min(config["bridge.max_document_size"], 1500) * 1024 * 1024

    self.backfilling = True
    self.backfill_leave = set()
    try:
        async with source.client.takeout(files=True, megagroups=self.megagroup,
                                         chats=self.peer_type == "chat",
                                         users=self.peer_type == "user",
                                         channels=(self.peer_type == "channel"
                                                   and not self.megagroup),
                                         max_file_size=max_file_size
                                         ) as takeout_client:
            entity = await self.get_input_entity(source)
            async for message in takeout_client.iter_messages(entity, reverse=True,
                                                              min_id=min_id):
                # Skip the puppet lookup when there is no sender ID;
                # handle_telegram_message falls back to the main intent when the
                # sender is falsy.
                sender = p.Puppet.get(message.sender_id) if message.sender_id else None
                # TODO: route MessageService events to handle_telegram_action.
                await self.handle_telegram_message(source, sender, message)
    finally:
        # Reset the flags first: a failure (including one while leaving) must not keep
        # the portal in backfill mode, and handle_telegram_message stops adding to the
        # set, so it cannot change size while it is iterated below.
        leave_intents = self.backfill_leave
        self.backfilling = False
        self.backfill_leave = None
        for intent in leave_intents:
            await intent.leave_room(self.mxid)
async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet, async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
evt: Message) -> None: evt: Message) -> None:
if not self.mxid: if not self.mxid:
@@ -383,7 +407,7 @@ class PortalTelegram(BasePortal, ABC):
tg_space=tg_space, edit_index=0).insert() tg_space=tg_space, edit_index=0).insert()
return return
if self.dedup.pre_db_check and self.peer_type == "channel": if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"):
msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg: if msg:
self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already" self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already"
@@ -402,7 +426,13 @@ class PortalTelegram(BasePortal, ABC):
MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported) MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported)
media = evt.media if hasattr(evt, "media") and isinstance(evt.media, media = evt.media if hasattr(evt, "media") and isinstance(evt.media,
allowed_media) else None allowed_media) else None
intent = sender.intent_for(self) if sender else self.main_intent if sender:
intent = sender.intent_for(self)
if self.backfilling and intent != sender.default_mxid_intent:
intent = sender.default_mxid_intent
self.backfill_leave.add(intent)
else:
intent = self.main_intent
if not media and evt.message: if not media and evt.message:
is_bot = sender.is_bot if sender else False is_bot = sender.is_bot if sender else False
event_id = await self.handle_telegram_text(source, intent, is_bot, evt) event_id = await self.handle_telegram_text(source, intent, is_bot, evt)
@@ -502,7 +532,7 @@ class PortalTelegram(BasePortal, ABC):
await self.main_intent.set_power_levels(self.mxid, levels) await self.main_intent.set_power_levels(self.mxid, levels)
async def receive_telegram_pin_id(self, msg_id: TelegramID, receiver: TelegramID) -> None: async def receive_telegram_pin_id(self, msg_id: TelegramID, receiver: TelegramID) -> None:
tg_space = receiver if self.peer_type != "channel" else self.tgid tg_space = receiver if self.peer_type != "channel" else self.tgid
message = DBMessage.get_one_by_tgid(msg_id, tg_space) if msg_id != 0 else None message = DBMessage.get_one_by_tgid(msg_id, tg_space) if msg_id != 0 else None
if message: if message:
await self.main_intent.set_pinned_messages(self.mxid, [message.mxid]) await self.main_intent.set_pinned_messages(self.mxid, [message.mxid])
+7 -5
View File
@@ -30,7 +30,6 @@ from telethon.errors import (AuthBytesInvalidError, AuthKeyInvalidError, Locatio
from mautrix.appservice import IntentAPI from mautrix.appservice import IntentAPI
from ..tgclient import MautrixTelegramClient from ..tgclient import MautrixTelegramClient
from ..db import TelegramFile as DBTelegramFile from ..db import TelegramFile as DBTelegramFile
from ..util import sane_mimetypes from ..util import sane_mimetypes
@@ -214,8 +213,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
image_converted = False image_converted = False
# A weird bug in alpine/magic makes it return application/octet-stream for gzips... # A weird bug in alpine/magic makes it return application/octet-stream for gzips...
if is_sticker and tgs_convert and (mime_type == "application/gzip" or ( if is_sticker and tgs_convert and (mime_type == "application/gzip" or (
mime_type == "application/octet-stream" mime_type == "application/octet-stream"
and magic.from_buffer(file).startswith("gzip"))): and magic.from_buffer(file).startswith("gzip"))):
mime_type, file, width, height = await convert_tgs_to( mime_type, file, width, height = await convert_tgs_to(
file, tgs_convert["target"], **tgs_convert["args"]) file, tgs_convert["target"], **tgs_convert["args"])
thumbnail = None thumbnail = None
@@ -238,8 +237,11 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
if thumbnail and (mime_type.startswith("video/") or mime_type == "image/gif"): if thumbnail and (mime_type.startswith("video/") or mime_type == "image/gif"):
if isinstance(thumbnail, (PhotoSize, PhotoCachedSize)): if isinstance(thumbnail, (PhotoSize, PhotoCachedSize)):
thumbnail = thumbnail.location thumbnail = thumbnail.location
db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file, try:
mime_type) db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file,
mime_type)
except FileIdInvalidError:
log.warning(f"Failed to transfer thumbnail for {thumbnail!s}", exc_info=True)
try: try:
db_file.insert() db_file.insert()