# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from typing import List, Optional, Iterable, Union, Dict, Any, Tuple, TYPE_CHECKING
from abc import ABC
import asyncio

from telethon.tl.functions.messages import (AddChatUserRequest, CreateChatRequest,
                                            GetFullChatRequest, MigrateChatRequest)
from telethon.tl.functions.channels import (CreateChannelRequest, GetParticipantsRequest,
                                            InviteToChannelRequest, UpdateUsernameRequest)
from telethon.errors import ChatAdminRequiredError
from telethon.tl.types import (
    Channel, ChatBannedRights, ChannelParticipantsRecent, ChannelParticipantsSearch, ChatPhoto,
    PhotoEmpty, InputChannel, InputUser, ChatPhotoEmpty, PeerUser, Photo, TypeChat, TypeInputPeer,
    TypeUser, User, InputPeerPhotoFileLocation, ChatParticipantAdmin, ChannelParticipantAdmin,
    ChatParticipantCreator, ChannelParticipantCreator, UserProfilePhoto, UserProfilePhotoEmpty,
    InputPeerUser, ChannelParticipantBanned)

from mautrix.errors import MForbidden
from mautrix.types import (RoomID, UserID, RoomCreatePreset, EventType, Membership,
                           PowerLevelStateEventContent, RoomTopicStateEventContent,
                           RoomNameStateEventContent, RoomAvatarStateEventContent,
                           StateEventContent, EventID, JoinRule)
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY

from ..types import TelegramID
from ..context import Context
from .. import puppet as p, user as u, util
from .base import BasePortal, InviteList, TypeParticipant, TypeChatPhoto

if TYPE_CHECKING:
    from ..abstract_user import AbstractUser
    from ..config import Config

# Populated by init(); None until then.
config: Optional['Config'] = None

StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)


