Add command to ban relaybot users from Telegram

Fixes #357
Closes #819
This commit is contained in:
Tulir Asokan
2022-08-14 14:07:48 +03:00
parent 2ec89bc57e
commit 2cf9205cda
+162 -71
View File
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge # mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan # Copyright (C) 2022 Tulir Asokan
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@@ -13,8 +13,11 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Awaitable, Callable, Dict, List, Optional, Tuple from __future__ import annotations
from typing import Awaitable, Callable, Literal
import logging import logging
import time
from telethon.errors import ChannelInvalidError, ChannelPrivateError from telethon.errors import ChannelInvalidError, ChannelPrivateError
from telethon.tl.functions.channels import GetChannelsRequest, GetParticipantRequest from telethon.tl.functions.channels import GetChannelsRequest, GetParticipantRequest
@@ -35,31 +38,58 @@ from telethon.tl.types import (
PeerChannel, PeerChannel,
PeerChat, PeerChat,
PeerUser, PeerUser,
TypeChannelParticipant,
TypeChatParticipant,
TypeInputPeer,
TypePeer, TypePeer,
UpdateNewChannelMessage, UpdateNewChannelMessage,
UpdateNewMessage, UpdateNewMessage,
User, User,
) )
from telethon.utils import add_surrogate, del_surrogate
from mautrix.errors import MBadState, MForbidden
from mautrix.types import UserID from mautrix.types import UserID
from . import portal as po, puppet as pu, user as u from . import portal as po, puppet as pu, user as u
from .abstract_user import AbstractUser from .abstract_user import AbstractUser
from .db import BotChat from .db import BotChat, Message as DBMessage
from .types import TelegramID from .types import TelegramID
ReplyFunc = Callable[[str], Awaitable[Message]] ReplyFunc = Callable[[str], Awaitable[Message]]
TelegramAdminPermission = Literal[
"change_info",
"post_messages",
"edit_messages",
"delete_messages",
"ban_users",
"invite_users",
"pin_messages",
"add_admins",
"anonymous",
"manage_call",
"other",
]
class Bot(AbstractUser): class Bot(AbstractUser):
log: logging.Logger = logging.getLogger("mau.user.bot") log: logging.Logger = logging.getLogger("mau.user.bot")
token: str token: str
chats: Dict[int, str] chats: dict[int, str]
tg_whitelist: List[int] tg_whitelist: list[int]
whitelist_group_admins: bool whitelist_group_admins: bool
_me_info: Optional[User] _me_info: User | None
_me_mxid: Optional[UserID] _me_mxid: UserID | None
_admin_cache: dict[
tuple[int, int],
tuple[ChatParticipantAdmin | ChatParticipantCreator | None, float],
]
required_permissions: dict[str, TelegramAdminPermission] = {
"portal": None,
"invite": "invite_users",
"mxban": "ban_users",
}
def __init__(self, token: str) -> None: def __init__(self, token: str) -> None:
super().__init__() super().__init__()
@@ -73,6 +103,7 @@ class Bot(AbstractUser):
self.is_relaybot = True self.is_relaybot = True
self.is_bot = True self.is_bot = True
self.chats = {} self.chats = {}
self._admin_cache = {}
self.tg_whitelist = [] self.tg_whitelist = []
self.whitelist_group_admins = ( self.whitelist_group_admins = (
self.config["bridge.relaybot.whitelist_group_admins"] or False self.config["bridge.relaybot.whitelist_group_admins"] or False
@@ -80,7 +111,7 @@ class Bot(AbstractUser):
self._me_info = None self._me_info = None
self._me_mxid = None self._me_mxid = None
async def get_me(self, use_cache: bool = True) -> Tuple[User, UserID]: async def get_me(self, use_cache: bool = True) -> tuple[User, UserID]:
if not use_cache or not self._me_mxid: if not use_cache or not self._me_mxid:
self._me_info = await self.client.get_me() self._me_info = await self.client.get_me()
self._me_mxid = pu.Puppet.get_mxid_from_id(TelegramID(self._me_info.id)) self._me_mxid = pu.Puppet.get_mxid_from_id(TelegramID(self._me_info.id))
@@ -98,7 +129,7 @@ class Bot(AbstractUser):
if isinstance(user_id, int): if isinstance(user_id, int):
self.tg_whitelist.append(user_id) self.tg_whitelist.append(user_id)
async def start(self, delete_unless_authenticated: bool = False) -> "Bot": async def start(self, delete_unless_authenticated: bool = False) -> Bot:
self.chats = {chat.id: chat.type for chat in await BotChat.all()} self.chats = {chat.id: chat.type for chat in await BotChat.all()}
await super().start(delete_unless_authenticated) await super().start(delete_unless_authenticated)
if not await self.is_logged_in(): if not await self.is_logged_in():
@@ -148,7 +179,44 @@ class Bot(AbstractUser):
pass pass
await BotChat.delete_by_id(chat_id) await BotChat.delete_by_id(chat_id)
async def _can_use_commands(self, chat: TypePeer, tgid: TelegramID) -> bool: async def _get_admin_participant(
self, chat: TypePeer | TypeInputPeer, tgid: TelegramID
) -> TypeChatParticipant | TypeChannelParticipant | None:
chan_id = chat.channel_id if isinstance(chat, PeerChannel) else chat.chat_id
try:
cached, created = self._admin_cache[chan_id, tgid]
if created + 60 < time.time():
return cached
except KeyError:
pass
if isinstance(chat, PeerChannel):
p = await self.client(GetParticipantRequest(chat, tgid))
pcp = p.participant
self._admin_cache[chat.channel_id, tgid] = (pcp, time.time())
return pcp
elif isinstance(chat, PeerChat):
chat = await self.client(GetFullChatRequest(chat.chat_id))
participants = chat.full_chat.participants.participants
for p in participants:
self._admin_cache[chat.channel_id, tgid] = (p, time.time())
if p.user_id == tgid:
return p
return None
@staticmethod
def _has_participant_permission(
pcp: TypeChatParticipant | TypeChannelParticipant | None,
permission: TelegramAdminPermission | None,
) -> bool:
if isinstance(pcp, (ChannelParticipantCreator, ChannelParticipantAdmin)):
return permission is None or getattr(pcp.admin_rights, permission, False)
elif isinstance(pcp, (ChatParticipantCreator, ChatParticipantAdmin)):
return True
return False
async def _can_use_commands(
self, chat: TypePeer, tgid: TelegramID, permission: TelegramAdminPermission | None = None
) -> bool:
if tgid in self.tg_whitelist: if tgid in self.tg_whitelist:
return True return True
@@ -158,22 +226,20 @@ class Bot(AbstractUser):
return True return True
if self.whitelist_group_admins: if self.whitelist_group_admins:
if isinstance(chat, PeerChannel): pcp = await self._get_admin_participant(chat, tgid)
p = await self.client(GetParticipantRequest(chat, tgid)) return self._has_participant_permission(pcp, permission)
return isinstance(
p.participant, (ChannelParticipantCreator, ChannelParticipantAdmin)
)
elif isinstance(chat, PeerChat):
chat = await self.client(GetFullChatRequest(chat.chat_id))
participants = chat.full_chat.participants.participants
for p in participants:
if p.user_id == tgid:
return isinstance(p, (ChatParticipantCreator, ChatParticipantAdmin))
return False return False
async def check_can_use_commands(self, event: Message, reply: ReplyFunc) -> bool: async def check_can_use_command(self, event: Message, reply: ReplyFunc, command: str) -> bool:
# FIXME event.from_id is not int if command not in self.required_permissions:
if not await self._can_use_commands(event.to_id, TelegramID(event.from_id)): # Unknown command
return False
elif not isinstance(event.from_id, PeerUser):
await reply("Channels can't use commands")
return False
elif not await self._can_use_commands(
event.to_id, TelegramID(event.from_id.user_id), self.required_permissions[command]
):
await reply("You do not have the permission to use that command.") await reply("You do not have the permission to use that command.")
return False return False
return True return True
@@ -215,9 +281,48 @@ class Bot(AbstractUser):
f"Just invite [{displayname}](tg://user?id={user.tgid})" f"Just invite [{displayname}](tg://user?id={user.tgid})"
) )
else: else:
await portal.invite_to_matrix(user.mxid) try:
await portal.invite_to_matrix(user.mxid)
except MBadState:
try:
await portal.main_intent.unban_user(
portal.mxid, user.mxid, reason="Invited from Telegram"
)
except Exception:
return await reply(f"Failed to unban `{user.mxid}` from the portal.")
await portal.invite_to_matrix(user.mxid)
return await reply(f"Unbanned and invited `{user.mxid}` to the portal.")
return await reply(f"Invited `{user.mxid}` to the portal.") return await reply(f"Invited `{user.mxid}` to the portal.")
async def handle_command_ban(
self, message: Message, portal: po.Portal, reply: ReplyFunc, reason: str
) -> Message:
if not message.reply_to:
return await reply("You must reply to a relaybot message when using that command")
reply_to_id = TelegramID(message.reply_to.reply_to_msg_id)
tg_space = portal.tgid if portal.peer_type == "channel" else self.tgid
msg = await DBMessage.get_one_by_tgid(reply_to_id, tg_space)
if not msg or msg.sender != self.tgid or not msg.sender_mxid:
return await reply("Target message is not a relayed message")
puppet = await pu.Puppet.get_by_peer(message.from_id)
try:
await puppet.intent_for(portal).ban_user(portal.mxid, msg.sender_mxid, reason)
except MForbidden as e:
self.log.warning(
f"Failed to ban {msg.sender_mxid} from {portal.mxid} as {puppet.mxid}: {e}, "
f"falling back to bridge bot"
)
reason_prefix = f"Banned by {puppet.displayname or puppet.tgid}"
reason = f"{reason_prefix}: {reason}" if reason else reason_prefix
try:
await self.az.intent.ban_user(portal.mxid, msg.sender_mxid, reason)
except MForbidden as e:
self.log.warning(
f"Failed to ban {msg.sender_mxid} from {portal.mxid} as bridge bot: {e}"
)
return await reply(f"Failed to ban `{msg.sender_mxid}`")
return await reply(f"Successfully banned `{msg.sender_mxid}`")
@staticmethod @staticmethod
def handle_command_id(message: Message, reply: ReplyFunc) -> Awaitable[Message]: def handle_command_id(message: Message, reply: ReplyFunc) -> Awaitable[Message]:
# Provide the prefixed ID to the user so that the user wouldn't need to specify whether the # Provide the prefixed ID to the user so that the user wouldn't need to specify whether the
@@ -235,53 +340,44 @@ class Bot(AbstractUser):
else: else:
return reply("Failed to find chat ID.") return reply("Failed to find chat ID.")
def match_command(self, text: str, command: str) -> bool: def parse_command(self, message: Message) -> tuple[str | None, str | None]:
text = text.lower() if not message.entities or len(message.entities) < 1 or not message.message:
command = f"/{command.lower()}" return None, None
command_targeted = f"{command}@{self.tg_username.lower()}" cmd_entity = message.entities[0]
if not isinstance(cmd_entity, MessageEntityBotCommand) or cmd_entity.offset != 0:
return None, None
surrogated_text = add_surrogate(message.message)
command: str = del_surrogate(surrogated_text[: cmd_entity.length]).lower()
rest_of_message: str = ""
if len(surrogated_text) > cmd_entity.length + 1:
rest_of_message: str = del_surrogate(surrogated_text[cmd_entity.length + 1 :])
command, *target = command.split("@", 1)
if not command.startswith("/"):
return None, None
elif target and target[0] != self.tg_username.lower():
return None, None
return command[1:], rest_of_message
is_plain_command = text == command or text == command_targeted async def handle_command(self, message: Message, command: str, args: str) -> None:
if is_plain_command:
return True
is_arg_command = text.startswith(command + " ") or text.startswith(command_targeted + " ")
if is_arg_command:
return True
return False
async def handle_command(self, message: Message) -> None:
def reply(reply_text: str) -> Awaitable[Message]: def reply(reply_text: str) -> Awaitable[Message]:
return self.client.send_message(message.chat_id, reply_text, reply_to=message.id) return self.client.send_message(message.chat_id, reply_text, reply_to=message.id)
text = message.message if command == "start":
if self.match_command(text, "start"):
pcm = self.config["bridge.relaybot.private_chat.message"] pcm = self.config["bridge.relaybot.private_chat.message"]
if pcm: if pcm:
await reply(pcm) await reply(pcm)
return elif command == "id":
elif self.match_command(text, "id"):
await self.handle_command_id(message, reply) await self.handle_command_id(message, reply)
return elif not message.is_private:
elif message.is_private: if not await self.check_can_use_command(message, reply, command):
return
portal = await po.Portal.get_by_entity(message.to_id)
is_portal_cmd = self.match_command(text, "portal")
is_invite_cmd = self.match_command(text, "invite")
if is_portal_cmd or is_invite_cmd:
if not await self.check_can_use_commands(message, reply):
return return
if is_portal_cmd: portal = await po.Portal.get_by_entity(message.to_id)
if command == "portal":
await self.handle_command_portal(portal, reply) await self.handle_command_portal(portal, reply)
elif is_invite_cmd: elif command == "invite":
try: await self.handle_command_invite(portal, reply, mxid_input=UserID(args))
mxid = text[text.index(" ") + 1 :] elif command == "mxban":
except ValueError: await self.handle_command_ban(message, portal, reply, reason=args)
mxid = ""
await self.handle_command_invite(portal, reply, mxid_input=UserID(mxid))
async def handle_service_message(self, message: MessageService) -> None: async def handle_service_message(self, message: MessageService) -> None:
to_peer = message.to_id to_peer = message.to_id
@@ -310,15 +406,10 @@ class Bot(AbstractUser):
await self.handle_service_message(update.message) await self.handle_service_message(update.message)
return False return False
is_command = ( if isinstance(update.message, Message):
isinstance(update.message, Message) command, args = self.parse_command(update.message)
and update.message.entities if command:
and len(update.message.entities) > 0 await self.handle_command(update.message, command, args)
and isinstance(update.message.entities[0], MessageEntityBotCommand)
and update.message.entities[0].offset == 0
)
if is_command:
await self.handle_command(update.message)
return False return False
def is_in_chat(self, peer_id) -> bool: def is_in_chat(self, peer_id) -> bool: