Merge pull request #56 from tulir/asyncio

Switch to asyncio
This commit is contained in:
Tulir Asokan
2018-02-11 11:46:25 +02:00
committed by GitHub
14 changed files with 892 additions and 698 deletions
+1
View File
@@ -1,4 +1,5 @@
from .appservice import AppService from .appservice import AppService
from .errors import MatrixError, MatrixRequestError, IntentError
__version__ = "0.1.0" __version__ = "0.1.0"
__author__ = "Tulir Asokan <tulir@maunium.net>" __author__ = "Tulir Asokan <tulir@maunium.net>"
+16 -10
View File
@@ -47,8 +47,8 @@ class AppService:
self.log = (logging.getLogger(log) if isinstance(log, str) self.log = (logging.getLogger(log) if isinstance(log, str)
else log or logging.getLogger("mautrix_appservice")) else log or logging.getLogger("mautrix_appservice"))
self.query_user = query_user or (lambda user: None) self.query_user = query_user or self.default_query_handler
self.query_alias = query_alias or (lambda alias: None) self.query_alias = query_alias or self.default_query_handler
self.event_handlers = [] self.event_handlers = []
@@ -60,6 +60,9 @@ class AppService:
self.matrix_event_handler(self.update_state_store) self.matrix_event_handler(self.update_state_store)
async def default_query_handler(self, param):
return None
@property @property
def http_session(self): def http_session(self):
if self._http_session is None: if self._http_session is None:
@@ -80,10 +83,10 @@ class AppService:
def run(self, host="127.0.0.1", port=8080): def run(self, host="127.0.0.1", port=8080):
self._http_session = aiohttp.ClientSession(loop=self.loop) self._http_session = aiohttp.ClientSession(loop=self.loop)
self._intent = HTTPAPI(base_url=self.server, domain=self.domain, bot_mxid=self.bot_mxid, self._intent = HTTPAPI(base_url=self.server, domain=self.domain, bot_mxid=self.bot_mxid,
token=self.as_token, log=self.log, token=self.as_token, log=self.log, state_store=self.state_store,
state_store=self.state_store).bot_intent() client_session=self._http_session).bot_intent()
yield partial(aiohttp.web.run_app, self.app, host=host, port=port) yield self.loop.create_server(self.app.make_handler(), host, port)
self._intent = None self._intent = None
self._http_session.close() self._http_session.close()
@@ -107,7 +110,7 @@ class AppService:
user_id = request.match_info["userId"] user_id = request.match_info["userId"]
try: try:
response = self.query_user(user_id) response = await self.query_user(user_id)
except Exception: except Exception:
self.log.exception("Exception in user query handler") self.log.exception("Exception in user query handler")
return web.Response(status=500) return web.Response(status=500)
@@ -123,7 +126,7 @@ class AppService:
alias = request.match_info["alias"] alias = request.match_info["alias"]
try: try:
response = self.query_alias(alias) response = await self.query_alias(alias)
except Exception: except Exception:
self.log.exception("Exception in alias query handler") self.log.exception("Exception in alias query handler")
return web.Response(status=500) return web.Response(status=500)
@@ -154,7 +157,7 @@ class AppService:
return web.json_response({}) return web.json_response({})
def update_state_store(self, event): async def update_state_store(self, event):
event_type = event["type"] event_type = event["type"]
if event_type == "m.room.power_levels": if event_type == "m.room.power_levels":
self.state_store.set_power_levels(event["room_id"], event["content"]) self.state_store.set_power_levels(event["room_id"], event["content"])
@@ -163,12 +166,15 @@ class AppService:
event["content"]["membership"]) event["content"]["membership"])
def handle_matrix_event(self, event): def handle_matrix_event(self, event):
for handler in self.event_handlers: async def try_handle(handler):
try: try:
handler(event) await handler(event)
except Exception: except Exception:
self.log.exception("Exception in Matrix event handler") self.log.exception("Exception in Matrix event handler")
for handler in self.event_handlers:
asyncio.ensure_future(try_handle(handler), loop=self.loop)
def matrix_event_handler(self, func): def matrix_event_handler(self, func):
self.event_handlers.append(func) self.event_handlers.append(func)
return func return func
+38
View File
@@ -0,0 +1,38 @@
# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class MatrixError(Exception):
"""A generic Matrix error. Specific errors will subclass this."""
pass
class IntentError(MatrixError):
def __init__(self, message, source):
super().__init__(message)
self.source = source
class MatrixRequestError(MatrixError):
""" The home server returned an error response. """
def __init__(self, code=0, text="", errcode=None, message=None):
super().__init__("%d: %s" % (code, text))
self.code = code
self.text = text
self.errcode = errcode
self.message = message
+272 -167
View File
@@ -14,26 +14,39 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import quote
from time import time
from json.decoder import JSONDecodeError
from aiohttp.client_exceptions import ContentTypeError
import re import re
import json import json
import magic import magic
import urllib.request import asyncio
from matrix_client.api import MatrixHttpApi from .errors import MatrixError, MatrixRequestError, IntentError
from matrix_client.errors import MatrixRequestError
class HTTPAPI(MatrixHttpApi): class HTTPAPI:
def __init__(self, base_url, domain=None, bot_mxid=None, token=None, identity=None, log=None, def __init__(self, base_url, domain=None, bot_mxid=None, token=None, identity=None, log=None,
state_store=None): state_store=None, client_session=None, child=False):
super().__init__(base_url, token, identity) self.base_url = base_url
self.token = token
self.identity = identity
self.validate_cert = True
self.session = client_session
self.domain = domain self.domain = domain
self.bot_mxid = bot_mxid self.bot_mxid = bot_mxid
self.intent_log = log.getChild("intent") self._bot_intent = None
self.log = log.getChild("api")
self.validate_cert = True
self.state_store = state_store self.state_store = state_store
self.children = {}
if child:
self.log = log
else:
self.intent_log = log.getChild("intent")
self.log = log.getChild("api")
self.txn_id = 0
self.children = {}
def user(self, user): def user(self, user):
try: try:
@@ -44,45 +57,82 @@ class HTTPAPI(MatrixHttpApi):
return child return child
def bot_intent(self): def bot_intent(self):
if self._bot_intent:
return self._bot_intent
return IntentAPI(self.bot_mxid, self, state_store=self.state_store, log=self.intent_log) return IntentAPI(self.bot_mxid, self, state_store=self.state_store, log=self.intent_log)
def intent(self, user): def intent(self, user):
return IntentAPI(user, self.user(user), self, self.state_store, self.intent_log) return IntentAPI(user, self.user(user), self.bot_intent(), self.state_store,
self.intent_log)
def _send(self, method, path, content=None, query_params=None, headers=None, async def _send(self, method, endpoint, content, query_params, headers):
api_path="/_matrix/client/r0"): while True:
if not query_params: query_params["access_token"] = self.token
query_params = {} request = self.session.request(method, endpoint, params=query_params,
query_params["user_id"] = self.identity data=content, headers=headers)
async with request as response:
if response.status < 200 or response.status >= 300:
errcode = message = None
try:
response_data = await response.json()
errcode = response_data["errcode"]
message = response_data["error"]
except (JSONDecodeError, ContentTypeError, KeyError):
pass
raise MatrixRequestError(code=response.status, text=await response.text(),
errcode=errcode, message=message)
if response.status == 429:
await asyncio.sleep(response.json()["retry_after_ms"] / 1000)
else:
return await response.json()
def _log_request(self, method, path, content, query_params):
log_content = content if not isinstance(content, bytes) else f"<{len(content)} bytes>" log_content = content if not isinstance(content, bytes) else f"<{len(content)} bytes>"
self.log.debug("%s %s %s", method, path, log_content) log_content = log_content or "(No content)"
return super()._send(method, path, content, query_params, headers or {}, api_path=api_path) query_identity = query_params["user_id"] if "user_id" in query_params else "No identity"
self.log.debug("%s %s %s as user %s", method, path, log_content, query_identity)
def create_room(self, alias=None, is_public=False, name=None, topic=None, is_direct=False, def request(self, method, path, content=None, query_params=None, headers=None,
invitees=(), initial_state=None): api_path="/_matrix/client/r0"):
content = { content = content or {}
"visibility": "public" if is_public else "private" query_params = query_params or {}
} headers = headers or {}
if alias:
content["room_alias_name"] = alias
if invitees:
content["invite"] = invitees
if name:
content["name"] = name
if topic:
content["topic"] = topic
if initial_state:
content["initial_state"] = initial_state
content["is_direct"] = is_direct
return self._send("POST", "/createRoom", content) method = method.upper()
if method not in ["GET", "PUT", "DELETE", "POST"]:
raise MatrixError("Unsupported HTTP method: %s" % method)
def set_presence(self, status="online", user=None): if "Content-Type" not in headers:
content = { headers["Content-Type"] = "application/json"
"presence": status if headers["Content-Type"] == "application/json":
} content = json.dumps(content)
user = user or self.identity
return self._send("PUT", f"/presence/{user}/status", content) if self.identity:
query_params["user_id"] = self.identity
self._log_request(method, path, content, query_params)
endpoint = self.base_url + api_path + path
return self._send(method, endpoint, content, query_params, headers or {})
def get_download_url(self, mxcurl):
if mxcurl.startswith('mxc://'):
return f"{self.base_url}/_matrix/media/r0/download/{mxcurl[6:]}"
else:
raise ValueError("MXC URL did not begin with 'mxc://'")
async def get_display_name(self, user_id):
content = await self.request("GET", f"/profile/{user_id}/displayname")
return content.get('displayname', None)
async def get_avatar_url(self, user_id):
content = await self.request("GET", f"/profile/{user_id}/avatar_url")
return content.get('avatar_url', None)
async def get_room_id(self, room_alias):
content = await self.request("GET", f"/directory/room/{quote(room_alias)}")
return content.get("room_id", None)
def set_typing(self, room_id, is_typing=True, timeout=5000, user=None): def set_typing(self, room_id, is_typing=True, timeout=5000, user=None):
content = { content = {
@@ -91,18 +141,13 @@ class HTTPAPI(MatrixHttpApi):
if is_typing: if is_typing:
content["timeout"] = timeout content["timeout"] = timeout
user = user or self.identity user = user or self.identity
return self._send("PUT", f"/rooms/{room_id}/typing/{user}", content) return self.request("PUT", f"/rooms/{room_id}/typing/{user}", content)
class ChildHTTPAPI(HTTPAPI): class ChildHTTPAPI(HTTPAPI):
def __init__(self, user, parent): def __init__(self, user, parent):
self.base_url = parent.base_url super().__init__(parent.base_url, parent.domain, parent.bot_mxid, parent.token, user,
self.token = parent.token parent.log, parent.state_store, parent.session, child=True)
self.identity = user
self.validate_cert = True
self.validate_cert = parent.validate_cert
self.log = parent.log
self.domain = parent.domain
self.parent = parent self.parent = parent
@property @property
@@ -114,29 +159,6 @@ class ChildHTTPAPI(HTTPAPI):
self.parent.txn_id = value self.parent.txn_id = value
class IntentError(Exception):
def __init__(self, message, source):
super().__init__(message)
self.source = source
def matrix_error_code(err):
try:
data = json.loads(err.content)
return data["errcode"]
except Exception:
return err.content
def matrix_error_data(err):
try:
data = json.loads(err.content)
return data["errcode"], data["error"]
except Exception:
return err.content
class IntentAPI: class IntentAPI:
mxid_regex = re.compile("@(.+):(.+)") mxid_regex = re.compile("@(.+):(.+)")
@@ -158,62 +180,86 @@ class IntentAPI:
return self.client.intent(user) return self.client.intent(user)
else: else:
self.log.warning("Called IntentAPI#user() of child intent object.") self.log.warning("Called IntentAPI#user() of child intent object.")
return self.bot.intent(user) return self.bot.client.intent(user)
# region User actions # region User actions
def get_joined_rooms(self): async def get_joined_rooms(self):
self.ensure_registered() await self.ensure_registered()
response = self.client._send("GET", "/joined_rooms") response = await self.client.request("GET", "/joined_rooms")
return response["joined_rooms"] return response["joined_rooms"]
def set_display_name(self, name): async def set_display_name(self, name):
self.ensure_registered() await self.ensure_registered()
return self.client.set_display_name(self.mxid, name) content = {"displayname": name}
return await self.client.request("PUT", f"/profile/{self.mxid}/displayname", content)
def set_presence(self, status="online"): async def set_presence(self, status="online"):
self.ensure_registered() await self.ensure_registered()
return self.client.set_presence(status) content = {
"presence": status
}
return await self.client.request("PUT", f"/presence/{self.mxid}/status", content)
def set_avatar(self, url): async def set_avatar(self, url):
self.ensure_registered() await self.ensure_registered()
return self.client.set_avatar_url(self.mxid, url) content = {"avatar_url": url}
return await self.client.request("PUT", f"/profile/{self.mxid}/avatar_url", content)
def upload_file(self, data, mime_type=None): async def upload_file(self, data, mime_type=None):
self.ensure_registered() await self.ensure_registered()
mime_type = mime_type or magic.from_buffer(data, mime=True) mime_type = mime_type or magic.from_buffer(data, mime=True)
return self.client.media_upload(data, mime_type) return await self.client.request("POST", "", content=data,
headers={"Content-Type": mime_type},
api_path="/_matrix/media/r0/upload")
def download_file(self, url): async def download_file(self, url):
self.ensure_registered() await self.ensure_registered()
url = self.client.get_download_url(url) url = self.client.get_download_url(url)
response = urllib.request.urlopen(url) async with self.client.session.get(url) as response:
return response.read() return await response.read()
# endregion # endregion
# region Room actions # region Room actions
def create_room(self, alias=None, is_public=False, name=None, topic=None, is_direct=False, async def create_room(self, alias=None, is_public=False, name=None, topic=None,
invitees=(), initial_state=None): is_direct=False, invitees=None, initial_state=None):
self.ensure_registered() await self.ensure_registered()
return self.client.create_room(alias, is_public, name, topic, is_direct, invitees, content = {
initial_state or {}) "visibility": "public" if is_public else "private",
"is_direct": is_direct,
}
if alias:
content["room_alias_name"] = alias
if invitees:
content["invite"] = invitees
if name:
content["name"] = name
if topic:
content["topic"] = topic
if initial_state:
content["initial_state"] = initial_state
def invite(self, room_id, user_id, check_cache=False): return await self.client.request("POST", "/createRoom", content)
self.ensure_joined(room_id)
def _invite_direct(self, room_id, user_id):
content = {"user_id": user_id}
return self.client.request("POST", "/rooms/" + room_id + "/invite", content)
async def invite(self, room_id, user_id, check_cache=False):
await self.ensure_joined(room_id)
try: try:
ok_states = {"invite", "join"} ok_states = {"invite", "join"}
do_invite = (not check_cache do_invite = (not check_cache
or self.state_store.get_membership(room_id, user_id) not in ok_states) or self.state_store.get_membership(room_id, user_id) not in ok_states)
if do_invite: if do_invite:
response = self.client.invite_user(room_id, user_id) response = await self._invite_direct(room_id, user_id)
self.state_store.invited(room_id, user_id) self.state_store.invited(room_id, user_id)
return response return response
except MatrixRequestError as e: except MatrixRequestError as e:
code, message = matrix_error_data(e) if e.errcode != "M_FORBIDDEN":
if code != "M_FORBIDDEN":
raise IntentError(f"Failed to invite {user_id} to {room_id}", e) raise IntentError(f"Failed to invite {user_id} to {room_id}", e)
if "is already in the room" in message: if "is already in the room" in e.message:
self.state_store.joined(room_id, user_id) self.state_store.joined(room_id, user_id)
def set_room_avatar(self, room_id, avatar_url, info=None): def set_room_avatar(self, room_id, avatar_url, info=None):
@@ -224,38 +270,43 @@ class IntentAPI:
content["info"] = info content["info"] = info
return self.send_state_event(room_id, "m.room.avatar", content) return self.send_state_event(room_id, "m.room.avatar", content)
def add_room_alias(self, room_id, alias): async def add_room_alias(self, room_id, localpart):
self.ensure_registered() await self.ensure_registered()
self.client.set_room_alias(room_id, f"#{alias}:{self.client.domain}") content = {"room_id": room_id}
alias = f"#{localpart}:{self.client.domain}"
return await self.client.request("PUT", f"/directory/room/{quote(alias)}", content)
def remove_room_alias(self, alias): async def remove_room_alias(self, localpart):
self.ensure_registered() await self.ensure_registered()
self.client.remove_room_alias(f"#{alias}:{self.client.domain}") alias = f"#{localpart}:{self.client.domain}"
return await self.client.request("DELETE", f"/directory/room/{quote(alias)}")
def set_room_name(self, room_id, name): def set_room_name(self, room_id, name):
self.ensure_joined(room_id) body = {"name": name}
self._ensure_has_power_level_for(room_id, "m.room.name") return self.send_state_event(room_id, "m.room.name", body)
return self.client.set_room_name(room_id, name)
def get_power_levels(self, room_id, ignore_cache=False): async def get_power_levels(self, room_id, ignore_cache=False):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
if not ignore_cache: if not ignore_cache:
try: try:
return self.state_store.get_power_levels(room_id) return self.state_store.get_power_levels(room_id)
except KeyError: except KeyError:
pass pass
levels = self.client.get_power_levels(room_id) levels = await self.client.request("GET",
f"/rooms/{quote(room_id)}/state/m.room.power_levels")
self.state_store.set_power_levels(room_id, levels) self.state_store.set_power_levels(room_id, levels)
return levels return levels
def set_power_levels(self, room_id, content): async def set_power_levels(self, room_id, content):
response = self.send_state_event(room_id, "m.room.power_levels", content) if "events" not in content:
content["events"] = {}
response = await self.send_state_event(room_id, "m.room.power_levels", content)
self.state_store.set_power_levels(room_id, content) self.state_store.set_power_levels(room_id, content)
return response return response
def get_pinned_messages(self, room_id): async def get_pinned_messages(self, room_id):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
response = self.client._send("GET", f"/rooms/{room_id}/state/m.room.pinned_events") response = await self.client.request("GET", f"/rooms/{room_id}/state/m.room.pinned_events")
return response["content"]["pinned"] return response["content"]["pinned"]
def set_pinned_messages(self, room_id, events): def set_pinned_messages(self, room_id, events):
@@ -263,29 +314,35 @@ class IntentAPI:
"pinned": events "pinned": events
}) })
def pin_message(self, room_id, event_id): async def pin_message(self, room_id, event_id):
events = self.get_pinned_messages(room_id) events = await self.get_pinned_messages(room_id)
if event_id not in events: if event_id not in events:
events.append(event_id) events.append(event_id)
self.set_pinned_messages(room_id, events) await self.set_pinned_messages(room_id, events)
def unpin_message(self, room_id, event_id): async def unpin_message(self, room_id, event_id):
events = self.get_pinned_messages(room_id) events = await self.get_pinned_messages(room_id)
if event_id in events: if event_id in events:
events.remove(event_id) events.remove(event_id)
self.set_pinned_messages(room_id, events) await self.set_pinned_messages(room_id, events)
def get_event(self, room_id, event_id): async def get_event(self, room_id, event_id):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
return self.client._send("GET", f"/rooms/{room_id}/event/{event_id}") return await self.client.request("GET", f"/rooms/{room_id}/event/{event_id}")
def set_typing(self, room_id, is_typing=True, timeout=5000): async def set_typing(self, room_id, is_typing=True, timeout=5000):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
return self.client.set_typing(room_id, is_typing, timeout) content = {
"typing": is_typing
}
if is_typing:
content["timeout"] = timeout
return await self.client.request("PUT", f"/rooms/{room_id}/typing/{self.mxid}", content)
def mark_read(self, room_id, event_id): async def mark_read(self, room_id, event_id):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
return self.client._send("POST", f"/rooms/{room_id}/receipt/m.read/{event_id}", content={}) return await self.client.request("POST", f"/rooms/{room_id}/receipt/m.read/{event_id}",
content={})
def send_notice(self, room_id, text, html=None): def send_notice(self, room_id, text, html=None):
return self.send_text(room_id, text, html, "m.notice") return self.send_text(room_id, text, html, "m.notice")
@@ -323,79 +380,127 @@ class IntentAPI:
def send_message(self, room_id, body): def send_message(self, room_id, body):
return self.send_event(room_id, "m.room.message", body) return self.send_event(room_id, "m.room.message", body)
def error_and_leave(self, room_id, text, html=None): async def error_and_leave(self, room_id, text, html=None):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
self.send_notice(room_id, text, html=html) await self.send_notice(room_id, text, html=html)
self.leave_room(room_id) await self.leave_room(room_id)
def kick(self, room_id, user_id, message): def kick(self, room_id, user_id, message):
self.ensure_joined(room_id) return self.set_membership(room_id, user_id, "leave", message)
return self.client.kick_user(room_id, user_id, message)
def send_event(self, room_id, event_type, body, txn_id=None): def get_membership(self, room_id, user_id):
self.ensure_joined(room_id) return self.get_state_event(room_id, "m.room.member", state_key=user_id)
self._ensure_has_power_level_for(room_id, event_type)
return self.client.send_message_event(room_id, event_type, body, txn_id)
def send_state_event(self, room_id, event_type, body, state_key=""): def set_membership(self, room_id, user_id, membership, reason="", profile=None):
self.ensure_joined(room_id) body = {
self._ensure_has_power_level_for(room_id, event_type) "membership": membership,
return self.client.send_state_event(room_id, event_type, body, state_key) "reason": reason
}
profile = profile or {}
if "displayname" in profile:
body["displayname"] = profile["displayname"]
if "avatar_url" in profile:
body["avatar_url"] = profile["avatar_url"]
return self.send_state_event(room_id, "m.room.member", body, state_key=user_id)
@staticmethod
def _get_event_url(room_id, event_type, txn_id):
return f"/rooms/{quote(room_id)}/send/{quote(event_type)}/{quote(txn_id)}"
async def send_event(self, room_id, event_type, content, txn_id=None):
await self.ensure_joined(room_id)
await self._ensure_has_power_level_for(room_id, event_type)
txn_id = txn_id or str(self.client.txn_id) + str(int(time() * 1000))
self.client.txn_id += 1
url = self._get_event_url(room_id, event_type, txn_id)
return await self.client.request("PUT", url, content)
@staticmethod
def _get_state_url(room_id, event_type, state_key=""):
url = f"/rooms/{quote(room_id)}/state/{quote(event_type)}"
if state_key:
url += f"/{quote(state_key)}"
return url
async def send_state_event(self, room_id, event_type, content, state_key=""):
await self.ensure_joined(room_id)
await self._ensure_has_power_level_for(room_id, event_type)
url = self._get_state_url(room_id, event_type, state_key)
return await self.client.request("PUT", url, content)
async def get_state_event(self, room_id, event_type, state_key=""):
await self.ensure_joined(room_id)
url = self._get_state_url(room_id, event_type, state_key)
return await self.client.request("GET", url)
def join_room(self, room_id): def join_room(self, room_id):
return self.ensure_joined(room_id, ignore_cache=True) return self.ensure_joined(room_id, ignore_cache=True)
def _join_room_direct(self, room):
return self.client.request("POST", f"/join/{quote(room)}")
def leave_room(self, room_id): def leave_room(self, room_id):
self.state_store.left(room_id, self.mxid) self.state_store.left(room_id, self.mxid)
return self.client.leave_room(room_id) return self.client.request("POST", f"/rooms/{quote(room_id)}/leave")
def get_room_memberships(self, room_id): def get_room_memberships(self, room_id):
return self.client.get_room_members(room_id) return self.client.request("GET", f"/rooms/{quote(room_id)}/members")
def get_room_members(self, room_id, allowed_memberships=("join",)): async def get_room_members(self, room_id, allowed_memberships=("join",)):
memberships = self.get_room_memberships(room_id) memberships = await self.get_room_memberships(room_id)
return [membership["state_key"] for membership in memberships["chunk"] if return [membership["state_key"] for membership in memberships["chunk"] if
membership["content"]["membership"] in allowed_memberships] membership["content"]["membership"] in allowed_memberships]
def get_room_state(self, room_id): async def get_room_state(self, room_id):
self.ensure_joined(room_id) await self.ensure_joined(room_id)
return self.client.get_room_state(room_id) state = await self.client.request("GET", f"/rooms/{quote(room_id)}/state")
# TODO update values based on state?
return state
# endregion # endregion
# region Ensure functions # region Ensure functions
def ensure_joined(self, room_id, ignore_cache=False): async def ensure_joined(self, room_id, ignore_cache=False):
if not ignore_cache and self.state_store.is_joined(room_id, self.mxid): if not ignore_cache and self.state_store.is_joined(room_id, self.mxid):
return return
self.ensure_registered() await self.ensure_registered()
try: try:
self.client.join_room(room_id) await self._join_room_direct(room_id)
self.state_store.joined(room_id, self.mxid) self.state_store.joined(room_id, self.mxid)
except MatrixRequestError as e: except MatrixRequestError as e:
if matrix_error_code(e) != "M_FORBIDDEN" or not self.bot: if e.errcode != "M_FORBIDDEN" or not self.bot:
raise IntentError(f"Failed to join room {room_id} as {self.mxid}", e) raise IntentError(f"Failed to join room {room_id} as {self.mxid}", e)
try: try:
self.bot.invite_user(room_id, self.mxid) await self.bot.invite(room_id, self.mxid)
self.client.join_room(room_id) await self._join_room_direct(room_id)
self.state_store.joined(room_id, self.mxid) self.state_store.joined(room_id, self.mxid)
except MatrixRequestError as e2: except MatrixRequestError as e2:
raise IntentError(f"Failed to join room {room_id} as {self.mxid}", e2) raise IntentError(f"Failed to join room {room_id} as {self.mxid}", e2)
def ensure_registered(self): def _register(self):
content = {"username": self.localpart}
query_params = {"kind": "user"}
return self.client.request("POST", "/register", content, query_params)
async def ensure_registered(self):
if self.state_store.is_registered(self.mxid): if self.state_store.is_registered(self.mxid):
return return
try: try:
self.client.register({"username": self.localpart}) await self._register()
except MatrixRequestError as e: except MatrixRequestError as e:
if matrix_error_code(e) != "M_USER_IN_USE": if e.errcode != "M_USER_IN_USE":
self.log.exception(f"Failed to register {self.mxid}!") self.log.exception(f"Failed to register {self.mxid}!")
# raise IntentError(f"Failed to register {self.mxid}", e) # raise IntentError(f"Failed to register {self.mxid}", e)
return return
self.state_store.registered(self.mxid) self.state_store.registered(self.mxid)
def _ensure_has_power_level_for(self, room_id, event_type): async def _ensure_has_power_level_for(self, room_id, event_type):
if not self.state_store.has_power_levels(room_id): if not self.state_store.has_power_levels(room_id):
self.get_power_levels(room_id) await self.get_power_levels(room_id)
if self.state_store.has_power_level(room_id, self.mxid, event_type): if self.state_store.has_power_level(room_id, self.mxid, event_type):
return return
elif not self.bot: elif not self.bot:
+16 -8
View File
@@ -17,6 +17,7 @@
import argparse import argparse
import sys import sys
import logging import logging
import asyncio
import sqlalchemy as sql import sqlalchemy as sql
from sqlalchemy import orm from sqlalchemy import orm
@@ -28,10 +29,9 @@ from .config import Config
from .matrix import MatrixHandler from .matrix import MatrixHandler
from .db import init as init_db from .db import init as init_db
from .user import init as init_user from .user import init as init_user, User
from .portal import init as init_portal from .portal import init as init_portal
from .puppet import init as init_puppet from .puppet import init as init_puppet
# from .formatter import init as init_formatter
log = logging.getLogger("mau") log = logging.getLogger("mau")
time_formatter = logging.Formatter("[%(asctime)s] [%(levelname)s@%(name)s] %(message)s") time_formatter = logging.Formatter("[%(asctime)s] [%(levelname)s@%(name)s] %(message)s")
@@ -72,16 +72,24 @@ db_session = orm.scoping.scoped_session(db_factory)
Base.metadata.bind = db_engine Base.metadata.bind = db_engine
Base.metadata.create_all() Base.metadata.create_all()
loop = asyncio.get_event_loop()
appserv = AppService(config["homeserver.address"], config["homeserver.domain"], appserv = AppService(config["homeserver.address"], config["homeserver.domain"],
config["appservice.as_token"], config["appservice.hs_token"], config["appservice.as_token"], config["appservice.hs_token"],
config["appservice.bot_username"], log="mau.as") config["appservice.bot_username"], log="mau.as", loop=loop)
context = (appserv, db_session, config) context = (appserv, db_session, config, loop)
with appserv.run(config["appservice.hostname"], config["appservice.port"]) as start: with appserv.run(config["appservice.hostname"], config["appservice.port"]) as start:
MatrixHandler(context)
init_db(db_session) init_db(db_session)
# init_formatter(context)
init_portal(context) init_portal(context)
init_puppet(context) init_puppet(context)
init_user(context) startup_actions = []
MatrixHandler(context) startup_actions += init_user(context)
start() startup_actions += [start]
try:
loop.run_until_complete(asyncio.gather(*startup_actions, loop=loop))
loop.run_forever()
except KeyboardInterrupt:
for user in User.by_tgid.values():
user.stop()
sys.exit(0)
+227 -221
View File
@@ -14,11 +14,11 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from contextlib import contextmanager
import markdown import markdown
import logging import logging
import asyncio
from matrix_client.errors import MatrixRequestError from mautrix_appservice import MatrixRequestError
from telethon.errors import * from telethon.errors import *
from telethon.tl.types import * from telethon.tl.types import *
@@ -54,33 +54,45 @@ def format_duration(seconds):
return " and ".join(parts) return " and ".join(parts)
class CommandEvent:
def __init__(self, az, command_prefix, room, sender, args, is_management, is_portal):
self.az = az
self.command_prefix = command_prefix
self.room_id = room
self.sender = sender
self.args = args
self.is_management = is_management
self.is_portal = is_portal
def reply(self, message, allow_html=False, render_markdown=True):
if not self.room_id:
raise AttributeError("the reply function can only be used from within"
"the `CommandHandler.run` context manager")
message = message.replace("$cmdprefix+sp ",
"" if self.is_management else f"{self.command_prefix} ")
message = message.replace("$cmdprefix", self.command_prefix)
html = None
if render_markdown:
html = markdown.markdown(message, safe_mode="escape" if allow_html else False)
elif allow_html:
html = message
return self.az.intent.send_notice(self.room_id, message, html=html)
class CommandHandler: class CommandHandler:
log = logging.getLogger("mau.commands") log = logging.getLogger("mau.commands")
def __init__(self, context): def __init__(self, context):
self.az, self.db, self.config = context self.az, self.db, self.config, self.loop = context
self.command_prefix = self.config["bridge.command_prefix"] self.command_prefix = self.config["bridge.command_prefix"]
self._room_id = None
self._is_management = False
self._is_portal = False
# region Utility functions for handling commands # region Utility functions for handling commands
def handle(self, room, sender, command, args, is_management, is_portal): async def handle(self, room, sender, command, args, is_management, is_portal):
with self.handler(sender, room, command, args, is_management, is_portal) as handle_command: evt = CommandEvent(self.az, self.command_prefix, room, sender, args, is_management,
try: is_portal)
handle_command(self, sender, args)
except FloodWaitError as e:
self.reply(f"Flood error: Please wait {format_duration(e.seconds)}")
except Exception:
self.reply("Fatal error while handling command. Check logs for more details.")
self.log.exception(f"Fatal error handling command "
+ f"'$cmdprefix {command} {''.join(args)}' from {sender.mxid}")
@contextmanager
def handler(self, sender, room, command, args, is_management, is_portal):
command = command.lower() command = command.lower()
self._room_id = room
try: try:
command = command_handlers[command] command = command_handlers[command]
except KeyError: except KeyError:
@@ -89,215 +101,205 @@ class CommandHandler:
command = sender.command_status["next"] command = sender.command_status["next"]
else: else:
command = command_handlers["unknown_command"] command = command_handlers["unknown_command"]
self._is_management = is_management try:
self._is_portal = is_portal await command(self, evt)
yield command except FloodWaitError as e:
self._is_management = None return evt.reply(f"Flood error: Please wait {format_duration(e.seconds)}")
self._is_portal = None except Exception:
self._room_id = None self.log.exception(f"Fatal error handling command "
+ f"'$cmdprefix {command} {''.join(args)}' from {sender.mxid}")
def reply(self, message, allow_html=False, render_markdown=True): return evt.reply("Fatal error while handling command. Check logs for more details.")
if not self._room_id:
raise AttributeError("the reply function can only be used from within"
"the `CommandHandler.run` context manager")
message = message.replace("$cmdprefix+sp ",
"" if self._is_management else f"{self.command_prefix} ")
message = message.replace("$cmdprefix", self.command_prefix)
html = None
if render_markdown:
html = markdown.markdown(message, safe_mode="escape" if allow_html else False)
elif allow_html:
html = message
self.az.intent.send_notice(self._room_id, message, html=html)
# endregion # endregion
# region Command handlers # region Command handlers
@command_handler @command_handler
def ping(self, sender, args): async def ping(self, evt):
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("You're not logged in.") return await evt.reply("You're not logged in.")
me = sender.client.get_me() me = await evt.sender.client.get_me()
if me: if me:
return self.reply(f"You're logged in as @{me.username}") return await evt.reply(f"You're logged in as @{me.username}")
else: else:
return self.reply("You're not logged in.") return await evt.reply("You're not logged in.")
# region Authentication commands # region Authentication commands
@command_handler @command_handler
def register(self, sender, args): def register(self, evt):
self.reply("Not yet implemented.") return evt.reply("Not yet implemented.")
@command_handler @command_handler
def login(self, sender, args): async def login(self, evt):
if not self._is_management: if not evt.is_management:
return self.reply( return await evt.reply(
"`login` is a restricted command: you may only run it in management rooms.") "`login` is a restricted command: you may only run it in management rooms.")
elif sender.logged_in: elif evt.sender.logged_in:
return self.reply("You are already logged in.") return await evt.reply("You are already logged in.")
elif len(args) == 0: elif len(evt.args) == 0:
return self.reply("**Usage:** `$cmdprefix+sp login <phone number>`") return await evt.reply("**Usage:** `$cmdprefix+sp login <phone number>`")
phone_number = args[0] phone_number = evt.args[0]
sender.client.sign_in(phone_number) await evt.sender.client.sign_in(phone_number)
sender.command_status = { evt.sender.command_status = {
"next": command_handlers["enter_code"], "next": command_handlers["enter_code"],
"action": "Login", "action": "Login",
} }
return self.reply(f"Login code sent to {phone_number}. Please send the code here.") return await evt.reply(f"Login code sent to {phone_number}. Please send the code here.")
@command_handler @command_handler
def enter_code(self, sender, args): async def enter_code(self, evt):
if not sender.command_status: if not evt.sender.command_status:
return self.reply("Request a login code first with `$cmdprefix+sp login <phone>`") return await evt.reply(
elif len(args) == 0: "Request a login code first with `$cmdprefix+sp login <phone>`")
return self.reply("**Usage:** `$cmdprefix+sp enter_code <code>`") elif len(evt.args) == 0:
return await evt.reply("**Usage:** `$cmdprefix+sp enter_code <code>`")
try: try:
user = sender.client.sign_in(code=args[0]) user = await evt.sender.client.sign_in(code=evt.args[0])
sender.post_login(user) asyncio.ensure_future(evt.sender.post_login(user), loop=self.loop)
sender.command_status = None evt.sender.command_status = None
return self.reply(f"Successfully logged in as @{user.username}") return await evt.reply(f"Successfully logged in as @{user.username}")
except PhoneNumberUnoccupiedError: except PhoneNumberUnoccupiedError:
return self.reply("That phone number has not been registered." return await evt.reply("That phone number has not been registered."
"Please register with `$cmdprefix+sp register <phone>`.") "Please register with `$cmdprefix+sp register <phone>`.")
except PhoneCodeExpiredError: except PhoneCodeExpiredError:
return self.reply( return await evt.reply(
"Phone code expired. Try again with `$cmdprefix+sp login <phone>`.") "Phone code expired. Try again with `$cmdprefix+sp login <phone>`.")
except PhoneCodeInvalidError: except PhoneCodeInvalidError:
return self.reply("Invalid phone code.") return await evt.reply("Invalid phone code.")
except PhoneNumberAppSignupForbiddenError: except PhoneNumberAppSignupForbiddenError:
return self.reply( return await evt.reply(
"Your phone number does not allow 3rd party apps to sign in.") "Your phone number does not allow 3rd party apps to sign in.")
except PhoneNumberFloodError: except PhoneNumberFloodError:
return self.reply( return await evt.reply(
"Your phone number has been temporarily blocked for flooding. " "Your phone number has been temporarily blocked for flooding. "
"The block is usually applied for around a day.") "The block is usually applied for around a day.")
except PhoneNumberBannedError: except PhoneNumberBannedError:
return self.reply("Your phone number has been banned from Telegram.") return await evt.reply("Your phone number has been banned from Telegram.")
except SessionPasswordNeededError: except SessionPasswordNeededError:
sender.command_status = { evt.sender.command_status = {
"next": command_handlers["enter_password"], "next": command_handlers["enter_password"],
"action": "Login (password entry)", "action": "Login (password entry)",
} }
return self.reply("Your account has two-factor authentication." return await evt.reply("Your account has two-factor authentication."
"Please send your password here.") "Please send your password here.")
except Exception: except Exception:
self.log.exception() self.log.exception("Error sending phone code")
return self.reply("Unhandled exception while sending code." return await evt.reply("Unhandled exception while sending code."
"Check console for more details.") "Check console for more details.")
@command_handler @command_handler
def enter_password(self, sender, args): async def enter_password(self, evt):
if not sender.command_status: if not evt.sender.command_status:
return self.reply("Request a login code first with `$cmdprefix+sp login <phone>`") return await evt.reply(
elif len(args) == 0: "Request a login code first with `$cmdprefix+sp login <phone>`")
return self.reply("**Usage:** `$cmdprefix+sp enter_password <password>`") elif len(evt.args) == 0:
return await evt.reply("**Usage:** `$cmdprefix+sp enter_password <password>`")
try: try:
user = sender.client.sign_in(password=args[0]) user = await evt.sender.client.sign_in(password=evt.args[0])
sender.post_login(user) asyncio.ensure_future(evt.sender.post_login(user), loop=self.loop)
sender.command_status = None evt.sender.command_status = None
return self.reply(f"Successfully logged in as @{user.username}") return await evt.reply(f"Successfully logged in as @{user.username}")
except PasswordHashInvalidError: except PasswordHashInvalidError:
return self.reply("Incorrect password.") return await evt.reply("Incorrect password.")
except Exception: except Exception:
self.log.exception() self.log.exception("Error sending password")
return self.reply("Unhandled exception while sending password. " return await evt.reply("Unhandled exception while sending password. "
"Check console for more details.") "Check console for more details.")
@command_handler @command_handler
def logout(self, sender, args): async def logout(self, evt):
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("You're not logged in.") return await evt.reply("You're not logged in.")
if sender.log_out(): if await evt.sender.log_out():
return self.reply("Logged out successfully.") return await evt.reply("Logged out successfully.")
return self.reply("Failed to log out.") return await evt.reply("Failed to log out.")
# endregion # endregion
# region Telegram interaction commands # region Telegram interaction commands
@command_handler @command_handler
def search(self, sender, args): async def search(self, evt):
if len(args) == 0: if len(evt.args) == 0:
return self.reply("**Usage:** `$cmdprefix+sp search [-r|--remote] <query>`") return await evt.reply("**Usage:** `$cmdprefix+sp search [-r|--remote] <query>`")
elif not sender.logged_in: elif not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
# force_remote = False # force_remote = False
if args[0] in {"-r", "--remote"}: if evt.args[0] in {"-r", "--remote"}:
# force_remote = True # force_remote = True
args.pop(0) evt.args.pop(0)
query = " ".join(args) query = " ".join(evt.args)
if len(query) < 5: if len(query) < 5:
return self.reply("Minimum length of query for remote search is 5 characters.") return await evt.reply("Minimum length of query for remote search is 5 characters.")
found = sender.client(SearchRequest(q=query, limit=10)) found = await evt.sender.client(SearchRequest(q=query, limit=10))
# reply = ["**People:**", ""] # reply = ["**People:**", ""]
reply = ["**Results from Telegram server:**", ""] reply = ["**Results from Telegram server:**", ""]
for result in found.users: for result in found.users:
puppet = pu.Puppet.get(result.id) puppet = pu.Puppet.get(result.id)
puppet.update_info(sender, result) await puppet.update_info(evt.sender, result)
reply.append( reply.append(
f"* [{puppet.displayname}](https://matrix.to/#/{puppet.mxid}): {puppet.id}") f"* [{puppet.displayname}](https://matrix.to/#/{puppet.mxid}): {puppet.id}")
# reply.extend(("", "**Chats:**", "")) # reply.extend(("", "**Chats:**", ""))
# for result in found.chats: # for result in found.chats:
# reply.append(f"* {result.title}") # reply.append(f"* {result.title}")
return self.reply("\n".join(reply)) return await evt.reply("\n".join(reply))
@command_handler @command_handler
def pm(self, sender, args): async def pm(self, evt):
if len(args) == 0: if len(evt.args) == 0:
return self.reply("**Usage:** `$cmdprefix+sp pm <user identifier>`") return await evt.reply("**Usage:** `$cmdprefix+sp pm <user identifier>`")
elif not sender.logged_in: elif not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
user = sender.client.get_entity(args[0]) user = await evt.sender.client.get_entity(evt.args[0])
if not user: if not user:
return self.reply("User not found.") return await evt.reply("User not found.")
elif not isinstance(user, User): elif not isinstance(user, User):
return self.reply("That doesn't seem to be a user.") return await evt.reply("That doesn't seem to be a user.")
portal = po.Portal.get_by_entity(user, sender.tgid) portal = po.Portal.get_by_entity(user, evt.sender.tgid)
portal.create_matrix_room(sender, user, [sender.mxid]) await portal.create_matrix_room(evt.sender, user, [evt.sender.mxid])
self.reply(f"Created private chat room with {pu.Puppet.get_displayname(user, False)}") return await evt.reply(
f"Created private chat room with {pu.Puppet.get_displayname(user, False)}")
@command_handler @command_handler
def invitelink(self, sender, args): async def invitelink(self, evt):
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
portal = po.Portal.get_by_mxid(self._room_id) portal = po.Portal.get_by_mxid(evt.room_id)
if not portal: if not portal:
return self.reply("This is not a portal room.") return await evt.reply("This is not a portal room.")
if portal.peer_type == "user": if portal.peer_type == "user":
return self.reply("You can't invite users to private chats.") return await evt.reply("You can't invite users to private chats.")
try: try:
link = portal.get_invite_link(sender) link = await portal.get_invite_link(evt.sender)
return self.reply(f"Invite link to {portal.title}: {link}") return await evt.reply(f"Invite link to {portal.title}: {link}")
except ValueError as e: except ValueError as e:
return self.reply(e.args[0]) return await evt.reply(e.args[0])
except ChatAdminRequiredError: except ChatAdminRequiredError:
return self.reply("You don't have the permission to create an invite link.") return await evt.reply("You don't have the permission to create an invite link.")
@command_handler @command_handler
def deleteportal(self, sender, args): async def deleteportal(self, evt):
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
elif not sender.is_admin: elif not evt.sender.is_admin:
return self.reply("This is command requires administrator privileges.") return await evt.reply("This is command requires administrator privileges.")
portal = po.Portal.get_by_mxid(self._room_id) portal = po.Portal.get_by_mxid(evt.room_id)
if not portal: if not portal:
return self.reply("This is not a portal room.") return await evt.reply("This is not a portal room.")
for user in portal.main_intent.get_room_members(portal.mxid): for user in await portal.main_intent.get_room_members(portal.mxid):
if user != portal.main_intent.mxid: if user != portal.main_intent.mxid:
try: try:
portal.main_intent.kick(portal.mxid, user, "Portal deleted.") await portal.main_intent.kick(portal.mxid, user, "Portal deleted.")
except MatrixRequestError: except MatrixRequestError:
pass pass
portal.main_intent.leave_room(portal.mxid) await portal.main_intent.leave_room(portal.mxid)
portal.delete() portal.delete()
@staticmethod @staticmethod
@@ -308,55 +310,56 @@ class CommandHandler:
return value return value
@command_handler @command_handler
def join(self, sender, args): async def join(self, evt):
if len(args) == 0: if len(evt.args) == 0:
return self.reply("**Usage:** `$cmdprefix+sp join <invite link>`") return await evt.reply("**Usage:** `$cmdprefix+sp join <invite link>`")
elif not sender.logged_in: elif not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
regex = re.compile(r"(?:https?://)?t(?:elegram)?\.(?:dog|me)(?:joinchat/)?/(.+)") regex = re.compile(r"(?:https?://)?t(?:elegram)?\.(?:dog|me)(?:joinchat/)?/(.+)")
arg = regex.match(args[0]) arg = regex.match(evt.args[0])
if not arg: if not arg:
return self.reply("That doesn't look like a Telegram invite link.") return await evt.reply("That doesn't look like a Telegram invite link.")
arg = arg.group(1) arg = arg.group(1)
if arg.startswith("joinchat/"): if arg.startswith("joinchat/"):
invite_hash = arg[len("joinchat/"):] invite_hash = arg[len("joinchat/"):]
try: try:
sender.client(CheckChatInviteRequest(invite_hash)) await evt.sender.client(CheckChatInviteRequest(invite_hash))
except InviteHashInvalidError: except InviteHashInvalidError:
return self.reply("Invalid invite link.") return await evt.reply("Invalid invite link.")
except InviteHashExpiredError: except InviteHashExpiredError:
return self.reply("Invite link expired.") return await evt.reply("Invite link expired.")
try: try:
updates = sender.client(ImportChatInviteRequest(invite_hash)) updates = evt.sender.client(ImportChatInviteRequest(invite_hash))
except UserAlreadyParticipantError: except UserAlreadyParticipantError:
return self.reply("You are already in that chat.") return await evt.reply("You are already in that chat.")
else: else:
channel = sender.client.get_entity(arg) channel = await evt.sender.client.get_entity(arg)
if not channel: if not channel:
return self.reply("Channel/supergroup not found.") return await evt.reply("Channel/supergroup not found.")
updates = sender.client(JoinChannelRequest(channel)) updates = await evt.sender.client(JoinChannelRequest(channel))
for chat in updates.chats: for chat in updates.chats:
portal = po.Portal.get_by_entity(chat) portal = po.Portal.get_by_entity(chat)
if portal.mxid: if portal.mxid:
portal.create_matrix_room(sender, chat, [sender.mxid]) await portal.create_matrix_room(evt.sender, chat, [evt.sender.mxid])
self.reply(f"Created room for {portal.title}") return await evt.reply(f"Created room for {portal.title}")
else: else:
portal.invite_matrix([sender.mxid]) await portal.invite_matrix([evt.sender.mxid])
self.reply(f"Invited you to portal of {portal.title}") return await evt.reply(f"Invited you to portal of {portal.title}")
@command_handler @command_handler
def create(self, sender, args): async def create(self, evt):
type = args[0] if len(args) > 0 else "group" type = evt.args[0] if len(evt.args) > 0 else "group"
if type not in {"chat", "group", "supergroup", "channel"}: if type not in {"chat", "group", "supergroup", "channel"}:
return self.reply("**Usage:** `$cmdprefix+sp create ['group'/'supergroup'/'channel']`") return await evt.reply(
elif not sender.logged_in: "**Usage:** `$cmdprefix+sp create ['group'/'supergroup'/'channel']`")
return self.reply("This command requires you to be logged in.") elif not evt.sender.logged_in:
return await evt.reply("This command requires you to be logged in.")
if po.Portal.get_by_mxid(self._room_id): if po.Portal.get_by_mxid(evt.room_id):
return self.reply("This is already a portal room.") return await evt.reply("This is already a portal room.")
state = self.az.intent.get_room_state(self._room_id) state = await self.az.intent.get_room_state(evt.room_id)
title = None title = None
about = None about = None
levels = None levels = None
@@ -368,18 +371,19 @@ class CommandHandler:
elif event["type"] == "m.room.power_levels": elif event["type"] == "m.room.power_levels":
levels = event["content"] levels = event["content"]
if not title: if not title:
return self.reply("Please set a title before creating a Telegram chat.") return await evt.reply("Please set a title before creating a Telegram chat.")
elif (not levels or not levels["users"] or self.az.intent.mxid not in levels["users"] or elif (not levels or not levels["users"] or self.az.intent.mxid not in levels["users"] or
levels["users"][self.az.intent.mxid] < 100): levels["users"][self.az.intent.mxid] < 100):
return self.reply(f"Please give " return await evt.reply(f"Please give "
+ f"[the bridge bot](https://matrix.to/#/{self.az.intent.mxid}) " + f"[the bridge bot](https://matrix.to/#/{self.az.intent.mxid})"
+ f"a power level of 100 before creating a Telegram chat.") + f" a power level of 100 before creating a Telegram chat.")
else: else:
for user, level in levels["users"].items(): for user, level in levels["users"].items():
if level >= 100 and user != self.az.intent.mxid: if level >= 100 and user != self.az.intent.mxid:
return self.reply(f"Please make sure only the bridge bot has power level above" return await evt.reply(
+ f"99 before creating a Telegram chat.\n\n" f"Please make sure only the bridge bot has power level above"
+ f"Use power level 95 instead of 100 for admins.") + f"99 before creating a Telegram chat.\n\n"
+ f"Use power level 95 instead of 100 for admins.")
supergroup = type == "supergroup" supergroup = type == "supergroup"
type = { type = {
@@ -389,86 +393,88 @@ class CommandHandler:
"group": "chat", "group": "chat",
}[type] }[type]
portal = po.Portal(tgid=None, mxid=self._room_id, title=title, about=about, peer_type=type) portal = po.Portal(tgid=None, mxid=evt.room_id, title=title, about=about, peer_type=type)
try: try:
portal.create_telegram_chat(sender, supergroup=supergroup) await portal.create_telegram_chat(evt.sender, supergroup=supergroup)
except ValueError as e: except ValueError as e:
return self.reply(e.args[0]) return await evt.reply(e.args[0])
self.reply(f"Telegram chat created. ID: {portal.tgid}") return await evt.reply(f"Telegram chat created. ID: {portal.tgid}")
@command_handler @command_handler
def upgrade(self, sender, args): async def upgrade(self, evt):
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
portal = po.Portal.get_by_mxid(self._room_id) portal = po.Portal.get_by_mxid(evt.room_id)
if not portal: if not portal:
return self.reply("This is not a portal room.") return await evt.reply("This is not a portal room.")
elif portal.peer_type == "channel": elif portal.peer_type == "channel":
return self.reply("This is already a supergroup or a channel.") return await evt.reply("This is already a supergroup or a channel.")
elif portal.peer_type == "user": elif portal.peer_type == "user":
return self.reply("You can't upgrade private chats.") return await evt.reply("You can't upgrade private chats.")
try: try:
portal.upgrade_telegram_chat(sender) await portal.upgrade_telegram_chat(evt.sender)
return self.reply(f"Group upgraded to supergroup. New ID: {portal.tgid}") return await evt.reply(f"Group upgraded to supergroup. New ID: {portal.tgid}")
except ChatAdminRequiredError: except ChatAdminRequiredError:
return self.reply("You don't have the permission to upgrade this group.") return await evt.reply("You don't have the permission to upgrade this group.")
except ValueError as e: except ValueError as e:
return self.reply(e.args[0]) return await evt.reply(e.args[0])
@command_handler @command_handler
def groupname(self, sender, args): async def groupname(self, evt):
if len(args) == 0: if len(evt.args) == 0:
return self.reply("**Usage:** `$cmdprefix+sp groupname <name/->`") return await evt.reply("**Usage:** `$cmdprefix+sp groupname <name/->`")
if not sender.logged_in: if not evt.sender.logged_in:
return self.reply("This command requires you to be logged in.") return await evt.reply("This command requires you to be logged in.")
portal = po.Portal.get_by_mxid(self._room_id) portal = po.Portal.get_by_mxid(evt.room_id)
if not portal: if not portal:
return self.reply("This is not a portal room.") return await evt.reply("This is not a portal room.")
elif portal.peer_type != "channel": elif portal.peer_type != "channel":
return self.reply("Only channels and supergroups have usernames.") return await evt.reply("Only channels and supergroups have usernames.")
try: try:
portal.set_telegram_username(sender, args[0] if args[0] != "-" else "") await portal.set_telegram_username(evt.sender,
evt.args[0] if evt.args[0] != "-" else "")
if portal.username: if portal.username:
return self.reply(f"Username of channel changed to {portal.username}.") return await evt.reply(f"Username of channel changed to {portal.username}.")
else: else:
return self.reply(f"Channel is now private.") return await evt.reply(f"Channel is now private.")
except ChatAdminRequiredError: except ChatAdminRequiredError:
return self.reply("You don't have the permission to set the username of this channel.") return await evt.reply(
"You don't have the permission to set the username of this channel.")
except UsernameNotModifiedError: except UsernameNotModifiedError:
if portal.username: if portal.username:
return self.reply("That is already the username of this channel.") return await evt.reply("That is already the username of this channel.")
else: else:
return self.reply("This channel is already private") return await evt.reply("This channel is already private")
except UsernameOccupiedError: except UsernameOccupiedError:
return self.reply("That username is already in use.") return await evt.reply("That username is already in use.")
except UsernameInvalidError: except UsernameInvalidError:
return self.reply("Invalid username") return await evt.reply("Invalid username")
# endregion # endregion
# region Command-related commands # region Command-related commands
@command_handler @command_handler
def cancel(self, sender, args): def cancel(self, evt):
if sender.command_status: if evt.sender.command_status:
action = sender.command_status["action"] action = evt.sender.command_status["action"]
sender.command_status = None evt.sender.command_status = None
return self.reply(f"{action} cancelled.") return evt.reply(f"{action} cancelled.")
else: else:
return self.reply("No ongoing command.") return evt.reply("No ongoing command.")
@command_handler @command_handler
def unknown_command(self, sender, args): def unknown_command(self, evt):
return self.reply("Unknown command. Try `$cmdprefix+sp help` for help.") return evt.reply("Unknown command. Try `$cmdprefix+sp help` for help.")
@command_handler @command_handler
def help(self, sender, args): def help(self, evt):
if self._is_management: if evt.is_management:
management_status = ("This is a management room: prefixing commands " management_status = ("This is a management room: prefixing commands "
"with `$cmdprefix` is not required.\n") "with `$cmdprefix` is not required.\n")
elif self._is_portal: elif evt.is_portal:
management_status = ("**This is a portal room**: you must always " management_status = ("**This is a portal room**: you must always "
"prefix commands with `$cmdprefix`.\n" "prefix commands with `$cmdprefix`.\n"
"Management commands will not be sent to Telegram.") "Management commands will not be sent to Telegram.")
@@ -503,7 +509,7 @@ class CommandHandler:
**groupname** <_name_|`-`> - Change the username of a supergroup/channel. To disable, use a dash **groupname** <_name_|`-`> - Change the username of a supergroup/channel. To disable, use a dash
(`-`) as the name. (`-`) as the name.
""" """
return self.reply(management_status + help) return evt.reply(management_status + help)
# endregion # endregion
# endregion # endregion
+4 -4
View File
@@ -20,7 +20,7 @@ from collections import deque
import re import re
import logging import logging
from matrix_client.errors import MatrixRequestError from mautrix_appservice import MatrixRequestError
from telethon.tl.types import * from telethon.tl.types import *
@@ -213,7 +213,7 @@ def matrix_to_telegram(html, tg_space=None):
# endregion # endregion
# region Telegram to Matrix # region Telegram to Matrix
def telegram_event_to_matrix(evt, source, native_replies=False, message_link_in_reply=False, async def telegram_event_to_matrix(evt, source, native_replies=False, message_link_in_reply=False,
main_intent=None): main_intent=None):
text = evt.message text = evt.message
html = telegram_to_matrix(evt.message, evt.entities) if evt.entities else None html = telegram_to_matrix(evt.message, evt.entities) if evt.entities else None
@@ -230,7 +230,7 @@ def telegram_event_to_matrix(evt, source, native_replies=False, message_link_in_
if puppet and puppet.displayname: if puppet and puppet.displayname:
fwd_from = f"<a href='https://matrix.to/#/{puppet.mxid}'>{puppet.displayname}</a>" fwd_from = f"<a href='https://matrix.to/#/{puppet.mxid}'>{puppet.displayname}</a>"
else: else:
user = source.client.get_entity(from_id) user = await source.client.get_entity(from_id)
if user: if user:
fwd_from = p.Puppet.get_displayname(user, format=False) fwd_from = p.Puppet.get_displayname(user, format=False)
else: else:
@@ -249,7 +249,7 @@ def telegram_event_to_matrix(evt, source, native_replies=False, message_link_in_
quote = f"<a href=\"https://matrix.to/#/{msg.mx_room}/{msg.mxid}\">Quote<br></a>" quote = f"<a href=\"https://matrix.to/#/{msg.mx_room}/{msg.mxid}\">Quote<br></a>"
else: else:
try: try:
event = main_intent.get_event(msg.mx_room, msg.mxid) event = await main_intent.get_event(msg.mx_room, msg.mxid)
content = event["content"] content = event["content"]
body = (content["formatted_body"] body = (content["formatted_body"]
if "formatted_body" in content if "formatted_body" in content
+51 -49
View File
@@ -16,7 +16,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging import logging
from matrix_client.errors import MatrixRequestError from mautrix_appservice import MatrixRequestError
from .user import User from .user import User
from .portal import Portal from .portal import Portal
@@ -28,85 +28,87 @@ class MatrixHandler:
log = logging.getLogger("mau.mx") log = logging.getLogger("mau.mx")
def __init__(self, context): def __init__(self, context):
self.az, self.db, self.config = context self.az, self.db, self.config, _ = context
self.commands = CommandHandler(context) self.commands = CommandHandler(context)
self.az.matrix_event_handler(self.handle_event) self.az.matrix_event_handler(self.handle_event)
async def init_as_bot(self):
self.az.intent.set_display_name( self.az.intent.set_display_name(
self.config.get("appservice.bot_displayname", "Telegram bridge bot")) self.config.get("appservice.bot_displayname", "Telegram bridge bot"))
def handle_puppet_invite(self, room, puppet, inviter): async def handle_puppet_invite(self, room, puppet, inviter):
self.log.debug(f"{inviter} invited puppet for {puppet.tgid} to {room}") self.log.debug(f"{inviter} invited puppet for {puppet.tgid} to {room}")
if not inviter.logged_in: if not inviter.logged_in:
puppet.intent.error_and_leave( await puppet.intent.error_and_leave(
room, text="Please log in before inviting Telegram puppets.") room, text="Please log in before inviting Telegram puppets.")
return return
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
if portal: if portal:
if portal.peer_type == "user": if portal.peer_type == "user":
puppet.intent.error_and_leave( await puppet.intent.error_and_leave(
room, text="You can not invite additional users to private chats.") room, text="You can not invite additional users to private chats.")
return return
portal.invite_telegram(inviter, puppet) await portal.invite_telegram(inviter, puppet)
puppet.intent.join_room(room) await puppet.intent.join_room(room)
return return
try: try:
members = self.az.intent.get_room_members(room) members = await self.az.intent.get_room_members(room)
except MatrixRequestError: except MatrixRequestError:
members = [] members = []
if self.az.intent.mxid not in members: if self.az.intent.mxid not in members:
if len(members) > 1: if len(members) > 1:
puppet.intent.error_and_leave(room, text=None, html=( await puppet.intent.error_and_leave(room, text=None, html=(
f"Please invite " f"Please invite "
+ f"<a href='https://matrix.to/#/{self.az.intent.mxid}'>the bridge bot</a> " + f"<a href='https://matrix.to/#/{self.az.intent.mxid}'>the bridge bot</a> "
+ f"first if you want to create a Telegram chat.")) + f"first if you want to create a Telegram chat."))
return return
puppet.intent.join_room(room) await puppet.intent.join_room(room)
portal = Portal.get_by_tgid(puppet.tgid, inviter.tgid, "user") portal = Portal.get_by_tgid(puppet.tgid, inviter.tgid, "user")
if portal.mxid: if portal.mxid:
try: try:
puppet.intent.invite(portal.mxid, inviter.mxid) await puppet.intent.invite(portal.mxid, inviter.mxid)
puppet.intent.send_notice(room, text=None, html=( await puppet.intent.send_notice(room, text=None, html=(
"You already have a private chat with me: " "You already have a private chat with me: "
+ f"<a href='https://matrix.to/#/{portal.mxid}'>" + f"<a href='https://matrix.to/#/{portal.mxid}'>"
+ "Link to room" + "Link to room"
+ "</a>")) + "</a>"))
puppet.intent.leave_room(room) await puppet.intent.leave_room(room)
return return
except MatrixRequestError: except MatrixRequestError:
pass pass
portal.mxid = room portal.mxid = room
portal.save() portal.save()
puppet.intent.send_notice(room, "Portal to private chat created.") await puppet.intent.send_notice(room, "Portal to private chat created.")
else: else:
puppet.intent.join_room(room) await puppet.intent.join_room(room)
puppet.intent.send_notice(room, "This puppet will remain inactive until a Telegram " await puppet.intent.send_notice(room, "This puppet will remain inactive until a "
"chat is created for this room.") "Telegram chat is created for this room.")
def handle_invite(self, room, user, inviter): async def handle_invite(self, room, user, inviter):
inviter = User.get_by_mxid(inviter) inviter = User.get_by_mxid(inviter)
if not inviter.whitelisted: if not inviter.whitelisted:
return return
elif user == self.az.bot_mxid: elif user == self.az.bot_mxid:
self.az.intent.join_room(room) await self.az.intent.join_room(room)
return return
puppet = Puppet.get_by_mxid(user) puppet = Puppet.get_by_mxid(user)
if puppet: if puppet:
self.handle_puppet_invite(room, puppet, inviter) await self.handle_puppet_invite(room, puppet, inviter)
return return
user = User.get_by_mxid(user, create=False) user = User.get_by_mxid(user, create=False)
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
if user and user.has_full_access and portal: if user and user.has_full_access and portal:
portal.invite_telegram(inviter, user) await portal.invite_telegram(inviter, user)
return return
# The rest can probably be ignored # The rest can probably be ignored
self.log.debug(f"{inviter} invited {user} to {room}") self.log.debug(f"{inviter} invited {user} to {room}")
def handle_join(self, room, user): async def handle_join(self, room, user):
user = User.get_by_mxid(user) user = User.get_by_mxid(user)
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
@@ -114,19 +116,19 @@ class MatrixHandler:
return return
if not user.whitelisted: if not user.whitelisted:
portal.main_intent.kick(room, user.mxid, await portal.main_intent.kick(room, user.mxid,
"You are not whitelisted on this Telegram bridge.") "You are not whitelisted on this Telegram bridge.")
return return
elif not user.logged_in: elif not user.logged_in:
# TODO[waiting-for-bots] once we have bot support, this won't be needed. # TODO[waiting-for-bots] once we have bot support, this won't be needed.
portal.main_intent.kick(room, user.mxid, await portal.main_intent.kick(room, user.mxid,
"You are not logged into this Telegram bridge.") "You are not logged into this Telegram bridge.")
return return
self.log.debug(f"{user} joined {room}") self.log.debug(f"{user} joined {room}")
# TODO join Telegram chat if applicable # TODO join Telegram chat if applicable
def handle_part(self, room, user, sender): async def handle_part(self, room, user, sender):
self.log.debug(f"{user} left {room}") self.log.debug(f"{user} left {room}")
sender = User.get_by_mxid(sender, create=False) sender = User.get_by_mxid(sender, create=False)
@@ -137,11 +139,11 @@ class MatrixHandler:
puppet = Puppet.get_by_mxid(user) puppet = Puppet.get_by_mxid(user)
if sender and puppet: if sender and puppet:
portal.leave_matrix(puppet, sender) await portal.leave_matrix(puppet, sender)
user = User.get_by_mxid(user, create=False) user = User.get_by_mxid(user, create=False)
if user and user.logged_in: if user and user.logged_in:
portal.leave_matrix(user, sender) await portal.leave_matrix(user, sender)
def is_command(self, message): def is_command(self, message):
text = message.get("body", "") text = message.get("body", "")
@@ -151,7 +153,7 @@ class MatrixHandler:
text = text[len(prefix) + 1:] text = text[len(prefix) + 1:]
return is_command, text return is_command, text
def handle_message(self, room, sender, message, event_id): async def handle_message(self, room, sender, message, event_id):
self.log.debug(f"{sender} sent {message} to ${room}") self.log.debug(f"{sender} sent {message} to ${room}")
is_command, text = self.is_command(message) is_command, text = self.is_command(message)
@@ -159,14 +161,14 @@ class MatrixHandler:
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
if sender.has_full_access and portal and not is_command: if sender.has_full_access and portal and not is_command:
portal.handle_matrix_message(sender, message, event_id) await portal.handle_matrix_message(sender, message, event_id)
return return
if message["msgtype"] != "m.text": if message["msgtype"] != "m.text":
return return
try: try:
is_management = len(self.az.intent.get_room_members(room)) == 2 is_management = len(await self.az.intent.get_room_members(room)) == 2
except MatrixRequestError: except MatrixRequestError:
# The AS bot is not in the room. # The AS bot is not in the room.
return return
@@ -179,22 +181,22 @@ class MatrixHandler:
# Not enough values to unpack, i.e. no arguments # Not enough values to unpack, i.e. no arguments
command = text command = text
args = [] args = []
self.commands.handle(room, sender, command, args, is_management, await self.commands.handle(room, sender, command, args, is_management,
is_portal=portal is not None) is_portal=portal is not None)
def handle_redaction(self, room, sender, event_id): async def handle_redaction(self, room, sender, event_id):
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
sender = User.get_by_mxid(sender) sender = User.get_by_mxid(sender)
if sender.has_full_access and portal: if sender.has_full_access and portal:
portal.handle_matrix_deletion(sender, event_id) await portal.handle_matrix_deletion(sender, event_id)
def handle_power_levels(self, room, sender, new, old): async def handle_power_levels(self, room, sender, new, old):
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
sender = User.get_by_mxid(sender) sender = User.get_by_mxid(sender)
if sender.has_full_access and portal: if sender.has_full_access and portal:
portal.handle_matrix_power_levels(sender, new["users"], old["users"]) await portal.handle_matrix_power_levels(sender, new["users"], old["users"])
def handle_room_meta(self, type, room, sender, content): async def handle_room_meta(self, type, room, sender, content):
portal = Portal.get_by_mxid(room) portal = Portal.get_by_mxid(room)
sender = User.get_by_mxid(sender) sender = User.get_by_mxid(sender)
if sender.has_full_access and portal: if sender.has_full_access and portal:
@@ -206,13 +208,13 @@ class MatrixHandler:
if content_key not in content: if content_key not in content:
# FIXME handle # FIXME handle
pass pass
handler(sender, content[content_key]) await handler(sender, content[content_key])
def filter_matrix_event(self, event): def filter_matrix_event(self, event):
return (event["sender"] == self.az.bot_mxid return (event["sender"] == self.az.bot_mxid
or Puppet.get_id_from_mxid(event["sender"]) is not None) or Puppet.get_id_from_mxid(event["sender"]) is not None)
def handle_event(self, evt): async def handle_event(self, evt):
if self.filter_matrix_event(evt): if self.filter_matrix_event(evt):
return return
self.log.debug("Received event: %s", evt) self.log.debug("Received event: %s", evt)
@@ -221,17 +223,17 @@ class MatrixHandler:
if type == "m.room.member": if type == "m.room.member":
membership = content.get("membership", "") membership = content.get("membership", "")
if membership == "invite": if membership == "invite":
self.handle_invite(evt["room_id"], evt["state_key"], evt["sender"]) await self.handle_invite(evt["room_id"], evt["state_key"], evt["sender"])
elif membership == "leave": elif membership == "leave":
self.handle_part(evt["room_id"], evt["state_key"], evt["sender"]) await self.handle_part(evt["room_id"], evt["state_key"], evt["sender"])
elif membership == "join": elif membership == "join":
self.handle_join(evt["room_id"], evt["state_key"]) await self.handle_join(evt["room_id"], evt["state_key"])
elif type == "m.room.message": elif type == "m.room.message":
self.handle_message(evt["room_id"], evt["sender"], content, evt["event_id"]) await self.handle_message(evt["room_id"], evt["sender"], content, evt["event_id"])
elif type == "m.room.redaction": elif type == "m.room.redaction":
self.handle_redaction(evt["room_id"], evt["sender"], evt["redacts"]) await self.handle_redaction(evt["room_id"], evt["sender"], evt["redacts"])
elif type == "m.room.power_levels": elif type == "m.room.power_levels":
self.handle_power_levels(evt["room_id"], evt["sender"], evt["content"], await self.handle_power_levels(evt["room_id"], evt["sender"], evt["content"],
evt["prev_content"]) evt["prev_content"])
elif type == "m.room.name" or type == "m.room.avatar" or type == "m.room.topic": elif type == "m.room.name" or type == "m.room.avatar" or type == "m.room.topic":
self.handle_room_meta(type, evt["room_id"], evt["sender"], evt["content"]) await self.handle_room_meta(type, evt["room_id"], evt["sender"], evt["content"])
+184 -168
View File
@@ -17,6 +17,7 @@
from io import BytesIO from io import BytesIO
from collections import deque from collections import deque
from datetime import datetime from datetime import datetime
import asyncio
import random import random
import mimetypes import mimetypes
import hashlib import hashlib
@@ -56,6 +57,7 @@ class Portal:
self.about = about self.about = about
self.photo_id = photo_id self.photo_id = photo_id
self._main_intent = None self._main_intent = None
self._room_create_lock = asyncio.Lock()
self._dedup = deque() self._dedup = deque()
self._dedup_mxid = {} self._dedup_mxid = {}
@@ -138,39 +140,48 @@ class Portal:
self._main_intent = puppet.intent if direct else self.az.intent self._main_intent = puppet.intent if direct else self.az.intent
return self._main_intent return self._main_intent
def invite_matrix(self, users): async def invite_matrix(self, users):
if isinstance(users, str): if isinstance(users, str):
self.main_intent.invite(self.mxid, users, check_cache=True) await self.main_intent.invite(self.mxid, users, check_cache=True)
elif isinstance(users, list): elif isinstance(users, list):
for user in users: for user in users:
self.main_intent.invite(self.mxid, user, check_cache=True) await self.main_intent.invite(self.mxid, user, check_cache=True)
else: else:
raise ValueError("Invalid invite identifier given to invite_matrix()") raise ValueError("Invalid invite identifier given to invite_matrix()")
def update_after_create(self, user, entity, direct, puppet=None): async def update_after_create(self, user, entity, direct, puppet=None):
if not direct: if not direct:
self.update_info(user, entity) await self.update_info(user, entity)
users, participants = self.get_users(user, entity) users, participants = await self.get_users(user, entity)
self.sync_telegram_users(user, users) await self.sync_telegram_users(user, users)
self.update_telegram_participants(participants) await self.update_telegram_participants(participants)
else: else:
if not puppet: if not puppet:
puppet = p.Puppet.get(self.tgid) puppet = p.Puppet.get(self.tgid)
puppet.update_info(user, entity) await puppet.update_info(user, entity)
puppet.intent.join_room(self.mxid) await puppet.intent.join_room(self.mxid)
def create_matrix_room(self, user, entity=None, invites=None, update_if_exists=True): async def create_matrix_room(self, user, entity=None, invites=None, update_if_exists=True):
if not entity: if self.mxid:
entity = user.client.get_entity(self.peer) if update_if_exists:
self.log.debug("Fetched data: %s", entity) if not entity:
entity = await user.client.get_entity(self.peer)
await self.update_after_create(user, entity, self.peer_type == "user")
await self.invite_matrix(invites or [])
return self.mxid
async with self._room_create_lock:
return await self._create_matrix_room(user, entity, invites)
async def _create_matrix_room(self, user, entity, invites):
direct = self.peer_type == "user" direct = self.peer_type == "user"
if self.mxid: if self.mxid:
if update_if_exists:
self.update_after_create(user, entity, direct)
self.invite_matrix(invites or [])
return self.mxid return self.mxid
if not entity:
entity = await user.client.get_entity(self.peer)
self.log.debug("Fetched data: %s", entity)
self.log.debug(f"Creating room for {self.tgid_log}") self.log.debug(f"Creating room for {self.tgid_log}")
try: try:
@@ -194,8 +205,8 @@ class Portal:
if alias: if alias:
# TODO properly handle existing room aliases # TODO properly handle existing room aliases
intent.remove_room_alias(alias) intent.remove_room_alias(alias)
room = intent.create_room(alias=alias, is_public=public, invitees=invites or [], room = await intent.create_room(alias=alias, is_public=public, invitees=invites or [],
name=self.title, is_direct=direct) name=self.title, is_direct=direct)
if not room: if not room:
raise Exception(f"Failed to create room for {self.tgid_log}") raise Exception(f"Failed to create room for {self.tgid_log}")
@@ -204,93 +215,93 @@ class Portal:
self.save() self.save()
power_level_requirement = 0 if self.peer_type == "chat" and entity.admins_enabled else 50 power_level_requirement = 0 if self.peer_type == "chat" and entity.admins_enabled else 50
levels = self.main_intent.get_power_levels(self.mxid) levels = await self.main_intent.get_power_levels(self.mxid)
levels["ban"] = 100 levels["ban"] = 100
levels["invite"] = 50 levels["invite"] = 50
levels["events"]["m.room.name"] = power_level_requirement levels["events"]["m.room.name"] = power_level_requirement
levels["events"]["m.room.avatar"] = power_level_requirement levels["events"]["m.room.avatar"] = power_level_requirement
levels["events"]["m.room.topic"] = 50 if self.peer_type == "channel" else 100 levels["events"]["m.room.topic"] = 50 if self.peer_type == "channel" else 100
levels["events"]["m.room.power_levels"] = 75 levels["events"]["m.room.power_levels"] = 75
self.main_intent.set_power_levels(self.mxid, levels) await self.main_intent.set_power_levels(self.mxid, levels)
self.update_after_create(user, entity, direct, puppet) await self.update_after_create(user, entity, direct, puppet)
def _get_room_alias(self, username=None): def _get_room_alias(self, username=None):
username = username or self.username username = username or self.username
return config.get("bridge.alias_template", "telegram_{groupname}").format( return config.get("bridge.alias_template", "telegram_{groupname}").format(
groupname=username) groupname=username)
def sync_telegram_users(self, source, users): async def sync_telegram_users(self, source, users):
for entity in users: for entity in users:
puppet = p.Puppet.get(entity.id) puppet = p.Puppet.get(entity.id)
puppet.update_info(source, entity) await puppet.intent.ensure_joined(self.mxid)
puppet.intent.ensure_joined(self.mxid) await puppet.update_info(source, entity)
def add_telegram_user(self, user_id, source=None): async def add_telegram_user(self, user_id, source=None):
puppet = p.Puppet.get(user_id) puppet = p.Puppet.get(user_id)
if source: if source:
entity = source.client.get_entity(user_id) entity = await source.client.get_entity(user_id)
puppet.update_info(source, entity) await puppet.update_info(source, entity)
puppet.intent.join_room(self.mxid) await puppet.intent.join_room(self.mxid)
user = u.User.get_by_tgid(user_id) user = u.User.get_by_tgid(user_id)
if user: if user:
self.main_intent.invite(self.mxid, user.mxid) await self.main_intent.invite(self.mxid, user.mxid)
def delete_telegram_user(self, user_id, kick_message=None): async def delete_telegram_user(self, user_id, kick_message=None):
puppet = p.Puppet.get(user_id) puppet = p.Puppet.get(user_id)
user = u.User.get_by_tgid(user_id) user = u.User.get_by_tgid(user_id)
if kick_message: if kick_message:
self.main_intent.kick(self.mxid, puppet.mxid, kick_message) await self.main_intent.kick(self.mxid, puppet.mxid, kick_message)
else: else:
puppet.intent.leave_room(self.mxid) await puppet.intent.leave_room(self.mxid)
if user: if user:
self.main_intent.kick(self.mxid, user.mxid, kick_message or "Left Telegram chat") await self.main_intent.kick(self.mxid, user.mxid, kick_message or "Left Telegram chat")
def update_info(self, user, entity=None): async def update_info(self, user, entity=None):
if self.peer_type == "user": if self.peer_type == "user":
self.log.warn(f"Called update_info() for direct chat portal {self.tgid_log}") self.log.warning(f"Called update_info() for direct chat portal {self.tgid_log}")
return return
self.log.debug(f"Updating info of {self.tgid_log}") self.log.debug(f"Updating info of {self.tgid_log}")
if not entity: if not entity:
entity = user.client.get_entity(self.peer) entity = await user.client.get_entity(self.peer)
self.log.debug("Fetched data: %s", entity) self.log.debug("Fetched data: %s", entity)
changed = False changed = False
if self.peer_type == "channel": if self.peer_type == "channel":
changed = self.update_username(entity.username) or changed changed = await self.update_username(entity.username) or changed
# TODO update about text # TODO update about text
# changed = self.update_about(entity.about) or changed # changed = self.update_about(entity.about) or changed
changed = self.update_title(entity.title) or changed changed = await self.update_title(entity.title) or changed
if isinstance(entity.photo, ChatPhoto): if isinstance(entity.photo, ChatPhoto):
changed = self.update_avatar(user, entity.photo.photo_big) or changed changed = await self.update_avatar(user, entity.photo.photo_big) or changed
if changed: if changed:
self.save() self.save()
def update_username(self, username): async def update_username(self, username):
if self.username != username: if self.username != username:
if self.username: if self.username:
self.main_intent.remove_room_alias(self._get_room_alias()) await self.main_intent.remove_room_alias(self._get_room_alias())
self.username = username or None self.username = username or None
if self.username: if self.username:
self.main_intent.add_room_alias(self.mxid, self._get_room_alias()) await self.main_intent.add_room_alias(self.mxid, self._get_room_alias())
return True return True
return False return False
def update_about(self, about): async def update_about(self, about):
if self.about != about: if self.about != about:
self.about = about self.about = about
self.main_intent.set_room_topic(self.mxid, self.about) await self.main_intent.set_room_topic(self.mxid, self.about)
return True return True
return False return False
def update_title(self, title): async def update_title(self, title):
if self.title != title: if self.title != title:
self.title = title self.title = title
self.main_intent.set_room_name(self.mxid, self.title) await self.main_intent.set_room_name(self.mxid, self.title)
return True return True
return False return False
@@ -299,26 +310,26 @@ class Portal:
return max(photo.sizes, key=(lambda photo2: ( return max(photo.sizes, key=(lambda photo2: (
len(photo2.bytes) if isinstance(photo2, PhotoCachedSize) else photo2.size))) len(photo2.bytes) if isinstance(photo2, PhotoCachedSize) else photo2.size)))
def update_avatar(self, user, photo): async def update_avatar(self, user, photo):
photo_id = f"{photo.volume_id}-{photo.local_id}" photo_id = f"{photo.volume_id}-{photo.local_id}"
if self.photo_id != photo_id: if self.photo_id != photo_id:
try: try:
file = user.client.download_file_bytes(photo) file = await user.client.download_file_bytes(photo)
except LocationInvalidError: except LocationInvalidError:
return False return False
uploaded = self.main_intent.upload_file(file) uploaded = await self.main_intent.upload_file(file)
self.main_intent.set_room_avatar(self.mxid, uploaded["content_uri"]) await self.main_intent.set_room_avatar(self.mxid, uploaded["content_uri"])
self.photo_id = photo_id self.photo_id = photo_id
return True return True
return False return False
def get_users(self, user, entity): async def get_users(self, user, entity):
if self.peer_type == "chat": if self.peer_type == "chat":
chat = user.client(GetFullChatRequest(chat_id=self.tgid)) chat = await user.client(GetFullChatRequest(chat_id=self.tgid))
return chat.users, chat.full_chat.participants.participants return chat.users, chat.full_chat.participants.participants
elif self.peer_type == "channel": elif self.peer_type == "channel":
try: try:
participants = user.client(GetParticipantsRequest( participants = await user.client(GetParticipantsRequest(
entity, ChannelParticipantsRecent(), offset=0, limit=100, hash=0 entity, ChannelParticipantsRecent(), offset=0, limit=100, hash=0
)) ))
return participants.users, participants.participants return participants.users, participants.participants
@@ -327,16 +338,16 @@ class Portal:
elif self.peer_type == "user": elif self.peer_type == "user":
return [entity], [] return [entity], []
def get_invite_link(self, user): async def get_invite_link(self, user):
if self.peer_type == "user": if self.peer_type == "user":
raise ValueError("You can't invite users to private chats.") raise ValueError("You can't invite users to private chats.")
elif self.peer_type == "chat": elif self.peer_type == "chat":
link = user.client(ExportChatInviteRequest(chat_id=self.tgid)) link = await user.client(ExportChatInviteRequest(chat_id=self.tgid))
elif self.peer_type == "channel": elif self.peer_type == "channel":
if self.username: if self.username:
return f"https://t.me/{self.username}" return f"https://t.me/{self.username}"
link = user.client( link = await user.client(
ExportInviteRequest(channel=self.get_input_entity(user))) ExportInviteRequest(channel=await self.get_input_entity(user)))
else: else:
raise ValueError(f"Invalid peer type '{self.peer_type}' for invite link.") raise ValueError(f"Invalid peer type '{self.peer_type}' for invite link.")
@@ -360,48 +371,47 @@ class Portal:
file_name = f"matrix_upload{mimetypes.guess_extension(mime)}" file_name = f"matrix_upload{mimetypes.guess_extension(mime)}"
return file_name, None if file_name == body else body return file_name, None if file_name == body else body
def leave_matrix(self, user, source): async def leave_matrix(self, user, source):
if self.peer_type == "user": if self.peer_type == "user":
self.main_intent.leave_room(self.mxid) await self.main_intent.leave_room(self.mxid)
self.delete() self.delete()
del self.by_tgid[self.tgid_full] del self.by_tgid[self.tgid_full]
del self.by_mxid[self.mxid] del self.by_mxid[self.mxid]
elif source and source.tgid != user.tgid: elif source and source.tgid != user.tgid:
target = user.get_input_entity(source) target = await user.get_input_entity(source)
if self.peer_type == "chat": if self.peer_type == "chat":
source.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=target)) await source.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=target))
else: else:
channel = self.get_input_entity(source) channel = await self.get_input_entity(source)
rights = ChannelBannedRights(datetime.fromtimestamp(0), True) rights = ChannelBannedRights(datetime.fromtimestamp(0), True)
source.client(EditBannedRequest(channel=channel, await source.client(EditBannedRequest(channel=channel,
user_id=target, user_id=target,
banned_rights=rights)) banned_rights=rights))
elif self.peer_type == "chat": elif self.peer_type == "chat":
user.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=InputUserSelf())) await user.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=InputUserSelf()))
elif self.peer_type == "channel": elif self.peer_type == "channel":
channel = self.get_input_entity(user) channel = await self.get_input_entity(user)
user.client(LeaveChannelRequest(channel=channel)) await user.client(LeaveChannelRequest(channel=channel))
def handle_matrix_message(self, sender, message, event_id): async def handle_matrix_message(self, sender, message, event_id):
type = message["msgtype"] type = message["msgtype"]
if type in {"m.text", "m.emote"}: if type in {"m.text", "m.emote"}:
if "format" in message and message["format"] == "org.matrix.custom.html": if "format" in message and message["format"] == "org.matrix.custom.html":
space = self.tgid if self.peer_type == "channel" else sender.tgid space = self.tgid if self.peer_type == "channel" else sender.tgid
print(sender.username, sender.tgid, space)
message, entities = formatter.matrix_to_telegram(message["formatted_body"], space) message, entities = formatter.matrix_to_telegram(message["formatted_body"], space)
if type == "m.emote": if type == "m.emote":
message = "/me " + message message = "/me " + message
reply_to = None reply_to = None
if len(entities) > 0 and isinstance(entities[0], formatter.MessageEntityReply): if len(entities) > 0 and isinstance(entities[0], formatter.MessageEntityReply):
reply_to = entities.pop(0).msg_id reply_to = entities.pop(0).msg_id
response = sender.client.send_message(self.peer, message, entities=entities, response = await sender.client.send_message(self.peer, message, entities=entities,
reply_to=reply_to) reply_to=reply_to)
else: else:
if type == "m.emote": if type == "m.emote":
message["body"] = "/me " + message["body"] message["body"] = "/me " + message["body"]
response = sender.client.send_message(self.peer, message["body"]) response = await sender.client.send_message(self.peer, message["body"])
elif type in {"m.image", "m.file", "m.audio", "m.video"}: elif type in {"m.image", "m.file", "m.audio", "m.video"}:
file = self.main_intent.download_file(message["url"]) file = await self.main_intent.download_file(message["url"])
info = message["info"] info = message["info"]
mime = info["mimetype"] mime = info["mimetype"]
@@ -412,8 +422,8 @@ class Portal:
if "w" in info and "h" in info: if "w" in info and "h" in info:
attributes.append(DocumentAttributeImageSize(w=info["w"], h=info["h"])) attributes.append(DocumentAttributeImageSize(w=info["w"], h=info["h"]))
response = sender.client.send_file(self.peer, file, mime, caption, attributes, response = await sender.client.send_file(self.peer, file, mime, caption, attributes,
file_name) file_name)
else: else:
self.log.debug("Unhandled Matrix event: %s", message) self.log.debug("Unhandled Matrix event: %s", message)
return return
@@ -426,16 +436,16 @@ class Portal:
mxid=event_id)) mxid=event_id))
self.db.commit() self.db.commit()
def handle_matrix_deletion(self, deleter, event_id): async def handle_matrix_deletion(self, deleter, event_id):
space = self.tgid if self.peer_type == "channel" else deleter.tgid space = self.tgid if self.peer_type == "channel" else deleter.tgid
message = DBMessage.query.filter(DBMessage.mxid == event_id and message = DBMessage.query.filter(DBMessage.mxid == event_id and
DBMessage.tg_space == space and DBMessage.tg_space == space and
DBMessage.mx_room == self.mxid).one_or_none() DBMessage.mx_room == self.mxid).one_or_none()
if not message: if not message:
return return
deleter.client.delete_messages(self.peer, [message.tgid]) await deleter.client.delete_messages(self.peer, [message.tgid])
def handle_matrix_power_levels(self, sender, new_users, old_users): async def handle_matrix_power_levels(self, sender, new_users, old_users):
# TODO handle all power level changes and bridge exact admin rights to supergroups/channels # TODO handle all power level changes and bridge exact admin rights to supergroups/channels
for user, level in new_users.items(): for user, level in new_users.items():
user_id = p.Puppet.get_id_from_mxid(user) user_id = p.Puppet.get_id_from_mxid(user)
@@ -446,7 +456,7 @@ class Portal:
user_id = mx_user.tgid user_id = mx_user.tgid
if user not in old_users or level != old_users[user]: if user not in old_users or level != old_users[user]:
if self.peer_type == "chat": if self.peer_type == "chat":
sender.client(EditChatAdminRequest( await sender.client(EditChatAdminRequest(
chat_id=self.tgid, user_id=user_id, is_admin=level >= 50)) chat_id=self.tgid, user_id=user_id, is_admin=level >= 50))
elif self.peer_type == "channel": elif self.peer_type == "channel":
moderator = level >= 50 moderator = level >= 50
@@ -456,47 +466,48 @@ class Portal:
ban_users=moderator, invite_users=moderator, ban_users=moderator, invite_users=moderator,
invite_link=moderator, pin_messages=moderator, invite_link=moderator, pin_messages=moderator,
add_admins=admin, manage_call=moderator) add_admins=admin, manage_call=moderator)
sender.client( await sender.client(
EditAdminRequest(channel=self.get_input_entity(sender), EditAdminRequest(channel=await self.get_input_entity(sender),
user_id=sender.client.get_input_entity(PeerUser(user_id)), user_id=await sender.client.get_input_entity(
PeerUser(user_id)),
admin_rights=rights)) admin_rights=rights))
def handle_matrix_about(self, sender, about): async def handle_matrix_about(self, sender, about):
if self.peer_type not in {"channel"}: if self.peer_type not in {"channel"}:
return return
channel = self.get_input_entity(sender) channel = await self.get_input_entity(sender)
sender.client(EditAboutRequest(channel=channel, about=about)) await sender.client(EditAboutRequest(channel=channel, about=about))
self.about = about self.about = about
self.save() self.save()
def handle_matrix_title(self, sender, title): async def handle_matrix_title(self, sender, title):
if self.peer_type not in {"chat", "channel"}: if self.peer_type not in {"chat", "channel"}:
return return
if self.peer_type == "chat": if self.peer_type == "chat":
sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title)) await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title))
else: else:
channel = self.get_input_entity(sender) channel = await self.get_input_entity(sender)
sender.client(EditTitleRequest(channel=channel, title=title)) await sender.client(EditTitleRequest(channel=channel, title=title))
self.title = title self.title = title
self.save() self.save()
def handle_matrix_avatar(self, sender, url): async def handle_matrix_avatar(self, sender, url):
if self.peer_type not in {"chat", "channel"}: if self.peer_type not in {"chat", "channel"}:
# Invalid peer type # Invalid peer type
return return
file = self.main_intent.download_file(url) file = await self.main_intent.download_file(url)
mime = magic.from_buffer(file, mime=True) mime = magic.from_buffer(file, mime=True)
ext = mimetypes.guess_extension(mime) ext = mimetypes.guess_extension(mime)
uploaded = sender.client.upload_file(file, file_name=f"avatar{ext}") uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}")
photo = InputChatUploadedPhoto(file=uploaded) photo = InputChatUploadedPhoto(file=uploaded)
if self.peer_type == "chat": if self.peer_type == "chat":
updates = sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo)) updates = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo))
else: else:
channel = self.get_input_entity(sender) channel = await self.get_input_entity(sender)
updates = sender.client(EditPhotoRequest(channel=channel, photo=photo)) updates = await sender.client(EditPhotoRequest(channel=channel, photo=photo))
for update in updates.updates: for update in updates.updates:
is_photo_update = (isinstance(update, UpdateNewMessage) is_photo_update = (isinstance(update, UpdateNewMessage)
and isinstance(update.message, MessageService) and isinstance(update.message, MessageService)
@@ -510,9 +521,9 @@ class Portal:
# endregion # endregion
# region Telegram chat info updating # region Telegram chat info updating
def _get_telegram_users_in_matrix_room(self): async def _get_telegram_users_in_matrix_room(self):
user_tgids = set() user_tgids = set()
user_mxids = self.main_intent.get_room_members(self.mxid, ("join", "invite")) user_mxids = await self.main_intent.get_room_members(self.mxid, ("join", "invite"))
for user in user_mxids: for user in user_mxids:
if user == self.az.intent.mxid: if user == self.az.intent.mxid:
continue continue
@@ -524,11 +535,11 @@ class Portal:
user_tgids.add(puppet_id) user_tgids.add(puppet_id)
return user_tgids return user_tgids
def upgrade_telegram_chat(self, source): async def upgrade_telegram_chat(self, source):
if self.peer_type != "chat": if self.peer_type != "chat":
raise ValueError("Only normal group chats are upgradable to supergroups.") raise ValueError("Only normal group chats are upgradable to supergroups.")
updates = source.client(MigrateChatRequest(chat_id=self.tgid)) updates = await source.client(MigrateChatRequest(chat_id=self.tgid))
entity = None entity = None
for chat in updates.chats: for chat in updates.chats:
if isinstance(chat, Channel): if isinstance(chat, Channel):
@@ -538,67 +549,71 @@ class Portal:
raise ValueError("Upgrade may have failed: output channel not found.") raise ValueError("Upgrade may have failed: output channel not found.")
self.peer_type = "channel" self.peer_type = "channel"
self.migrate_and_save(entity.id) self.migrate_and_save(entity.id)
self.update_info(source, entity) await self.update_info(source, entity)
def set_telegram_username(self, source, username): async def set_telegram_username(self, source, username):
if self.peer_type != "channel": if self.peer_type != "channel":
raise ValueError("Only channels and supergroups have usernames.") raise ValueError("Only channels and supergroups have usernames.")
success = source.client(UpdateUsernameRequest(self.get_input_entity(source), username)) await source.client(
if self.update_username(username): UpdateUsernameRequest(await self.get_input_entity(source), username))
if await self.update_username(username):
self.save() self.save()
def create_telegram_chat(self, source, supergroup=False): async def create_telegram_chat(self, source, supergroup=False):
if not self.mxid: if not self.mxid:
raise ValueError("Can't create Telegram chat for portal without Matrix room.") raise ValueError("Can't create Telegram chat for portal without Matrix room.")
elif self.tgid: elif self.tgid:
raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.") raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")
invites = self._get_telegram_users_in_matrix_room() invites = await self._get_telegram_users_in_matrix_room()
if len(invites) < 2: if len(invites) < 2:
# TODO[waiting-for-bots] This won't happen when the bot is enabled # TODO[waiting-for-bots] This won't happen when the bot is enabled
raise ValueError("Not enough Telegram users to create a chat") raise ValueError("Not enough Telegram users to create a chat")
invites = [source.client.get_input_entity(id) for id in invites] invites = [await source.client.get_input_entity(id) for id in invites]
if self.peer_type == "chat": if self.peer_type == "chat":
updates = source.client(CreateChatRequest(title=self.title, users=invites)) updates = await source.client(CreateChatRequest(title=self.title, users=invites))
entity = updates.chats[0] entity = updates.chats[0]
elif self.peer_type == "channel": elif self.peer_type == "channel":
updates = source.client(CreateChannelRequest(title=self.title, about=self.about or "", updates = await source.client(CreateChannelRequest(title=self.title,
megagroup=supergroup)) about=self.about or "",
megagroup=supergroup))
entity = updates.chats[0] entity = updates.chats[0]
source.client(InviteToChannelRequest(channel=source.client.get_input_entity(entity), await source.client(InviteToChannelRequest(
users=invites)) channel=await source.client.get_input_entity(entity),
users=invites))
else: else:
raise ValueError("Invalid peer type for Telegram chat creation") raise ValueError("Invalid peer type for Telegram chat creation")
self.tgid = entity.id self.tgid = entity.id
self.tg_receiver = self.tgid self.tg_receiver = self.tgid
self.by_tgid[self.tgid_full] = self self.by_tgid[self.tgid_full] = self
self.update_info(source, entity) await self.update_info(source, entity)
self.save() self.save()
def invite_telegram(self, source, puppet): async def invite_telegram(self, source, puppet):
if self.peer_type == "chat": if self.peer_type == "chat":
source.client(AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0)) await source.client(
AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0))
elif self.peer_type == "channel": elif self.peer_type == "channel":
target = puppet.get_input_entity(source) target = await puppet.get_input_entity(source)
source.client(InviteToChannelRequest(channel=self.peer, users=[target])) await source.client(InviteToChannelRequest(channel=self.peer, users=[target]))
else: else:
raise ValueError("Invalid peer type for Telegram user invite") raise ValueError("Invalid peer type for Telegram user invite")
# endregion # endregion
# region Telegram event handling # region Telegram event handling
def handle_telegram_typing(self, user, event): async def handle_telegram_typing(self, user, event):
if self.mxid: if self.mxid:
user.intent.set_typing(self.mxid, is_typing=True) await user.intent.set_typing(self.mxid, is_typing=True)
def handle_telegram_photo(self, source, sender, media): async def handle_telegram_photo(self, source, sender, media):
largest_size = self._get_largest_photo_size(media.photo) largest_size = self._get_largest_photo_size(media.photo)
file = source.client.download_file_bytes(largest_size.location) file = await source.client.download_file_bytes(largest_size.location)
mime_type = magic.from_buffer(file, mime=True) mime_type = magic.from_buffer(file, mime=True)
uploaded = sender.intent.upload_file(file, mime_type) uploaded = await sender.intent.upload_file(file, mime_type)
info = { info = {
"h": largest_size.h, "h": largest_size.h,
"w": largest_size.w, "w": largest_size.w,
@@ -608,8 +623,9 @@ class Portal:
"mimetype": mime_type, "mimetype": mime_type,
} }
name = media.caption name = media.caption
sender.intent.set_typing(self.mxid, is_typing=False) await sender.intent.set_typing(self.mxid, is_typing=False)
return sender.intent.send_image(self.mxid, uploaded["content_uri"], info=info, text=name) return await sender.intent.send_image(self.mxid, uploaded["content_uri"], info=info,
text=name)
def convert_webp(self, file, to="png"): def convert_webp(self, file, to="png"):
try: try:
@@ -621,14 +637,14 @@ class Portal:
self.log.exception(f"Failed to convert webp to {to}") self.log.exception(f"Failed to convert webp to {to}")
return "image/webp", file return "image/webp", file
def handle_telegram_document(self, source, sender, media): async def handle_telegram_document(self, source, sender, media):
file = source.client.download_file_bytes(media.document) file = await source.client.download_file_bytes(media.document)
mime_type = magic.from_buffer(file, mime=True) mime_type = magic.from_buffer(file, mime=True)
dont_change_mime = False dont_change_mime = False
if mime_type == "image/webp": if mime_type == "image/webp":
mime_type, file = self.convert_webp(file, to="png") mime_type, file = self.convert_webp(file, to="png")
dont_change_mime = True dont_change_mime = True
uploaded = sender.intent.upload_file(file, mime_type) uploaded = await sender.intent.upload_file(file, mime_type)
name = media.caption name = media.caption
for attr in media.document.attributes: for attr in media.document.attributes:
if not name and isinstance(attr, DocumentAttributeFilename): if not name and isinstance(attr, DocumentAttributeFilename):
@@ -650,9 +666,9 @@ class Portal:
type = "m.audio" type = "m.audio"
elif mime_type.startswith("image/"): elif mime_type.startswith("image/"):
type = "m.image" type = "m.image"
sender.intent.set_typing(self.mxid, is_typing=False) await sender.intent.set_typing(self.mxid, is_typing=False)
return sender.intent.send_file(self.mxid, uploaded["content_uri"], info=info, text=name, return await sender.intent.send_file(self.mxid, uploaded["content_uri"], info=info,
file_type=type) text=name, file_type=type)
def handle_telegram_location(self, source, sender, location): def handle_telegram_location(self, source, sender, location):
long = location.long long = location.long
@@ -679,18 +695,18 @@ class Portal:
"formatted_body": formatted_body, "formatted_body": formatted_body,
}) })
def handle_telegram_text(self, source, sender, evt): async def handle_telegram_text(self, source, sender, evt):
self.log.debug(f"Sending {evt.message} to {self.mxid} by {sender.id}") self.log.debug(f"Sending {evt.message} to {self.mxid} by {sender.id}")
text, html = formatter.telegram_event_to_matrix(evt, source, text, html = await formatter.telegram_event_to_matrix(evt, source,
config["bridge.native_replies"], config["bridge.native_replies"],
config["bridge.link_in_reply"], config["bridge.link_in_reply"],
self.main_intent) self.main_intent)
sender.intent.set_typing(self.mxid, is_typing=False) await sender.intent.set_typing(self.mxid, is_typing=False)
return sender.intent.send_text(self.mxid, text, html=html) return await sender.intent.send_text(self.mxid, text, html=html)
def handle_telegram_message(self, source, sender, evt): async def handle_telegram_message(self, source, sender, evt):
if not self.mxid: if not self.mxid:
self.create_matrix_room(source, invites=[source.mxid]) await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
tg_space = self.tgid if self.peer_type == "channel" else source.tgid tg_space = self.tgid if self.peer_type == "channel" else source.tgid
@@ -705,14 +721,14 @@ class Portal:
return return
if evt.message: if evt.message:
response = self.handle_telegram_text(source, sender, evt) response = await self.handle_telegram_text(source, sender, evt)
elif evt.media: elif evt.media:
if isinstance(evt.media, MessageMediaPhoto): if isinstance(evt.media, MessageMediaPhoto):
response = self.handle_telegram_photo(source, sender, evt.media) response = await self.handle_telegram_photo(source, sender, evt.media)
elif isinstance(evt.media, MessageMediaDocument): elif isinstance(evt.media, MessageMediaDocument):
response = self.handle_telegram_document(source, sender, evt.media) response = await self.handle_telegram_document(source, sender, evt.media)
elif isinstance(evt.media, MessageMediaGeo): elif isinstance(evt.media, MessageMediaGeo):
response = self.handle_telegram_location(source, sender, evt.media.geo) response = await self.handle_telegram_location(source, sender, evt.media.geo)
else: else:
self.log.debug("Unhandled Telegram media: %s", evt.media) self.log.debug("Unhandled Telegram media: %s", evt.media)
return return
@@ -728,58 +744,58 @@ class Portal:
self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space)) self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space))
self.db.commit() self.db.commit()
def handle_telegram_action(self, source, sender, action): async def handle_telegram_action(self, source, sender, action):
if not self.mxid: if not self.mxid:
create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate) create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink) create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink)
if isinstance(action, create_and_exit + create_and_continue): if isinstance(action, create_and_exit + create_and_continue):
self.create_matrix_room(source, invites=[source.mxid]) await self.create_matrix_room(source, invites=[source.mxid],
update_if_exists=isinstance(action, create_and_exit))
if not isinstance(action, create_and_continue): if not isinstance(action, create_and_continue):
return return
# TODO figure out how to see changes to about text / channel username # TODO figure out how to see changes to about text / channel username
if isinstance(action, MessageActionChatEditTitle): if isinstance(action, MessageActionChatEditTitle):
if self.update_title(action.title): if await self.update_title(action.title):
self.save() self.save()
elif isinstance(action, MessageActionChatEditPhoto): elif isinstance(action, MessageActionChatEditPhoto):
largest_size = self._get_largest_photo_size(action.photo) largest_size = self._get_largest_photo_size(action.photo)
if self.update_avatar(source, largest_size.location): if await self.update_avatar(source, largest_size.location):
self.save() self.save()
elif isinstance(action, MessageActionChatAddUser): elif isinstance(action, MessageActionChatAddUser):
for user_id in action.users: for user_id in action.users:
self.add_telegram_user(user_id, source) await self.add_telegram_user(user_id, source)
elif isinstance(action, MessageActionChatJoinedByLink): elif isinstance(action, MessageActionChatJoinedByLink):
self.add_telegram_user(sender.id, source) await self.add_telegram_user(sender.id, source)
elif isinstance(action, MessageActionChatDeleteUser): elif isinstance(action, MessageActionChatDeleteUser):
kick_message = None kick_message = (f"Kicked by {sender.displayname}"
if sender.id != action.user_id: if sender.id != action.user_id else None)
kick_message = f"Kicked by {sender.displayname}" await self.delete_telegram_user(action.user_id, kick_message)
self.delete_telegram_user(action.user_id, kick_message)
elif isinstance(action, MessageActionChatMigrateTo): elif isinstance(action, MessageActionChatMigrateTo):
self.peer_type = "channel" self.peer_type = "channel"
self.migrate_and_save(action.channel_id) self.migrate_and_save(action.channel_id)
sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.") await sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.")
else: else:
self.log.debug("Unhandled Telegram action in %s: %s", self.title, action) self.log.debug("Unhandled Telegram action in %s: %s", self.title, action)
def set_telegram_admin(self, puppet, user): async def set_telegram_admin(self, puppet, user):
levels = self.main_intent.get_power_levels(self.mxid) levels = await self.main_intent.get_power_levels(self.mxid)
if user: if user:
levels["users"][user.mxid] = 50 levels["users"][user.mxid] = 50
if puppet: if puppet:
levels["users"][puppet.mxid] = 50 levels["users"][puppet.mxid] = 50
self.main_intent.set_power_levels(self.mxid, levels) await self.main_intent.set_power_levels(self.mxid, levels)
def update_telegram_pin(self, source, id): async def update_telegram_pin(self, source, id):
space = self.tgid if self.peer_type == "channel" else source.tgid space = self.tgid if self.peer_type == "channel" else source.tgid
message = DBMessage.query.get((id, space)) message = DBMessage.query.get((id, space))
if message: if message:
self.main_intent.set_pinned_messages(self.mxid, [message.mxid]) await self.main_intent.set_pinned_messages(self.mxid, [message.mxid])
else: else:
self.main_intent.set_pinned_messages(self.mxid, []) await self.main_intent.set_pinned_messages(self.mxid, [])
def update_telegram_participants(self, participants): async def update_telegram_participants(self, participants):
levels = self.main_intent.get_power_levels(self.mxid) levels = await self.main_intent.get_power_levels(self.mxid)
changed = False changed = False
admin_power_level = 75 if self.peer_type == "channel" else 50 admin_power_level = 75 if self.peer_type == "channel" else 50
@@ -800,7 +816,7 @@ class Portal:
if user: if user:
user_level_defined = user.mxid in user_levels user_level_defined = user.mxid in user_levels
user_has_right_level = (user_levels[user.mxid] != new_level user_has_right_level = (user_levels[user.mxid] == new_level
if user_level_defined else new_level == 0) if user_level_defined else new_level == 0)
if not user_has_right_level: if not user_has_right_level:
levels["users"][user.mxid] = new_level levels["users"][user.mxid] = new_level
@@ -808,22 +824,22 @@ class Portal:
if puppet: if puppet:
puppet_level_defined = puppet.mxid in user_levels puppet_level_defined = puppet.mxid in user_levels
puppet_has_right_level = (user_levels[puppet.mxid] != new_level puppet_has_right_level = (user_levels[puppet.mxid] == new_level
if puppet_level_defined else new_level == 0) if puppet_level_defined else new_level == 0)
if not puppet_has_right_level: if not puppet_has_right_level:
levels["users"][puppet.mxid] = new_level levels["users"][puppet.mxid] = new_level
changed = True changed = True
if changed: if changed:
self.main_intent.set_power_levels(self.mxid, levels) await self.main_intent.set_power_levels(self.mxid, levels)
def set_telegram_admins_enabled(self, enabled): async def set_telegram_admins_enabled(self, enabled):
level = 50 if enabled else 10 level = 50 if enabled else 10
levels = self.main_intent.get_power_levels(self.mxid) levels = await self.main_intent.get_power_levels(self.mxid)
levels["invite"] = level levels["invite"] = level
levels["events"]["m.room.name"] = level levels["events"]["m.room.name"] = level
levels["events"]["m.room.avatar"] = level levels["events"]["m.room.avatar"] = level
self.main_intent.set_power_levels(self.mxid, levels) await self.main_intent.set_power_levels(self.mxid, levels)
# endregion # endregion
# region Database conversion # region Database conversion
@@ -839,7 +855,7 @@ class Portal:
def migrate_and_save(self, new_id): def migrate_and_save(self, new_id):
existing = DBPortal.query.get(self.tgid_full) existing = DBPortal.query.get(self.tgid_full)
if existing: if existing:
self.db.object_session(existing).delete(existing) self.db.delete(existing)
try: try:
del self.by_tgid[self.tgid_full] del self.by_tgid[self.tgid_full]
except KeyError: except KeyError:
@@ -933,4 +949,4 @@ class Portal:
def init(context): def init(context):
global config global config
Portal.az, Portal.db, config = context Portal.az, Portal.db, config, _ = context
+10 -10
View File
@@ -91,35 +91,35 @@ class Puppet:
return config.get("bridge.displayname_template", "{displayname} (Telegram)").format( return config.get("bridge.displayname_template", "{displayname} (Telegram)").format(
displayname=name) displayname=name)
def update_info(self, source, info): async def update_info(self, source, info):
changed = False changed = False
if self.username != info.username: if self.username != info.username:
self.username = info.username self.username = info.username
changed = True changed = True
changed = self.update_displayname(source, info) or changed changed = await self.update_displayname(source, info) or changed
if isinstance(info.photo, UserProfilePhoto): if isinstance(info.photo, UserProfilePhoto):
changed = self.update_avatar(source, info.photo.photo_big) changed = await self.update_avatar(source, info.photo.photo_big)
if changed: if changed:
self.save() self.save()
def update_displayname(self, source, info): async def update_displayname(self, source, info):
displayname = self.get_displayname(info) displayname = self.get_displayname(info)
if displayname != self.displayname: if displayname != self.displayname:
self.intent.set_display_name(displayname) await self.intent.set_display_name(displayname)
self.displayname = displayname self.displayname = displayname
return True return True
def update_avatar(self, source, photo): async def update_avatar(self, source, photo):
photo_id = f"{photo.volume_id}-{photo.local_id}" photo_id = f"{photo.volume_id}-{photo.local_id}"
if self.photo_id != photo_id: if self.photo_id != photo_id:
try: try:
file = source.client.download_file_bytes(photo) file = await source.client.download_file_bytes(photo)
except LocationInvalidError: except LocationInvalidError:
return False return False
uploaded = self.intent.upload_file(file) uploaded = await self.intent.upload_file(file)
self.intent.set_avatar(uploaded["content_uri"]) await self.intent.set_avatar(uploaded["content_uri"])
self.photo_id = photo_id self.photo_id = photo_id
return True return True
return False return False
@@ -170,7 +170,7 @@ class Puppet:
def init(context): def init(context):
global config global config
Puppet.az, Puppet.db, config = context Puppet.az, Puppet.db, config, _ = context
localpart = config.get("bridge.username_template", "telegram_{userid}").format(userid="(.+)") localpart = config.get("bridge.username_template", "telegram_{userid}").format(userid="(.+)")
hs = config["homeserver"]["domain"] hs = config["homeserver"]["domain"]
Puppet.mxid_regex = re.compile(f"@{localpart}:{hs}") Puppet.mxid_regex = re.compile(f"@{localpart}:{hs}")
+9 -9
View File
@@ -22,8 +22,8 @@ from telethon.tl.types import *
class MautrixTelegramClient(TelegramClient): class MautrixTelegramClient(TelegramClient):
def send_message(self, entity, message, reply_to=None, entities=None, link_preview=True): async def send_message(self, entity, message, reply_to=None, entities=None, link_preview=True):
entity = self.get_input_entity(entity) entity = await self.get_input_entity(entity)
request = SendMessageRequest( request = SendMessageRequest(
peer=entity, peer=entity,
@@ -32,7 +32,7 @@ class MautrixTelegramClient(TelegramClient):
no_webpage=not link_preview, no_webpage=not link_preview,
reply_to_msg_id=self._get_reply_to(reply_to) reply_to_msg_id=self._get_reply_to(reply_to)
) )
result = self(request) result = await self(request)
if isinstance(result, UpdateShortSentMessage): if isinstance(result, UpdateShortSentMessage):
return Message( return Message(
id=result.id, id=result.id,
@@ -46,12 +46,12 @@ class MautrixTelegramClient(TelegramClient):
return self._get_response_message(request, result) return self._get_response_message(request, result)
def send_file(self, entity, file, mime_type=None, caption=None, attributes=None, file_name=None, async def send_file(self, entity, file, mime_type=None, caption=None, attributes=None, file_name=None,
reply_to=None, **kwargs): reply_to=None, **kwargs):
entity = self.get_input_entity(entity) entity = await self.get_input_entity(entity)
reply_to = self._get_reply_to(reply_to) reply_to = self._get_reply_to(reply_to)
file_handle = self.upload_file(file, file_name=file_name, use_cache=False) file_handle = await self.upload_file(file, file_name=file_name, use_cache=False)
if mime_type == "image/png": if mime_type == "image/png":
media = InputMediaUploadedPhoto(file_handle, caption or "") media = InputMediaUploadedPhoto(file_handle, caption or "")
@@ -66,9 +66,9 @@ class MautrixTelegramClient(TelegramClient):
caption=caption or "") caption=caption or "")
request = SendMediaRequest(entity, media, reply_to_msg_id=reply_to) request = SendMediaRequest(entity, media, reply_to_msg_id=reply_to)
return self._get_response_message(request, self(request)) return self._get_response_message(request, await self(request))
def download_file_bytes(self, location): async def download_file_bytes(self, location):
if isinstance(location, Document): if isinstance(location, Document):
location = InputDocumentFileLocation(location.id, location.access_hash, location = InputDocumentFileLocation(location.id, location.access_hash,
location.version) location.version)
@@ -77,7 +77,7 @@ class MautrixTelegramClient(TelegramClient):
file = BytesIO() file = BytesIO()
self.download_file(location, file) await self.download_file(location, file)
data = file.getvalue() data = file.getvalue()
file.close() file.close()
+62 -48
View File
@@ -15,6 +15,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging import logging
import asyncio
import platform import platform
from telethon.tl.types import * from telethon.tl.types import *
@@ -28,6 +29,7 @@ config = None
class User: class User:
loop = None
log = logging.getLogger("mau.user") log = logging.getLogger("mau.user")
db = None db = None
az = None az = None
@@ -82,25 +84,28 @@ class User:
# endregion # endregion
# region Telegram connection management # region Telegram connection management
def start(self): async def start(self):
device = f"{platform.system()} {platform.release()}" device = f"{platform.system()} {platform.release()}"
sysversion = MautrixTelegramClient.__version__ sysversion = MautrixTelegramClient.__version__
self.client = MautrixTelegramClient(self.mxid, self.client = MautrixTelegramClient(self.mxid,
config["telegram.api_id"], config["telegram.api_id"],
config["telegram.api_hash"], config["telegram.api_hash"],
update_workers=2, loop=self.loop,
app_version=__version__, app_version=__version__,
system_version=sysversion, system_version=sysversion,
device_model=device) device_model=device)
self.connected = self.client.connect()
if self.logged_in:
self.post_login()
self.client.add_update_handler(self.update_catch) self.client.add_update_handler(self.update_catch)
self.connected = await self.client.connect()
if self.logged_in:
asyncio.ensure_future(self.post_login(), loop=self.loop)
return self return self
def post_login(self, info=None): async def post_login(self, info=None):
self.sync_dialogs() try:
self.update_info(info) await self.sync_dialogs()
await self.update_info(info)
except Exception:
self.log.exception("Failed to run post-login functions")
def stop(self): def stop(self):
self.client.disconnect() self.client.disconnect()
@@ -110,8 +115,8 @@ class User:
# endregion # endregion
# region Telegram actions that need custom methods # region Telegram actions that need custom methods
def update_info(self, info=None): async def update_info(self, info=None):
info = info or self.client.get_me() info = info or await self.client.get_me()
changed = False changed = False
if self.username != info.username: if self.username != info.username:
self.username = info.username self.username = info.username
@@ -122,7 +127,7 @@ class User:
if changed: if changed:
self.save() self.save()
def log_out(self): async def log_out(self):
self.connected = False self.connected = False
if self.tgid: if self.tgid:
try: try:
@@ -131,53 +136,56 @@ class User:
pass pass
self.tgid = None self.tgid = None
self.save() self.save()
return self.client.log_out() await self.client.log_out()
# TODO kick user from portals
def sync_dialogs(self): async def sync_dialogs(self):
dialogs = self.client.get_dialogs(limit=30) dialogs = await self.client.get_dialogs(limit=30)
creators = []
for dialog in dialogs: for dialog in dialogs:
entity = dialog.entity entity = dialog.entity
if (isinstance(entity, (TLUser, ChatForbidden, ChannelForbidden)) or ( if (isinstance(entity, (TLUser, ChatForbidden, ChannelForbidden)) or (
isinstance(entity, Chat) and (entity.deactivated or entity.left))): isinstance(entity, Chat) and (entity.deactivated or entity.left))):
continue continue
portal = po.Portal.get_by_entity(entity) portal = po.Portal.get_by_entity(entity)
portal.create_matrix_room(self, entity, invites=[self.mxid]) creators.append(portal.create_matrix_room(self, entity, invites=[self.mxid]))
await asyncio.gather(*creators, loop=self.loop)
# endregion # endregion
# region Telegram update handling # region Telegram update handling
def update_catch(self, update): async def update_catch(self, update):
try: try:
self.update(update) await self.update(update)
except Exception: except Exception:
self.log.exception("Failed to handle Telegram update") self.log.exception("Failed to handle Telegram update")
def update(self, update): async def update(self, update):
if isinstance(update, (UpdateShortChatMessage, UpdateShortMessage, UpdateNewMessage, if isinstance(update, (UpdateShortChatMessage, UpdateShortMessage, UpdateNewMessage,
UpdateNewChannelMessage)): UpdateNewChannelMessage)):
self.update_message(update) await self.update_message(update)
elif isinstance(update, (UpdateChatUserTyping, UpdateUserTyping)): elif isinstance(update, (UpdateChatUserTyping, UpdateUserTyping)):
self.update_typing(update) await self.update_typing(update)
elif isinstance(update, UpdateUserStatus): elif isinstance(update, UpdateUserStatus):
self.update_status(update) await self.update_status(update)
elif isinstance(update, (UpdateChatAdmins, UpdateChatParticipantAdmin)): elif isinstance(update, (UpdateChatAdmins, UpdateChatParticipantAdmin)):
self.update_admin(update) await self.update_admin(update)
elif isinstance(update, UpdateChatParticipants): elif isinstance(update, UpdateChatParticipants):
portal = po.Portal.get_by_tgid(update.participants.chat_id) portal = po.Portal.get_by_tgid(update.participants.chat_id)
if portal and portal.mxid: if portal and portal.mxid:
portal.update_telegram_participants(update.participants.participants) await portal.update_telegram_participants(update.participants.participants)
elif isinstance(update, UpdateChannelPinnedMessage): elif isinstance(update, UpdateChannelPinnedMessage):
portal = po.Portal.get_by_tgid(update.channel_id) portal = po.Portal.get_by_tgid(update.channel_id)
if portal and portal.mxid: if portal and portal.mxid:
portal.update_telegram_pin(self, update.id) await portal.update_telegram_pin(self, update.id)
elif isinstance(update, (UpdateUserName, UpdateUserPhoto)): elif isinstance(update, (UpdateUserName, UpdateUserPhoto)):
self.update_others_info(update) await self.update_others_info(update)
elif isinstance(update, UpdateReadHistoryOutbox): elif isinstance(update, UpdateReadHistoryOutbox):
self.update_read_receipt(update) await self.update_read_receipt(update)
else: else:
self.log.debug("Unhandled update: %s", update) self.log.debug("Unhandled update: %s", update)
def update_read_receipt(self, update): async def update_read_receipt(self, update):
if not isinstance(update.peer, PeerUser): if not isinstance(update.peer, PeerUser):
self.log.debug("Unexpected read receipt peer: %s", update.peer) self.log.debug("Unexpected read receipt peer: %s", update.peer)
return return
@@ -192,40 +200,42 @@ class User:
return return
puppet = pu.Puppet.get(update.peer.user_id) puppet = pu.Puppet.get(update.peer.user_id)
puppet.intent.mark_read(portal.mxid, message.mxid) await puppet.intent.mark_read(portal.mxid, message.mxid)
def update_admin(self, update): async def update_admin(self, update):
portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat")
if isinstance(update, UpdateChatAdmins): if isinstance(update, UpdateChatAdmins):
portal.set_telegram_admins_enabled(update.enabled) await portal.set_telegram_admins_enabled(update.enabled)
elif isinstance(update, UpdateChatParticipantAdmin): elif isinstance(update, UpdateChatParticipantAdmin):
puppet = pu.Puppet.get(update.user_id) puppet = pu.Puppet.get(update.user_id)
user = User.get_by_tgid(update.user_id) user = User.get_by_tgid(update.user_id)
portal.set_telegram_admin(puppet, user) await portal.set_telegram_admin(puppet, user)
def update_typing(self, update): async def update_typing(self, update):
if isinstance(update, UpdateUserTyping): if isinstance(update, UpdateUserTyping):
portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user")
else: else:
portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat")
sender = pu.Puppet.get(update.user_id) sender = pu.Puppet.get(update.user_id)
return portal.handle_telegram_typing(sender, update) await portal.handle_telegram_typing(sender, update)
def update_others_info(self, update): async def update_others_info(self, update):
puppet = pu.Puppet.get(update.user_id) puppet = pu.Puppet.get(update.user_id)
if isinstance(update, UpdateUserName): if isinstance(update, UpdateUserName):
if puppet.update_displayname(self, update): if await puppet.update_displayname(self, update):
puppet.save() puppet.save()
elif isinstance(update, UpdateUserPhoto): elif isinstance(update, UpdateUserPhoto):
if puppet.update_avatar(self, update.photo.photo_big): if await puppet.update_avatar(self, update.photo.photo_big):
puppet.save() puppet.save()
def update_status(self, update): async def update_status(self, update):
puppet = pu.Puppet.get(update.user_id) puppet = pu.Puppet.get(update.user_id)
if isinstance(update.status, UserStatusOnline): if isinstance(update.status, UserStatusOnline):
puppet.intent.set_presence("online") await puppet.intent.set_presence("online")
elif isinstance(update.status, UserStatusOffline): elif isinstance(update.status, UserStatusOffline):
puppet.intent.set_presence("offline") await puppet.intent.set_presence("offline")
else:
self.log.warning("Unexpected user status update: %s", update)
return return
def get_message_details(self, update): def get_message_details(self, update):
@@ -249,7 +259,7 @@ class User:
return update, None, None return update, None, None
return update, sender, portal return update, sender, portal
def update_message(self, update): async def update_message(self, update):
update, sender, portal = self.get_message_details(update) update, sender, portal = self.get_message_details(update)
if isinstance(update, MessageService): if isinstance(update, MessageService):
@@ -259,10 +269,10 @@ class User:
return return
self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log, self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log,
sender.id) sender.id)
portal.handle_telegram_action(self, sender, update.action) await portal.handle_telegram_action(self, sender, update.action)
else: else:
self.log.debug("Handling message %s to %s by %d", update, portal.tgid_log, sender.tgid) self.log.debug("Handling message %s to %s by %d", update, portal.tgid_log, sender.tgid)
portal.handle_telegram_message(self, sender, update) await portal.handle_telegram_message(self, sender, update)
# endregion # endregion
# region Class instance lookup # region Class instance lookup
@@ -276,13 +286,16 @@ class User:
user = DBUser.query.get(mxid) user = DBUser.query.get(mxid)
if user: if user:
return cls.from_db(user).start() user = cls.from_db(user)
asyncio.ensure_future(user.start(), loop=cls.loop)
return user
if create: if create:
user = cls(mxid) user = cls(mxid)
cls.db.add(user.to_db()) cls.db.add(user.to_db())
cls.db.commit() cls.db.commit()
return user.start() asyncio.ensure_future(user.start(), loop=cls.loop)
return user
return None return None
@@ -295,7 +308,9 @@ class User:
user = DBUser.query.filter(DBUser.tgid == tgid).one_or_none() user = DBUser.query.filter(DBUser.tgid == tgid).one_or_none()
if user: if user:
return cls.from_db(user).start() user = cls.from_db(user)
asyncio.ensure_future(user.start(), loop=cls.loop)
return user
return None return None
@@ -315,8 +330,7 @@ class User:
def init(context): def init(context):
global config global config
User.az, User.db, config = context User.az, User.db, config, User.loop = context
users = [User.from_db(user) for user in DBUser.query.all()] users = [User.from_db(user) for user in DBUser.query.all()]
for user in users: return [user.start() for user in users]
user.start()
+1 -2
View File
@@ -1,9 +1,8 @@
aiohttp aiohttp
-e git+git://github.com/Cadair/matrix-python-sdk#egg=matrix_client
ruamel.yaml ruamel.yaml
python-magic python-magic
SQLAlchemy SQLAlchemy
Telethon -e git+git://github.com/LonamiWebs/Telethon@asyncio#egg=Telethon
Markdown Markdown
Pillow Pillow
future-fstrings future-fstrings
+1 -2
View File
@@ -15,7 +15,6 @@ setuptools.setup(
packages=setuptools.find_packages(), packages=setuptools.find_packages(),
install_requires=[ install_requires=[
"Telethon>=0.17.0.0,<0.18",
"aiohttp>=2.3.10,<3", "aiohttp>=2.3.10,<3",
"SQLAlchemy>=1.2.2,<2", "SQLAlchemy>=1.2.2,<2",
"Markdown>=2.6.11,<3", "Markdown>=2.6.11,<3",
@@ -25,7 +24,7 @@ setuptools.setup(
"python-magic>=0.4.15,<0.5", "python-magic>=0.4.15,<0.5",
], ],
dependency_links=[ dependency_links=[
"https://github.com/Cadair/matrix-python-sdk/tarball/1fab9821d98d15769e44e66f714d00a32a48d692#egg=matrix_client" "https://github.com/LonamiWebs/Telethon/tarball/7da092894b306d720cc60c04daa2bfba58f81946#egg=Telethon"
], ],
classifiers=[ classifiers=[