Improve things

This commit is contained in:
Tulir Asokan
2019-08-06 21:30:14 +03:00
parent 45f1dddb81
commit 8655f5903a
9 changed files with 72 additions and 41 deletions
+2 -1
View File
@@ -119,9 +119,10 @@ bridge:
# their Telegram account at startup. # their Telegram account at startup.
startup_sync: true startup_sync: true
# Number of most recently active dialogs to check when syncing chats. # Number of most recently active dialogs to check when syncing chats.
# Dialogs include groups and private chats, but only groups are synced.
# Set to 0 to remove limit. # Set to 0 to remove limit.
sync_dialog_limit: 30 sync_dialog_limit: 30
# Whether or not to sync and create portals for direct chats at startup.
sync_direct_chats: false
# The maximum number of simultaneous Telegram deletions to handle. # The maximum number of simultaneous Telegram deletions to handle.
# A large number of simultaneous redactions could put strain on your homeserver. # A large number of simultaneous redactions could put strain on your homeserver.
max_telegram_delete: 10 max_telegram_delete: 10
+7 -14
View File
@@ -13,7 +13,7 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Tuple, Optional, AsyncIterable, Union, Dict, TYPE_CHECKING from typing import Tuple, Optional, Union, Dict, TYPE_CHECKING
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import asyncio import asyncio
import logging import logging
@@ -23,12 +23,12 @@ import time
from telethon.sessions import Session from telethon.sessions import Session
from telethon.tl.patched import MessageService, Message from telethon.tl.patched import MessageService, Message
from telethon.tl.types import ( from telethon.tl.types import (
Channel, ChannelForbidden, Chat, ChatForbidden, MessageActionChannelMigrateFrom, PeerUser, Channel, Chat, MessageActionChannelMigrateFrom, PeerUser, TypeUpdate, UpdateChatPinnedMessage,
TypeUpdate, UpdateChannelPinnedMessage, UpdateChatPinnedMessage, UpdateChatParticipantAdmin, UpdateChannelPinnedMessage, UpdateChatParticipantAdmin, UpdateChatParticipants,
UpdateChatParticipants, UpdateChatUserTyping, UpdateDeleteChannelMessages, UpdateNewMessage, UpdateChatUserTyping, UpdateDeleteChannelMessages, UpdateNewMessage, UpdateDeleteMessages,
UpdateDeleteMessages, UpdateEditChannelMessage, UpdateEditMessage, UpdateNewChannelMessage, UpdateEditChannelMessage, UpdateEditMessage, UpdateNewChannelMessage, UpdateReadHistoryOutbox,
UpdateReadHistoryOutbox, UpdateShortChatMessage, UpdateShortMessage, UpdateUserName, UpdateShortChatMessage, UpdateShortMessage, UpdateUserName, UpdateUserPhoto, UpdateUserStatus,
UpdateUserPhoto, UpdateUserStatus, UpdateUserTyping, User, UserStatusOffline, UserStatusOnline) UpdateUserTyping, User, UserStatusOffline, UserStatusOnline)
from mautrix.types import UserID, PresenceState from mautrix.types import UserID, PresenceState
from mautrix.errors import MatrixError from mautrix.errors import MatrixError
@@ -189,13 +189,6 @@ class AbstractUser(ABC):
if UPDATE_TIME: if UPDATE_TIME:
UPDATE_TIME.labels(update_type=type(update).__name__).observe(time.time() - start_time) UPDATE_TIME.labels(update_type=type(update).__name__).observe(time.time() - start_time)
def get_dialogs(self, limit: int = None) -> AsyncIterable[Union[User, Chat, Channel]]:
return (dialog.entity async for dialog in
self.client.iter_dialogs(limit=limit, ignore_migrated=True, archived=False)
if isinstance(dialog.entity, (ChatForbidden, ChannelForbidden))
or (isinstance(dialog.entity, Chat)
and (dialog.entity.deactivated or dialog.entity.left)))
@property @property
@abstractmethod @abstractmethod
def name(self) -> str: def name(self) -> str:
+1
View File
@@ -85,6 +85,7 @@ class Config(BaseBridgeConfig):
copy("bridge.skip_deleted_members") copy("bridge.skip_deleted_members")
copy("bridge.startup_sync") copy("bridge.startup_sync")
copy("bridge.sync_dialog_limit") copy("bridge.sync_dialog_limit")
copy("bridge.sync_direct_chats")
copy("bridge.max_telegram_delete") copy("bridge.max_telegram_delete")
copy("bridge.sync_matrix_state") copy("bridge.sync_matrix_state")
copy("bridge.allow_matrix_login") copy("bridge.allow_matrix_login")
+3 -3
View File
@@ -115,7 +115,7 @@ class BasePortal(ABC):
self._db_instance = db_instance self._db_instance = db_instance
self._main_intent = None self._main_intent = None
self.deleted = False self.deleted = False
self.log = self.base_log.getChild(self.tgid_log) if self.tgid else self.base_log self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid)
self.dedup = PortalDedup(self) self.dedup = PortalDedup(self)
self.send_lock = PortalSendLock() self.send_lock = PortalSendLock()
@@ -155,7 +155,7 @@ class BasePortal(ABC):
if not self._main_intent: if not self._main_intent:
direct = self.peer_type == "user" direct = self.peer_type == "user"
puppet = p.Puppet.get(self.tgid) if direct else None puppet = p.Puppet.get(self.tgid) if direct else None
self._main_intent = puppet.intent if direct else self.az.intent self._main_intent = puppet.intent_for(self) if direct else self.az.intent
return self._main_intent return self._main_intent
@property @property
@@ -272,7 +272,7 @@ class BasePortal(ABC):
if user != intent.mxid and (not puppets_only or puppet): if user != intent.mxid and (not puppets_only or puppet):
try: try:
if puppet: if puppet:
await puppet.intent.leave_room(room_id) await puppet.default_mxid_intent.leave_room(room_id)
else: else:
await intent.kick_user(room_id, user, message) await intent.kick_user(room_id, user, message)
except (MatrixRequestError, IntentError): except (MatrixRequestError, IntentError):
+31 -12
View File
@@ -100,7 +100,7 @@ class PortalMetadata(BasePortal, ABC):
self.tgid = new_id self.tgid = new_id
self.tg_receiver = new_id self.tg_receiver = new_id
self.by_tgid[self.tgid_full] = self self.by_tgid[self.tgid_full] = self
self.log = self.base_log.getChild(str(self.tgid)) self.log = self.base_log.getChild(self.tgid_log)
self.log.info(f"Telegram chat upgraded from {old_id}") self.log.info(f"Telegram chat upgraded from {old_id}")
async def set_telegram_username(self, source: 'u.User', username: str) -> None: async def set_telegram_username(self, source: 'u.User', username: str) -> None:
@@ -145,7 +145,7 @@ class PortalMetadata(BasePortal, ABC):
self.by_tgid[self.tgid_full] = self self.by_tgid[self.tgid_full] = self
await self.update_info(source, entity) await self.update_info(source, entity)
self.db_instance.insert() self.db_instance.insert()
self.log = self.base_log.getChild(str(self.tgid)) self.log = self.base_log.getChild(self.tgid_log)
if self.bot and self.bot.tgid in invites: if self.bot and self.bot.tgid in invites:
self.bot.add_chat(self.tgid, self.peer_type) self.bot.add_chat(self.tgid, self.peer_type)
@@ -192,6 +192,17 @@ class PortalMetadata(BasePortal, ABC):
levels: PowerLevelStateEventContent = None, levels: PowerLevelStateEventContent = None,
users: List[User] = None, users: List[User] = None,
participants: List[TypeParticipant] = None) -> None: participants: List[TypeParticipant] = None) -> None:
try:
await self._update_matrix_room(user, entity, direct, puppet, levels, users,
participants)
except Exception:
self.log.exception("Fatal error updating Matrix room")
async def _update_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User],
direct: bool, puppet: p.Puppet = None,
levels: PowerLevelStateEventContent = None,
users: List[User] = None,
participants: List[TypeParticipant] = None) -> None:
if not direct: if not direct:
await self.update_info(user, entity) await self.update_info(user, entity)
if not users or not participants: if not users or not participants:
@@ -202,7 +213,7 @@ class PortalMetadata(BasePortal, ABC):
if not puppet: if not puppet:
puppet = p.Puppet.get(self.tgid) puppet = p.Puppet.get(self.tgid)
await puppet.update_info(user, entity) await puppet.update_info(user, entity)
await puppet.intent.join_room(self.mxid) await puppet.intent_for(self).join_room(self.mxid)
if self.sync_matrix_state: if self.sync_matrix_state:
await self.sync_matrix_members() await self.sync_matrix_members()
@@ -221,7 +232,10 @@ class PortalMetadata(BasePortal, ABC):
await self.invite_to_matrix(invites or []) await self.invite_to_matrix(invites or [])
return self.mxid return self.mxid
async with self._room_create_lock: async with self._room_create_lock:
return await self._create_matrix_room(user, entity, invites) try:
return await self._create_matrix_room(user, entity, invites)
except Exception:
self.log.exception("Fatal error creating Matrix room")
async def _create_matrix_room(self, user: 'AbstractUser', entity: TypeChat, invites: InviteList async def _create_matrix_room(self, user: 'AbstractUser', entity: TypeChat, invites: InviteList
) -> Optional[RoomID]: ) -> Optional[RoomID]:
@@ -235,7 +249,7 @@ class PortalMetadata(BasePortal, ABC):
if not entity: if not entity:
entity = await self.get_entity(user) entity = await self.get_entity(user)
self.log.debug("Fetched data: %s", entity) self.log.debug(f"Fetched data: {entity}")
self.log.debug("Creating room") self.log.debug("Creating room")
@@ -244,8 +258,12 @@ class PortalMetadata(BasePortal, ABC):
except AttributeError: except AttributeError:
self.title = None self.title = None
if direct and self.tgid == user.tgid:
self.title = "Telegram Saved Messages"
self.about = "Your Telegram cloud storage chat"
puppet = p.Puppet.get(self.tgid) if direct else None puppet = p.Puppet.get(self.tgid) if direct else None
self._main_intent = puppet.intent if direct else self.az.intent self._main_intent = puppet.intent_for(self) if direct else self.az.intent
if self.peer_type == "channel": if self.peer_type == "channel":
self.megagroup = entity.megagroup self.megagroup = entity.megagroup
@@ -280,7 +298,8 @@ class PortalMetadata(BasePortal, ABC):
room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
is_direct=direct, invitees=invites or [], is_direct=direct, invitees=invites or [],
name=self.title, initial_state=initial_state) name=self.title, topic=self.about,
initial_state=initial_state)
if not room_id: if not room_id:
raise Exception(f"Failed to create room") raise Exception(f"Failed to create room")
@@ -420,7 +439,7 @@ class PortalMetadata(BasePortal, ABC):
if entity.bot: if entity.bot:
self._add_bot_chat(entity) self._add_bot_chat(entity)
allowed_tgids.add(entity.id) allowed_tgids.add(entity.id)
await puppet.intent.ensure_joined(self.mxid) await puppet.intent_for(self).ensure_joined(self.mxid)
await puppet.update_info(source, entity) await puppet.update_info(source, entity)
user = u.User.get_by_tgid(TelegramID(entity.id)) user = u.User.get_by_tgid(TelegramID(entity.id))
@@ -461,7 +480,7 @@ class PortalMetadata(BasePortal, ABC):
if source: if source:
entity: User = await source.client.get_entity(PeerUser(user_id)) entity: User = await source.client.get_entity(PeerUser(user_id))
await puppet.update_info(source, entity) await puppet.update_info(source, entity)
await puppet.intent.join_room(self.mxid) await puppet.intent_for(self).join_room(self.mxid)
user = u.User.get_by_tgid(user_id) user = u.User.get_by_tgid(user_id)
if user: if user:
@@ -476,16 +495,16 @@ class PortalMetadata(BasePortal, ABC):
else "Left Telegram chat") else "Left Telegram chat")
if sender.tgid != puppet.tgid: if sender.tgid != puppet.tgid:
try: try:
await sender.intent.kick_user(self.mxid, puppet.mxid) await sender.intent_for(self).kick_user(self.mxid, puppet.mxid)
except MForbidden: except MForbidden:
await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message) await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message)
else: else:
await puppet.intent.leave_room(self.mxid) await puppet.intent_for(self).leave_room(self.mxid)
if user: if user:
user.unregister_portal(self) user.unregister_portal(self)
if sender.tgid != puppet.tgid: if sender.tgid != puppet.tgid:
try: try:
await sender.intent.kick_user(self.mxid, puppet.mxid) await sender.intent_for(self).kick_user(self.mxid, puppet.mxid)
return return
except MForbidden: except MForbidden:
pass pass
+5 -5
View File
@@ -73,7 +73,7 @@ class PortalTelegram(BasePortal, ABC):
async def handle_telegram_typing(self, user: p.Puppet, async def handle_telegram_typing(self, user: p.Puppet,
_: Union[UpdateUserTyping, UpdateChatUserTyping]) -> None: _: Union[UpdateUserTyping, UpdateChatUserTyping]) -> None:
await user.intent.set_typing(self.mxid, is_typing=True) await user.intent_for(self).set_typing(self.mxid, is_typing=True)
def _get_external_url(self, evt: Message) -> Optional[str]: def _get_external_url(self, evt: Message) -> Optional[str]:
if self.peer_type == "channel" and self.username is not None: if self.peer_type == "channel" and self.username is not None:
@@ -361,7 +361,7 @@ class PortalTelegram(BasePortal, ABC):
f"{editing_msg.mxid}\">Edit</a>: " f"{editing_msg.mxid}\">Edit</a>: "
f"{content.formatted_body or escape_html(content.body)}") f"{content.formatted_body or escape_html(content.body)}")
intent = sender.intent if sender else self.main_intent intent = sender.intent_for(self) if sender else self.main_intent
await intent.set_typing(self.mxid, is_typing=False) await intent.set_typing(self.mxid, is_typing=False)
event_id = await intent.send_message(self.mxid, content) event_id = await intent.send_message(self.mxid, content)
@@ -409,7 +409,7 @@ class PortalTelegram(BasePortal, ABC):
MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported) MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported)
media = evt.media if hasattr(evt, "media") and isinstance(evt.media, media = evt.media if hasattr(evt, "media") and isinstance(evt.media,
allowed_media) else None allowed_media) else None
intent = sender.intent if sender else self.main_intent intent = sender.intent_for(self) if sender else self.main_intent
if not media and evt.message: if not media and evt.message:
is_bot = sender.is_bot if sender else False is_bot = sender.is_bot if sender else False
event_id = await self.handle_telegram_text(source, intent, is_bot, evt) event_id = await self.handle_telegram_text(source, intent, is_bot, evt)
@@ -489,7 +489,7 @@ class PortalTelegram(BasePortal, ABC):
elif isinstance(action, MessageActionChatMigrateTo): elif isinstance(action, MessageActionChatMigrateTo):
self.peer_type = "channel" self.peer_type = "channel"
self._migrate_and_save_telegram(TelegramID(action.channel_id)) self._migrate_and_save_telegram(TelegramID(action.channel_id))
await sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.") await sender.intent_for(self).send_emote(self.mxid, "upgraded this group to a supergroup.")
elif isinstance(action, MessageActionPinMessage): elif isinstance(action, MessageActionPinMessage):
await self.receive_telegram_pin_sender(sender) await self.receive_telegram_pin_sender(sender)
elif isinstance(action, MessageActionGameScore): elif isinstance(action, MessageActionGameScore):
@@ -515,7 +515,7 @@ class PortalTelegram(BasePortal, ABC):
await self.update_telegram_pin() await self.update_telegram_pin()
async def update_telegram_pin(self) -> None: async def update_telegram_pin(self) -> None:
intent = (self._temp_pinned_message_sender.intent intent = (self._temp_pinned_message_sender.intent_for(self)
if self._temp_pinned_message_sender else self.main_intent) if self._temp_pinned_message_sender else self.main_intent)
msg_id = self._temp_pinned_message_id msg_id = self._temp_pinned_message_id
self._temp_pinned_message_id = None self._temp_pinned_message_id = None
+6 -1
View File
@@ -28,7 +28,7 @@ from mautrix.types import UserID
from .types import TelegramID from .types import TelegramID
from .db import Puppet as DBPuppet from .db import Puppet as DBPuppet
from . import util from . import util, portal as p
if TYPE_CHECKING: if TYPE_CHECKING:
from .matrix import MatrixHandler from .matrix import MatrixHandler
@@ -135,6 +135,11 @@ class Puppet(CustomPuppetMixin):
) -> Awaitable[Union[TypeInputPeer, TypeInputUser]]: ) -> Awaitable[Union[TypeInputPeer, TypeInputUser]]:
return user.client.get_input_entity(self.peer) return user.client.get_input_entity(self.peer)
def intent_for(self, portal: 'p.Portal') -> IntentAPI:
if portal.tgid == self.tgid:
return self.default_mxid_intent
return self.intent
# region DB conversion # region DB conversion
@property @property
+16 -4
View File
@@ -13,14 +13,15 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import (Awaitable, Dict, List, Iterable, Match, NewType, Optional, Tuple, Any, from typing import (Awaitable, Dict, List, Iterable, Match, NewType, Optional, Tuple, Any, Union,
TYPE_CHECKING) AsyncIterable, TYPE_CHECKING)
import logging import logging
import asyncio import asyncio
import re import re
from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser, from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser,
UpdateShortChatMessage, UpdateShortMessage, User as TLUser) UpdateShortChatMessage, UpdateShortMessage, User as TLUser,
ChannelForbidden, ChatForbidden, Chat, Channel)
from telethon.tl.types.contacts import ContactsNotModified from telethon.tl.types.contacts import ContactsNotModified
from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest
from telethon.tl.functions.account import UpdateStatusRequest from telethon.tl.functions.account import UpdateStatusRequest
@@ -310,7 +311,17 @@ class User(AbstractUser):
if self.is_bot: if self.is_bot:
return return
creators = [] creators = []
async for entity in self.get_dialogs(limit=config["bridge.sync_dialog_limit"] or None): limit = config["bridge.sync_dialog_limit"] or None
self.log.debug(f"Syncing dialogs (limit={limit}, synchronous_create={synchronous_create})")
async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True,
archived=False):
entity = dialog.entity
if isinstance(entity, Chat) and (entity.deactivated or entity.left):
self.log.warn(f"Ignoring deactivated or left chat {entity} while syncing")
continue
elif isinstance(entity, TLUser) and not config["bridge.sync_direct_chats"]:
continue
self.log.info(f"Syncing {type(entity)}")
portal = po.Portal.get_by_entity(entity) portal = po.Portal.get_by_entity(entity)
self.portals[portal.tgid_full] = portal self.portals[portal.tgid_full] = portal
creators.append( creators.append(
@@ -318,6 +329,7 @@ class User(AbstractUser):
synchronous=synchronous_create)) synchronous=synchronous_create))
self.save(portals=True) self.save(portals=True)
await asyncio.gather(*creators, loop=self.loop) await asyncio.gather(*creators, loop=self.loop)
self.log.debug("Dialog syncing complete")
def register_portal(self, portal: po.Portal) -> None: def register_portal(self, portal: po.Portal) -> None:
try: try:
@@ -319,7 +319,7 @@ class ProvisioningAPI(AuthAPI):
return web.json_response([{ return web.json_response([{
"id": get_peer_id(chat), "id": get_peer_id(chat),
"title": chat.title, "title": chat.title,
} async for chat in user.get_dialogs()]) } async for chat in user.client.get_dialogs(ignore_migrated=True, archived=False)])
else: else:
return web.json_response([{ return web.json_response([{
"id": get_peer_id(chat.peer), "id": get_peer_id(chat.peer),