class PortalMetadata(BasePortal, ABC):
    """Portal mixin handling room/chat creation and metadata (title, topic, avatar,
    username, members and power levels) syncing between Telegram and Matrix."""

    # Serializes Matrix room creation for this portal (see create_matrix_room).
    _room_create_lock: asyncio.Lock

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._room_create_lock = asyncio.Lock()

    # region Matrix -> Telegram

    async def get_telegram_users_in_matrix_room(self, source: 'u.User'
                                                ) -> Tuple[List[InputPeerUser], List[UserID]]:
        """Resolve joined/invited members of the Matrix room to Telegram input entities.

        :param source: The user whose Telegram client is used to resolve entities.
        :return: A tuple of (resolved input entities, MXIDs whose entity lookup failed).
        """
        user_tgids = {}
        user_mxids = await self.main_intent.get_room_members(self.mxid, (Membership.JOIN,
                                                                         Membership.INVITE))
        for mxid in user_mxids:
            if mxid == self.az.bot_mxid:
                continue
            mx_user = u.User.get_by_mxid(mxid, create=False)
            if mx_user and mx_user.tgid:
                user_tgids[mx_user.tgid] = mxid
            puppet_id = p.Puppet.get_id_from_mxid(mxid)
            if puppet_id:
                user_tgids[puppet_id] = mxid
        input_users = []
        errors = []
        for tgid, mxid in user_tgids.items():
            try:
                input_users.append(await source.client.get_input_entity(tgid))
            except ValueError as e:
                source.log.debug(f"Failed to find the input entity for {tgid} ({mxid}) for "
                                 f"creating a group: {e}")
                errors.append(mxid)
        return input_users, errors

    async def upgrade_telegram_chat(self, source: 'u.User') -> None:
        """Migrate a normal Telegram group chat to a supergroup and re-key this portal.

        :raises ValueError: If the portal is not a normal group chat, or if the migration
                            response contains no channel.
        """
        if self.peer_type != "chat":
            raise ValueError("Only normal group chats are upgradable to supergroups.")

        response = await source.client(MigrateChatRequest(chat_id=self.tgid))
        entity = None
        for chat in response.chats:
            if isinstance(chat, Channel):
                entity = chat
                break
        if not entity:
            raise ValueError("Upgrade may have failed: output channel not found.")
        self.peer_type = "channel"
        self._migrate_and_save_telegram(TelegramID(entity.id))
        await self.update_info(source, entity)

    def _migrate_and_save_telegram(self, new_id: TelegramID) -> None:
        """Move this portal to ``new_id`` in the by-tgid cache and the database.

        Any portal already cached under the new ID is deleted first.
        """
        try:
            del self.by_tgid[self.tgid_full]
        except KeyError:
            pass
        try:
            existing = self.by_tgid[(new_id, new_id)]
            existing.delete_sync()
        except KeyError:
            pass
        self.db_instance.edit(tgid=new_id, tg_receiver=new_id, peer_type=self.peer_type)
        old_id = self.tgid
        self.tgid = new_id
        self.tg_receiver = new_id
        self.by_tgid[self.tgid_full] = self
        self.log = self.base_log.getChild(self.tgid_log)
        self.log.info(f"Telegram chat upgraded from {old_id}")

    async def set_telegram_username(self, source: 'u.User', username: str) -> None:
        """Set the public username of the channel/supergroup on Telegram and mirror it locally.

        :raises ValueError: If the portal is not a channel or supergroup.
        """
        if self.peer_type != "channel":
            raise ValueError("Only channels and supergroups have usernames.")
        await source.client(
            UpdateUsernameRequest(await self.get_input_entity(source), username))
        if await self._update_username(username):
            await self.save()

    async def create_telegram_chat(self, source: 'u.User', invites: List[InputUser],
                                   supergroup: bool = False) -> None:
        """Create a Telegram chat for an existing Matrix room and link it to this portal.

        :param source: The user whose Telegram client creates the chat.
        :param invites: Telegram users to add to the new chat (at least two).
        :param supergroup: Passed as ``megagroup`` when creating a channel-type chat.
        :raises ValueError: If there is no Matrix room, a Telegram chat already exists,
                            there are too few invitees or the peer type is invalid.
        """
        if not self.mxid:
            raise ValueError("Can't create Telegram chat for portal without Matrix room.")
        elif self.tgid:
            raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")

        if len(invites) < 2:
            if self.bot is not None:
                info, mxid = await self.bot.get_me()
                raise ValueError("Not enough Telegram users to create a chat. "
                                 "Invite more Telegram ghost users to the room, such as the "
                                 f"relaybot ([{info.first_name}](https://matrix.to/#/{mxid})).")
            raise ValueError("Not enough Telegram users to create a chat. "
                             "Invite more Telegram ghost users to the room.")
        if self.peer_type == "chat":
            response = await source.client(CreateChatRequest(title=self.title, users=invites))
            entity = response.chats[0]
        elif self.peer_type == "channel":
            response = await source.client(CreateChannelRequest(title=self.title,
                                                                about=self.about or "",
                                                                megagroup=supergroup))
            entity = response.chats[0]
            await source.client(InviteToChannelRequest(
                channel=await source.client.get_input_entity(entity),
                users=invites))
        else:
            raise ValueError("Invalid peer type for Telegram chat creation")

        self.tgid = entity.id
        self.tg_receiver = self.tgid
        self.by_tgid[self.tgid_full] = self
        await self.update_info(source, entity)
        self.db_instance.insert()
        self.log = self.base_log.getChild(self.tgid_log)

        # `invites` holds input-peer objects, so comparing the bot's numeric ID against the
        # list items directly never matched. Compare against each invitee's user_id instead,
        # falling back to the item itself in case plain IDs are passed.
        if self.bot and any(getattr(invitee, "user_id", invitee) == self.bot.tgid
                            for invitee in invites):
            self.bot.add_chat(self.tgid, self.peer_type)

        levels = await self.main_intent.get_power_levels(self.mxid)
        if levels.get_user_level(self.main_intent.mxid) == 100:
            levels = self._get_base_power_levels(levels, entity)
            await self.main_intent.set_power_levels(self.mxid, levels)
        await self.handle_matrix_power_levels(source, levels.users, {}, None)
        await self.update_bridge_info()

    async def invite_telegram(self, source: 'u.User',
                              puppet: Union[p.Puppet, 'AbstractUser']) -> None:
        """Add ``puppet`` to the Telegram chat using ``source``'s client.

        :raises ValueError: If the portal is neither a chat nor a channel, unless it is a
                            private chat portal with the relaybot.
        """
        if self.peer_type == "chat":
            await source.client(
                AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0))
        elif self.peer_type == "channel":
            await source.client(InviteToChannelRequest(channel=self.peer, users=[puppet.tgid]))
        # We don't care if there are invites for private chat portals with the relaybot.
        elif not self.bot or self.tg_receiver != self.bot.tgid:
            raise ValueError("Invalid peer type for Telegram user invite")

    # endregion
    # region Telegram -> Matrix

    def _get_invite_content(self, double_puppet: Optional['p.Puppet']) -> Dict[str, Any]:
        """Build the extra content for a Matrix invite event."""
        invite_content = {}
        if double_puppet:
            invite_content["fi.mau.will_auto_accept"] = True
        if self.is_direct:
            invite_content["is_direct"] = True
        return invite_content

    async def invite_to_matrix(self, users: InviteList) -> None:
        """Invite one MXID or a list of MXIDs to the Matrix room.

        If an invitee has a custom (double) puppet, it is also made to join the room.
        """
        if isinstance(users, list):
            for user in users:
                await self.invite_to_matrix(user)
        else:
            puppet = await p.Puppet.get_by_custom_mxid(users)
            await self.main_intent.invite_user(self.mxid, users, check_cache=True,
                                               extra_content=self._get_invite_content(puppet))
            if puppet:
                try:
                    await puppet.intent.ensure_joined(self.mxid)
                except Exception:
                    self.log.exception("Failed to ensure %s is joined to portal", users)

    async def update_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User],
                                 direct: bool = None, puppet: p.Puppet = None,
                                 levels: PowerLevelStateEventContent = None,
                                 users: List[User] = None) -> None:
        """Sync the Matrix room with Telegram, logging (not raising) any failure.

        :param direct: Whether this is a private chat; derived from the peer type if None.
        """
        if direct is None:
            direct = self.peer_type == "user"
        try:
            await self._update_matrix_room(user, entity, direct, puppet, levels, users)
        except Exception:
            self.log.exception("Fatal error updating Matrix room")

    async def _update_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User],
                                  direct: bool, puppet: p.Puppet = None,
                                  levels: PowerLevelStateEventContent = None,
                                  users: List[User] = None) -> None:
        """Sync info, members and power levels (groups) or puppet info (private chats)."""
        if not direct:
            await self.update_info(user, entity)
            if not users:
                users = await self._get_users(user, entity)
            await self._sync_telegram_users(user, users)
            await self.update_power_levels(users, levels)
        else:
            if not puppet:
                puppet = p.Puppet.get(self.tgid)
            await puppet.update_info(user, entity)
            await puppet.intent_for(self).join_room(self.mxid)
            if self.encrypted or self.private_chat_portal_meta:
                # The bridge bot needs to join for e2ee, but that messes up the default name
                # generation. If/when canonical DMs happen, this might not be necessary anymore.
                changed = await self._update_title(puppet.displayname)
                changed = await self._update_avatar(user, entity.photo) or changed
                if changed:
                    await self.save()
                    await self.update_bridge_info()

        puppet = await p.Puppet.get_by_custom_mxid(user.mxid)
        if puppet:
            try:
                did_join = await puppet.intent.ensure_joined(self.mxid)
                if isinstance(user, u.User) and did_join and self.peer_type == "user":
                    await user.update_direct_chats({self.main_intent.mxid: [self.mxid]})
            except Exception:
                self.log.exception("Failed to ensure %s is joined to portal", user.mxid)

        if self.sync_matrix_state:
            await self.main_intent.get_joined_members(self.mxid)

    async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User] = None,
                                 invites: InviteList = None, update_if_exists: bool = True
                                 ) -> Optional[RoomID]:
        """Create the Matrix room for this portal, or update it if it already exists.

        :return: The room ID, or None if creation failed (the error is logged) or bridging
                 is not allowed.
        """
        if self.mxid:
            if update_if_exists:
                if not entity:
                    try:
                        entity = await self.get_entity(user)
                    except Exception:
                        self.log.exception(f"Failed to get entity through {user.tgid} for update")
                        return self.mxid
                # The update runs in the background; only the invites are awaited here.
                update = self.update_matrix_room(user, entity, self.peer_type == "user")
                self.loop.create_task(update)
                await self.invite_to_matrix(invites or [])
            return self.mxid
        async with self._room_create_lock:
            try:
                return await self._create_matrix_room(user, entity, invites)
            except Exception:
                self.log.exception("Fatal error creating Matrix room")

    @property
    def bridge_info_state_key(self) -> str:
        """State key used for the bridge info state events of this portal."""
        return f"net.maunium.telegram://telegram/{self.tgid}"

    @property
    def bridge_info(self) -> Dict[str, Any]:
        """Content of the bridge info state events for this portal."""
        info = {
            "bridgebot": self.az.bot_mxid,
            "creator": self.main_intent.mxid,
            "protocol": {
                "id": "telegram",
                "displayname": "Telegram",
                "avatar_url": config["appservice.bot_avatar"],
                "external_url": "https://telegram.org",
            },
            "channel": {
                "id": str(self.tgid),
                "displayname": self.title,
                "avatar_url": self.avatar_url,
            }
        }
        if self.username:
            info["channel"]["external_url"] = f"https://t.me/{self.username}"
        elif self.peer_type == "user":
            puppet = p.Puppet.get(self.tgid)
            if puppet and puppet.username:
                info["channel"]["external_url"] = f"https://t.me/{puppet.username}"
        return info

    async def update_bridge_info(self) -> None:
        """Send the bridge info state events to the room; failures are only logged."""
        if not self.mxid:
            self.log.debug("Not updating bridge info: no Matrix room created")
            return
        try:
            self.log.debug("Updating bridge info...")
            await self.main_intent.send_state_event(self.mxid, StateBridge,
                                                    self.bridge_info, self.bridge_info_state_key)
            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
            await self.main_intent.send_state_event(self.mxid, StateHalfShotBridge,
                                                    self.bridge_info, self.bridge_info_state_key)
        except Exception:
            self.log.warning("Failed to update bridge info", exc_info=True)

    async def _create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User],
                                  invites: InviteList) -> Optional[RoomID]:
        """Create the Matrix room, invite users and kick off the initial sync/backfill.

        Callers are expected to hold ``_room_create_lock`` (see create_matrix_room).

        :return: The room ID, or None if bridging this chat is not allowed.
        :raises Exception: If the homeserver did not return a room ID.
        """
        if self.mxid:
            return self.mxid
        elif not self.allow_bridging:
            return None
        direct = self.peer_type == "user"
        invites = invites or []

        if not entity:
            entity = await self.get_entity(user)
            self.log.trace("Fetched data: %s", entity)

        self.log.debug("Creating room")

        try:
            self.title = entity.title
        except AttributeError:
            self.title = None

        if direct and self.tgid == user.tgid:
            self.title = "Telegram Saved Messages"
            self.about = "Your Telegram cloud storage chat"

        puppet = p.Puppet.get(self.tgid) if direct else None
        if puppet:
            await puppet.update_info(user, entity)
        self._main_intent = puppet.intent_for(self) if direct else self.az.intent

        if self.peer_type == "channel":
            self.megagroup = entity.megagroup

        preset = RoomCreatePreset.PRIVATE
        if self.peer_type == "channel" and entity.username:
            if self.public_portals:
                preset = RoomCreatePreset.PUBLIC
            self.username = entity.username
            alias = self.alias_localpart
        else:
            # TODO invite link alias?
            alias = None

        if alias:
            # TODO? properly handle existing room aliases
            await self.main_intent.remove_room_alias(alias)

        power_levels = self._get_base_power_levels(entity=entity)
        users = None
        if not direct:
            users = await self._get_users(user, entity)
            if self.has_bot:
                extra_invites = config["bridge.relaybot.group_chat_invite"]
                # Build a new list instead of `+=` so neither the caller's list nor the
                # config value is mutated. A single MXID is wrapped into a list first.
                if not isinstance(invites, list):
                    invites = [invites]
                invites = [*invites, *extra_invites]
                for invite in extra_invites:
                    power_levels.users.setdefault(invite, 100)
            await self._participants_to_power_levels(users, power_levels)
        elif self.bot and self.tg_receiver == self.bot.tgid:
            invites = config["bridge.relaybot.private_chat.invite"]
            for invite in invites:
                power_levels.users.setdefault(invite, 100)
            self.title = puppet.displayname

        initial_state = [{
            "type": EventType.ROOM_POWER_LEVELS.serialize(),
            "content": power_levels.serialize(),
        }, {
            "type": str(StateBridge),
            "state_key": self.bridge_info_state_key,
            "content": self.bridge_info,
        }, {
            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
            "type": str(StateHalfShotBridge),
            "state_key": self.bridge_info_state_key,
            "content": self.bridge_info,
        }]
        create_invites = []
        if config["bridge.encryption.default"] and self.matrix.e2ee:
            self.encrypted = True
            initial_state.append({
                "type": "m.room.encryption",
                "content": {"algorithm": "m.megolm.v1.aes-sha2"},
            })
            if direct:
                create_invites.append(self.az.bot_mxid)
        if direct and (self.encrypted or self.private_chat_portal_meta):
            self.title = puppet.displayname
        if config["appservice.community_id"]:
            initial_state.append({
                "type": "m.room.related_groups",
                "content": {"groups": [config["appservice.community_id"]]},
            })
        creation_content = {}
        if not config["bridge.federate_rooms"]:
            creation_content["m.federate"] = False

        # NOTE(review): the original file's indentation was lost; everything up to the
        # initial backfill is assumed to run while holding backfill_lock -- confirm.
        with self.backfill_lock:
            room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
                                                         is_direct=direct,
                                                         invitees=create_invites,
                                                         name=self.title, topic=self.about,
                                                         initial_state=initial_state,
                                                         creation_content=creation_content)
            if not room_id:
                raise Exception("Failed to create room")

            if self.encrypted and self.matrix.e2ee and direct:
                try:
                    await self.az.intent.ensure_joined(room_id)
                except Exception:
                    self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")

            self.mxid = room_id
            self.by_mxid[self.mxid] = self
            await self.save()
            await self.az.state_store.set_power_levels(self.mxid, power_levels)
            await user.register_portal(self)

            await self.invite_to_matrix(invites)

            update_room = self.loop.create_task(self.update_matrix_room(
                user, entity, direct, puppet, levels=power_levels, users=users))

            if config["bridge.backfill.initial_limit"] > 0:
                self.log.debug("Initial backfill is enabled. Waiting for room members to sync "
                               "and then starting backfill")
                await update_room

                try:
                    await self.backfill(user, is_initial=True)
                except Exception:
                    self.log.exception("Failed to backfill new portal")

        return self.mxid

    def _get_base_power_levels(self, levels: PowerLevelStateEventContent = None,
                               entity: TypeChat = None) -> PowerLevelStateEventContent:
        """Fill in the default power levels for this portal.

        :param levels: Existing power levels to modify in place; a new object is created
                       if omitted.
        :param entity: The Telegram chat entity; read for non-private chats.
        :return: The (possibly newly created) power level content.
        """
        levels = levels or PowerLevelStateEventContent()
        if self.peer_type == "user":
            overrides = config["bridge.initial_power_level_overrides.user"]
            levels.ban = overrides.get("ban", 100)
            levels.kick = overrides.get("kick", 100)
            levels.invite = overrides.get("invite", 100)
            levels.redact = overrides.get("redact", 0)
            levels.events[EventType.ROOM_NAME] = 0
            levels.events[EventType.ROOM_AVATAR] = 0
            levels.events[EventType.ROOM_TOPIC] = 0
            levels.state_default = overrides.get("state_default", 0)
            levels.users_default = overrides.get("users_default", 0)
            levels.events_default = overrides.get("events_default", 0)
        else:
            overrides = config["bridge.initial_power_level_overrides.group"]
            dbr = entity.default_banned_rights
            if not dbr:
                self.log.debug(f"default_banned_rights is None in {entity}")
                dbr = ChatBannedRights(invite_users=True, change_info=True, pin_messages=True,
                                       send_stickers=False, send_messages=False, until_date=None)
            levels.ban = overrides.get("ban", 50)
            levels.kick = overrides.get("kick", 50)
            levels.redact = overrides.get("redact", 50)
            levels.invite = overrides.get("invite", 50 if dbr.invite_users else 0)
            levels.state_default = overrides.get("state_default", 50)
            levels.users_default = overrides.get("users_default", 0)
            # Use the `dbr` fallback here: reading entity.default_banned_rights directly
            # raised AttributeError whenever Telegram did not send the default rights.
            is_broadcast = self.peer_type == "channel" and not entity.megagroup
            levels.events_default = overrides.get(
                "events_default", 50 if (is_broadcast or dbr.send_messages) else 0)
            levels.events[EventType.ROOM_ENCRYPTION] = 50 if self.matrix.e2ee else 99
            levels.events[EventType.ROOM_TOMBSTONE] = 99
            levels.events[EventType.ROOM_NAME] = 50 if dbr.change_info else 0
            levels.events[EventType.ROOM_AVATAR] = 50 if dbr.change_info else 0
            levels.events[EventType.ROOM_TOPIC] = 50 if dbr.change_info else 0
            levels.events[EventType.ROOM_PINNED_EVENTS] = 50 if dbr.pin_messages else 0
            levels.events[EventType.ROOM_POWER_LEVELS] = 75
            levels.events[EventType.ROOM_HISTORY_VISIBILITY] = 75
            # Set after events_default so stickers follow the value computed for this chat
            # rather than whatever events_default held before.
            levels.events[EventType.STICKER] = (50 if dbr.send_stickers
                                                else levels.events_default)

        for evt_type, value in overrides.get("events", {}).items():
            levels.events[EventType.find(evt_type)] = value
        # Copy the override so later per-room writes don't leak into the shared config.
        levels.users = dict(overrides.get("users", {}))
        if self.main_intent.mxid not in levels.users:
            levels.users[self.main_intent.mxid] = 100
        return levels

    @classmethod
    def _get_level_from_participant(cls, participant: TypeParticipant,
                                    levels: PowerLevelStateEventContent) -> int:
        """Map a Telegram participant type to a Matrix power level."""
        # TODO use the power level requirements to get better precision in channels
        if isinstance(participant, (ChatParticipantAdmin, ChannelParticipantAdmin)):
            return levels.state_default or 50
        elif isinstance(participant, (ChatParticipantCreator, ChannelParticipantCreator)):
            return levels.get_user_level(cls.az.bot_mxid) - 5
        return levels.users_default or 0

    @staticmethod
    def _participant_to_power_levels(levels: PowerLevelStateEventContent,
                                     user: Union['u.User', p.Puppet], new_level: int,
                                     bot_level: int) -> bool:
        """Set ``user``'s level to ``new_level`` (capped at ``bot_level``).

        Users whose current level is at or above the bot's level are left untouched.

        :return: Whether ``levels`` was modified.
        """
        new_level = min(new_level, bot_level)
        user_level = levels.get_user_level(user.mxid)
        if user_level != new_level and user_level < bot_level:
            levels.users[user.mxid] = new_level
            return True
        return False

    async def _participants_to_power_levels(self, users: List[Union[TypeUser, TypeParticipant]],
                                            levels: PowerLevelStateEventContent) -> bool:
        """Apply Telegram participant roles to ``levels`` in place.

        :return: Whether ``levels`` was modified. Always False if the main intent is not
                 allowed to send power level events.
        """
        bot_level = levels.get_user_level(self.main_intent.mxid)
        if bot_level < levels.get_event_level(EventType.ROOM_POWER_LEVELS):
            return False
        changed = False
        admin_power_level = min(75 if self.peer_type == "channel" else 50, bot_level)
        if levels.get_event_level(EventType.ROOM_POWER_LEVELS) != admin_power_level:
            changed = True
            levels.events[EventType.ROOM_POWER_LEVELS] = admin_power_level

        for tg_user in users:
            # The User objects we get from TelegramClient.get_participants have a custom
            # participant property
            participant = getattr(tg_user, "participant", tg_user)

            puppet = p.Puppet.get(TelegramID(participant.user_id))
            mx_user = u.User.get_by_tgid(TelegramID(participant.user_id))
            new_level = self._get_level_from_participant(participant, levels)

            if mx_user:
                await mx_user.register_portal(self)
                changed = self._participant_to_power_levels(levels, mx_user, new_level,
                                                            bot_level) or changed

            if puppet:
                changed = self._participant_to_power_levels(levels, puppet, new_level,
                                                            bot_level) or changed
        return changed

    async def update_power_levels(self, users: List[Union[TypeUser, TypeParticipant]],
                                  levels: PowerLevelStateEventContent = None) -> None:
        """Sync participant roles into the room's power levels, sending them if changed."""
        if not levels:
            levels = await self.main_intent.get_power_levels(self.mxid)
        if await self._participants_to_power_levels(users, levels):
            await self.main_intent.set_power_levels(self.mxid, levels)

    async def _add_bot_chat(self, bot: User) -> None:
        """Record that a bot (the relaybot or a logged-in bot user) is in this chat."""
        if self.bot and bot.id == self.bot.tgid:
            self.bot.add_chat(self.tgid, self.peer_type)
            return

        user = u.User.get_by_tgid(TelegramID(bot.id))
        if user and user.is_bot:
            await user.register_portal(self)

    async def _sync_telegram_users(self, source: 'AbstractUser', users: List[User]) -> None:
        """Join the given Telegram users to the room and remove members who have left.

        Removal only happens when the member list is considered complete (see below).
        """
        allowed_tgids = set()
        skip_deleted = config["bridge.skip_deleted_members"]
        for entity in users:
            puppet = p.Puppet.get(TelegramID(entity.id))
            if entity.bot:
                await self._add_bot_chat(entity)
            allowed_tgids.add(entity.id)

            await puppet.update_info(source, entity)
            if skip_deleted and entity.deleted:
                continue

            await puppet.intent_for(self).ensure_joined(self.mxid)

            user = u.User.get_by_tgid(TelegramID(entity.id))
            if user:
                await self.invite_to_matrix(user.mxid)

        # We can't trust the member list if any of the following cases is true:
        #  * There are close to 10 000 users, because Telegram might not be sending all members.
        #  * The member sync count is limited, because then we might ignore some members.
        #  * It's a channel, because non-admins don't have access to the member list.
        trust_member_list = ((len(allowed_tgids) < 9900
                              if self.max_initial_member_sync < 0
                              else len(allowed_tgids) < self.max_initial_member_sync - 10)
                             and (self.megagroup or self.peer_type != "channel"))
        if not trust_member_list:
            return

        for user_mxid in await self.main_intent.get_room_members(self.mxid):
            if user_mxid == self.az.bot_mxid:
                continue

            puppet_id = p.Puppet.get_id_from_mxid(user_mxid)
            if puppet_id:
                if puppet_id in allowed_tgids:
                    continue
                if self.bot and puppet_id == self.bot.tgid:
                    self.bot.remove_chat(self.tgid)
                try:
                    await self.main_intent.kick_user(self.mxid, user_mxid,
                                                     "User had left this Telegram chat.")
                except MForbidden:
                    pass
                continue

            mx_user = u.User.get_by_mxid(user_mxid, create=False)
            if mx_user:
                if mx_user.tgid in allowed_tgids:
                    continue
                if mx_user.is_bot:
                    await mx_user.unregister_portal(*self.tgid_full)
                if not self.has_bot:
                    try:
                        await self.main_intent.kick_user(self.mxid, mx_user.mxid,
                                                         "You had left this Telegram chat.")
                    except MForbidden:
                        pass

    async def _add_telegram_user(self, user_id: TelegramID, source: Optional['AbstractUser'] = None
                                 ) -> None:
        """Handle a Telegram user joining the chat.

        The puppet is only updated and joined when a ``source`` client is available
        (NOTE(review): nesting reconstructed from a whitespace-mangled file -- confirm).
        """
        puppet = p.Puppet.get(user_id)
        if source:
            entity: User = await source.client.get_entity(PeerUser(user_id))
            await puppet.update_info(source, entity)
            await puppet.intent_for(self).ensure_joined(self.mxid)

        user = u.User.get_by_tgid(user_id)
        if user:
            await user.register_portal(self)
            await self.invite_to_matrix(user.mxid)

    async def _delete_telegram_user(self, user_id: TelegramID,
                                    sender: Optional[p.Puppet]) -> None:
        """Handle a Telegram user leaving or being removed from the chat.

        :param user_id: The Telegram ID of the user who left.
        :param sender: The puppet that removed the user. If it is None or the user
                       themselves, the removal is treated as a voluntary leave.
        """
        puppet = p.Puppet.get(user_id)
        user = u.User.get_by_tgid(user_id)
        # A missing sender used to crash on `sender.tgid`; treat it like a self-leave,
        # matching the kick message that was already chosen for that case.
        was_kicked = bool(sender) and sender.tgid != puppet.tgid
        kick_message = (f"Kicked by {sender.displayname}" if was_kicked
                        else "Left Telegram chat")
        if was_kicked:
            try:
                await sender.intent_for(self).kick_user(self.mxid, puppet.mxid)
            except MForbidden:
                await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message)
        else:
            await puppet.intent_for(self).leave_room(self.mxid)
        if user:
            await user.unregister_portal(*self.tgid_full)
            if was_kicked:
                try:
                    # Kick the real Matrix user here; the ghost was already removed above.
                    # (This previously targeted puppet.mxid again and then returned,
                    # leaving the Matrix user in the room.)
                    await sender.intent_for(self).kick_user(self.mxid, user.mxid)
                    return
                except MForbidden:
                    pass
            try:
                await self.main_intent.kick_user(self.mxid, user.mxid, kick_message)
            except MForbidden as e:
                self.log.warning(f"Failed to kick {user.mxid}: {e}")

    async def update_info(self, user: 'AbstractUser', entity: TypeChat = None) -> None:
        """Sync username, topic, title and avatar of a group/channel from Telegram.

        Does nothing (besides logging a warning) for private chat portals. Errors while
        updating are logged; changes made before the error are still saved.
        """
        if self.peer_type == "user":
            self.log.warning("Called update_info() for direct chat portal")
            return

        changed = False
        self.log.debug("Updating info")
        try:
            if not entity:
                entity = await self.get_entity(user)
                self.log.trace("Fetched data: %s", entity)

            if self.peer_type == "channel":
                changed = self.megagroup != entity.megagroup or changed
                self.megagroup = entity.megagroup
                changed = await self._update_username(entity.username) or changed

            if hasattr(entity, "about"):
                # _update_about is a coroutine function: without `await` the topic was
                # never updated and the truthy coroutine object forced changed=True.
                changed = await self._update_about(entity.about) or changed

            changed = await self._update_title(entity.title) or changed

            if isinstance(entity.photo, ChatPhoto):
                changed = await self._update_avatar(user, entity.photo) or changed
        except Exception:
            self.log.exception(f"Failed to update info from source {user.tgid}")

        if changed:
            await self.save()
            await self.update_bridge_info()

    async def _update_username(self, username: str, save: bool = False) -> bool:
        """Update the stored username, the room alias and the join rule.

        :return: Whether the username changed.
        """
        if self.username == username:
            return False

        if self.username:
            await self.main_intent.remove_room_alias(self.alias_localpart)
        self.username = username or None
        if self.username:
            await self.main_intent.add_room_alias(self.mxid, self.alias_localpart, override=True)
            if self.public_portals:
                await self.main_intent.set_join_rule(self.mxid, JoinRule.PUBLIC)
        else:
            await self.main_intent.set_join_rule(self.mxid, JoinRule.INVITE)

        if save:
            await self.save()
        return True

    async def _try_set_state(self, sender: Optional['p.Puppet'], evt_type: EventType,
                             content: StateEventContent) -> None:
        """Send a state event as ``sender`` if possible, otherwise as the main intent."""
        if sender:
            try:
                intent = sender.intent_for(self)
                if sender.is_real_user:
                    content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
                await intent.send_state_event(self.mxid, evt_type, content)
            except MForbidden:
                await self.main_intent.send_state_event(self.mxid, evt_type, content)
        else:
            await self.main_intent.send_state_event(self.mxid, evt_type, content)

    async def _update_about(self, about: str, sender: Optional['p.Puppet'] = None,
                            save: bool = False) -> bool:
        """Update the room topic. Returns whether it changed."""
        if self.about == about:
            return False

        self.about = about
        await self._try_set_state(sender, EventType.ROOM_TOPIC,
                                  RoomTopicStateEventContent(topic=self.about))
        if save:
            await self.save()
        return True

    async def _update_title(self, title: str, sender: Optional['p.Puppet'] = None,
                            save: bool = False) -> bool:
        """Update the room name. Returns whether it changed."""
        if self.title == title:
            return False

        self.title = title
        await self._try_set_state(sender, EventType.ROOM_NAME,
                                  RoomNameStateEventContent(name=self.title))
        if save:
            await self.save()
        return True

    async def _update_avatar(self, user: 'AbstractUser', photo: TypeChatPhoto,
                             sender: Optional['p.Puppet'] = None, save: bool = False) -> bool:
        """Update the room avatar from a Telegram photo.

        :return: Whether the avatar changed.
        :raises ValueError: If ``photo`` is of an unknown type.
        """
        if isinstance(photo, (ChatPhoto, UserProfilePhoto)):
            loc = InputPeerPhotoFileLocation(
                peer=await self.get_input_entity(user),
                photo_id=photo.photo_id,
                big=True
            )
            photo_id = str(photo.photo_id)
        elif isinstance(photo, Photo):
            loc, _ = self._get_largest_photo_size(photo)
            photo_id = str(loc.id)
        elif isinstance(photo, (UserProfilePhotoEmpty, ChatPhotoEmpty, PhotoEmpty, type(None))):
            photo_id = ""
            loc = None
        else:
            raise ValueError(f"Unknown photo type {type(photo)}")
        if self.peer_type == "user" and not photo_id and not config["bridge.allow_avatar_remove"]:
            return False
        if self.photo_id != photo_id:
            if not photo_id:
                await self._try_set_state(sender, EventType.ROOM_AVATAR,
                                          RoomAvatarStateEventContent(url=None))
                self.photo_id = ""
                self.avatar_url = None
                if save:
                    await self.save()
                return True
            file = await util.transfer_file_to_matrix(user.client, self.main_intent, loc)
            if file:
                await self._try_set_state(sender, EventType.ROOM_AVATAR,
                                          RoomAvatarStateEventContent(url=file.mxc))
                self.photo_id = photo_id
                self.avatar_url = file.mxc
                if save:
                    await self.save()
                return True
        return False

    @staticmethod
    def _filter_participants(users: List[TypeUser], participants: List[TypeParticipant]
                             ) -> Iterable[TypeUser]:
        """Yield users that have a non-banned participant entry.

        Each yielded user gets its participant object attached as ``user.participant``.
        """
        participant_map = {part.user_id: part for part in participants
                           if not isinstance(part, ChannelParticipantBanned)}
        for user in users:
            try:
                user.participant = participant_map[user.id]
            except KeyError:
                pass
            else:
                yield user

    async def _get_channel_users(self, user: 'AbstractUser', entity: InputChannel, limit: int
                                 ) -> List[TypeUser]:
        """Fetch up to ``limit`` channel participants (``-1`` for no limit).

        Requests are made in pages of at most 200 participants.

        :return: The participants, or an empty list for any other ``limit`` value.
        """
        if 0 < limit <= 200:
            response = await user.client(GetParticipantsRequest(
                entity, ChannelParticipantsRecent(), offset=0, limit=limit, hash=0))
            return list(self._filter_participants(response.users, response.participants))
        elif limit > 200 or limit == -1:
            users: List[TypeUser] = []
            offset = 0
            remaining_quota = limit if limit > 0 else 1000000
            query = (ChannelParticipantsSearch("") if limit == -1
                     else ChannelParticipantsRecent())
            while True:
                if remaining_quota <= 0:
                    break
                response = await user.client(GetParticipantsRequest(
                    entity, query, offset=offset, limit=min(remaining_quota, 200), hash=0))
                if not response.users:
                    break
                users += self._filter_participants(response.users, response.participants)
                offset += len(response.participants)
                remaining_quota -= len(response.participants)
            return users
        # Unsupported limit (0 or below -1): return an empty list rather than falling
        # through with None, which callers would try to iterate.
        return []

    async def _get_users(self, user: 'AbstractUser',
                         entity: Union[TypeInputPeer, InputUser, TypeChat, TypeUser, InputChannel]
                         ) -> List[TypeUser]:
        """Get the members of the Telegram chat, honoring ``max_initial_member_sync``.

        :raises RuntimeError: If the portal has an unexpected peer type.
        """
        limit = self.max_initial_member_sync
        if self.peer_type == "chat":
            chat = await user.client(GetFullChatRequest(chat_id=self.tgid))
            members = list(
                self._filter_participants(chat.users, chat.full_chat.participants.participants))
            # A negative limit means "no limit". Slicing with it directly (`[:-1]`) would
            # silently drop members from the end of the list.
            return members if limit < 0 else members[:limit]
        elif self.peer_type == "channel":
            if not self.megagroup and not self.sync_channel_members:
                return []

            if limit == 0:
                return []

            try:
                return await self._get_channel_users(user, entity, limit)
            except ChatAdminRequiredError:
                return []
        elif self.peer_type == "user":
            return [entity]
        else:
            raise RuntimeError(f"Unexpected peer type {self.peer_type}")

    # endregion

    async def _send_delivery_receipt(self, event_id: EventID, room_id: Optional[RoomID] = None
                                     ) -> None:
        """Mark ``event_id`` as read with the bridge bot, if delivery receipts are enabled."""
        # TODO maybe check if the bot is in the room rather than assuming based on self.encrypted
        if event_id and config["bridge.delivery_receipts"] and (self.encrypted
                                                                or self.peer_type != "user"):
            try:
                await self.az.intent.mark_read(room_id or self.mxid, event_id)
            except Exception:
                self.log.exception("Failed to send delivery receipt for %s", event_id)


def init(context: Context) -> None:
    """Store the bridge config from ``context`` in this module's global ``config``."""
    global config
    config = context.config