Initial asyncio version

This commit is contained in:
Tulir Asokan
2018-02-09 23:17:03 +02:00
parent 1e6ff3c28f
commit 55dc1ff3c7
12 changed files with 616 additions and 494 deletions
+16 -10
View File
@@ -47,8 +47,8 @@ class AppService:
self.log = (logging.getLogger(log) if isinstance(log, str)
else log or logging.getLogger("mautrix_appservice"))
self.query_user = query_user or (lambda user: None)
self.query_alias = query_alias or (lambda alias: None)
self.query_user = query_user or self.default_query_handler
self.query_alias = query_alias or self.default_query_handler
self.event_handlers = []
@@ -60,6 +60,9 @@ class AppService:
self.matrix_event_handler(self.update_state_store)
async def default_query_handler(self, param):
return None
@property
def http_session(self):
if self._http_session is None:
@@ -80,10 +83,10 @@ class AppService:
def run(self, host="127.0.0.1", port=8080):
self._http_session = aiohttp.ClientSession(loop=self.loop)
self._intent = HTTPAPI(base_url=self.server, domain=self.domain, bot_mxid=self.bot_mxid,
token=self.as_token, log=self.log,
state_store=self.state_store).bot_intent()
token=self.as_token, log=self.log, state_store=self.state_store,
client_session=self._http_session).bot_intent()
yield partial(aiohttp.web.run_app, self.app, host=host, port=port)
yield self.loop.create_server(self.app.make_handler(), host, port)
self._intent = None
self._http_session.close()
@@ -107,7 +110,7 @@ class AppService:
user_id = request.match_info["userId"]
try:
response = self.query_user(user_id)
response = await self.query_user(user_id)
except Exception:
self.log.exception("Exception in user query handler")
return web.Response(status=500)
@@ -123,7 +126,7 @@ class AppService:
alias = request.match_info["alias"]
try:
response = self.query_alias(alias)
response = await self.query_alias(alias)
except Exception:
self.log.exception("Exception in alias query handler")
return web.Response(status=500)
@@ -154,7 +157,7 @@ class AppService:
return web.json_response({})
def update_state_store(self, event):
async def update_state_store(self, event):
event_type = event["type"]
if event_type == "m.room.power_levels":
self.state_store.set_power_levels(event["room_id"], event["content"])
@@ -163,12 +166,15 @@ class AppService:
event["content"]["membership"])
def handle_matrix_event(self, event):
for handler in self.event_handlers:
async def try_handle(handler):
try:
handler(event)
await handler(event)
except Exception:
self.log.exception("Exception in Matrix event handler")
for handler in self.event_handlers:
asyncio.ensure_future(try_handle(handler))
def matrix_event_handler(self, func):
self.event_handlers.append(func)
return func
+90 -87
View File
@@ -19,14 +19,15 @@ import json
import magic
import urllib.request
from matrix_client.api import MatrixHttpApi
from matrix_client.errors import MatrixRequestError
from .temp_async_api import AsyncHTTPAPI
class HTTPAPI(MatrixHttpApi):
class HTTPAPI(AsyncHTTPAPI):
def __init__(self, base_url, domain=None, bot_mxid=None, token=None, identity=None, log=None,
state_store=None):
super().__init__(base_url, token, identity)
state_store=None, client_session=None):
super().__init__(base_url, client_session, token, identity)
self.domain = domain
self.bot_mxid = bot_mxid
self.intent_log = log.getChild("intent")
@@ -53,7 +54,8 @@ class HTTPAPI(MatrixHttpApi):
api_path="/_matrix/client/r0"):
if not query_params:
query_params = {}
query_params["user_id"] = self.identity
if self.identity:
query_params["user_id"] = self.identity
log_content = content if not isinstance(content, bytes) else f"<{len(content)} bytes>"
self.log.debug("%s %s %s", method, path, log_content)
return super()._send(method, path, content, query_params, headers or {}, api_path=api_path)
@@ -104,6 +106,7 @@ class ChildHTTPAPI(HTTPAPI):
self.log = parent.log
self.domain = parent.domain
self.parent = parent
self.client_session = parent.client_session
@property
def txn_id(self):
@@ -127,6 +130,7 @@ def matrix_error_code(err):
except Exception:
return err.content
def matrix_error_data(err):
try:
data = json.loads(err.content)
@@ -135,8 +139,6 @@ def matrix_error_data(err):
return err.content
class IntentAPI:
mxid_regex = re.compile("@(.+):(.+)")
@@ -162,51 +164,51 @@ class IntentAPI:
# region User actions
def get_joined_rooms(self):
self.ensure_registered()
response = self.client._send("GET", "/joined_rooms")
async def get_joined_rooms(self):
await self.ensure_registered()
response = await self.client._send("GET", "/joined_rooms")
return response["joined_rooms"]
def set_display_name(self, name):
self.ensure_registered()
return self.client.set_display_name(self.mxid, name)
async def set_display_name(self, name):
await self.ensure_registered()
return await self.client.set_display_name(self.mxid, name)
def set_presence(self, status="online"):
self.ensure_registered()
return self.client.set_presence(status)
async def set_presence(self, status="online"):
await self.ensure_registered()
return await self.client.set_presence(status)
def set_avatar(self, url):
self.ensure_registered()
return self.client.set_avatar_url(self.mxid, url)
async def set_avatar(self, url):
await self.ensure_registered()
return await self.client.set_avatar_url(self.mxid, url)
def upload_file(self, data, mime_type=None):
self.ensure_registered()
async def upload_file(self, data, mime_type=None):
await self.ensure_registered()
mime_type = mime_type or magic.from_buffer(data, mime=True)
return self.client.media_upload(data, mime_type)
return await self.client.media_upload(data, mime_type)
def download_file(self, url):
self.ensure_registered()
async def download_file(self, url):
await self.ensure_registered()
url = self.client.get_download_url(url)
response = urllib.request.urlopen(url)
return response.read()
async with self.client.client_session.get(url) as response:
return await response.read()
# endregion
# region Room actions
def create_room(self, alias=None, is_public=False, name=None, topic=None, is_direct=False,
async def create_room(self, alias=None, is_public=False, name=None, topic=None, is_direct=False,
invitees=(), initial_state=None):
self.ensure_registered()
return self.client.create_room(alias, is_public, name, topic, is_direct, invitees,
await self.ensure_registered()
return await self.client.create_room(alias, is_public, name, topic, is_direct, invitees,
initial_state or {})
def invite(self, room_id, user_id, check_cache=False):
self.ensure_joined(room_id)
async def invite(self, room_id, user_id, check_cache=False):
await self.ensure_joined(room_id)
try:
ok_states = {"invite", "join"}
do_invite = (not check_cache
or self.state_store.get_membership(room_id, user_id) not in ok_states)
if do_invite:
response = self.client.invite_user(room_id, user_id)
response = await self.client.invite_user(room_id, user_id)
self.state_store.invited(room_id, user_id)
return response
except MatrixRequestError as e:
@@ -224,38 +226,38 @@ class IntentAPI:
content["info"] = info
return self.send_state_event(room_id, "m.room.avatar", content)
def add_room_alias(self, room_id, alias):
self.ensure_registered()
self.client.set_room_alias(room_id, f"#{alias}:{self.client.domain}")
async def add_room_alias(self, room_id, alias):
await self.ensure_registered()
return await self.client.set_room_alias(room_id, f"#{alias}:{self.client.domain}")
def remove_room_alias(self, alias):
self.ensure_registered()
self.client.remove_room_alias(f"#{alias}:{self.client.domain}")
async def remove_room_alias(self, alias):
await self.ensure_registered()
return await self.client.remove_room_alias(f"#{alias}:{self.client.domain}")
def set_room_name(self, room_id, name):
self.ensure_joined(room_id)
async def set_room_name(self, room_id, name):
await self.ensure_joined(room_id)
self._ensure_has_power_level_for(room_id, "m.room.name")
return self.client.set_room_name(room_id, name)
return await self.client.set_room_name(room_id, name)
def get_power_levels(self, room_id, ignore_cache=False):
self.ensure_joined(room_id)
async def get_power_levels(self, room_id, ignore_cache=False):
await self.ensure_joined(room_id)
if not ignore_cache:
try:
return self.state_store.get_power_levels(room_id)
except KeyError:
pass
levels = self.client.get_power_levels(room_id)
levels = await self.client.get_power_levels(room_id)
self.state_store.set_power_levels(room_id, levels)
return levels
def set_power_levels(self, room_id, content):
response = self.send_state_event(room_id, "m.room.power_levels", content)
async def set_power_levels(self, room_id, content):
response = await self.send_state_event(room_id, "m.room.power_levels", content)
self.state_store.set_power_levels(room_id, content)
return response
def get_pinned_messages(self, room_id):
self.ensure_joined(room_id)
response = self.client._send("GET", f"/rooms/{room_id}/state/m.room.pinned_events")
async def get_pinned_messages(self, room_id):
await self.ensure_joined(room_id)
response = await self.client._send("GET", f"/rooms/{room_id}/state/m.room.pinned_events")
return response["content"]["pinned"]
def set_pinned_messages(self, room_id, events):
@@ -263,29 +265,29 @@ class IntentAPI:
"pinned": events
})
def pin_message(self, room_id, event_id):
events = self.get_pinned_messages(room_id)
async def pin_message(self, room_id, event_id):
events = await self.get_pinned_messages(room_id)
if event_id not in events:
events.append(event_id)
self.set_pinned_messages(room_id, events)
await self.set_pinned_messages(room_id, events)
def unpin_message(self, room_id, event_id):
events = self.get_pinned_messages(room_id)
async def unpin_message(self, room_id, event_id):
events = await self.get_pinned_messages(room_id)
if event_id in events:
events.remove(event_id)
self.set_pinned_messages(room_id, events)
await self.set_pinned_messages(room_id, events)
def get_event(self, room_id, event_id):
self.ensure_joined(room_id)
return self.client._send("GET", f"/rooms/{room_id}/event/{event_id}")
async def get_event(self, room_id, event_id):
await self.ensure_joined(room_id)
return await self.client._send("GET", f"/rooms/{room_id}/event/{event_id}")
def set_typing(self, room_id, is_typing=True, timeout=5000):
self.ensure_joined(room_id)
return self.client.set_typing(room_id, is_typing, timeout)
async def set_typing(self, room_id, is_typing=True, timeout=5000):
await self.ensure_joined(room_id)
return await self.client.set_typing(room_id, is_typing, timeout)
def mark_read(self, room_id, event_id):
self.ensure_joined(room_id)
return self.client._send("POST", f"/rooms/{room_id}/receipt/m.read/{event_id}", content={})
async def mark_read(self, room_id, event_id):
await self.ensure_joined(room_id)
return await self.client._send("POST", f"/rooms/{room_id}/receipt/m.read/{event_id}", content={})
def send_notice(self, room_id, text, html=None):
return self.send_text(room_id, text, html, "m.notice")
@@ -323,24 +325,24 @@ class IntentAPI:
def send_message(self, room_id, body):
return self.send_event(room_id, "m.room.message", body)
def error_and_leave(self, room_id, text, html=None):
self.ensure_joined(room_id)
self.send_notice(room_id, text, html=html)
self.leave_room(room_id)
async def error_and_leave(self, room_id, text, html=None):
await self.ensure_joined(room_id)
await self.send_notice(room_id, text, html=html)
await self.leave_room(room_id)
def kick(self, room_id, user_id, message):
self.ensure_joined(room_id)
return self.client.kick_user(room_id, user_id, message)
async def kick(self, room_id, user_id, message):
await self.ensure_joined(room_id)
return await self.client.kick_user(room_id, user_id, message)
def send_event(self, room_id, event_type, body, txn_id=None):
self.ensure_joined(room_id)
async def send_event(self, room_id, event_type, body, txn_id=None):
await self.ensure_joined(room_id)
self._ensure_has_power_level_for(room_id, event_type)
return self.client.send_message_event(room_id, event_type, body, txn_id)
return await self.client.send_message_event(room_id, event_type, body, txn_id)
def send_state_event(self, room_id, event_type, body, state_key=""):
self.ensure_joined(room_id)
async def send_state_event(self, room_id, event_type, body, state_key=""):
await self.ensure_joined(room_id)
self._ensure_has_power_level_for(room_id, event_type)
return self.client.send_state_event(room_id, event_type, body, state_key)
return await self.client.send_state_event(room_id, event_type, body, state_key)
def join_room(self, room_id):
return self.ensure_joined(room_id, ignore_cache=True)
@@ -352,24 +354,25 @@ class IntentAPI:
def get_room_memberships(self, room_id):
return self.client.get_room_members(room_id)
def get_room_members(self, room_id, allowed_memberships=("join",)):
memberships = self.get_room_memberships(room_id)
async def get_room_members(self, room_id, allowed_memberships=("join",)):
memberships = await self.get_room_memberships(room_id)
return [membership["state_key"] for membership in memberships["chunk"] if
membership["content"]["membership"] in allowed_memberships]
def get_room_state(self, room_id):
self.ensure_joined(room_id)
return self.client.get_room_state(room_id)
async def get_room_state(self, room_id):
await self.ensure_joined(room_id)
state = await self.client.get_room_state(room_id)
return state
# endregion
# region Ensure functions
def ensure_joined(self, room_id, ignore_cache=False):
async def ensure_joined(self, room_id, ignore_cache=False):
if not ignore_cache and self.state_store.is_joined(room_id, self.mxid):
return
self.ensure_registered()
await self.ensure_registered()
try:
self.client.join_room(room_id)
await self.client.join_room(room_id)
self.state_store.joined(room_id, self.mxid)
except MatrixRequestError as e:
if matrix_error_code(e) != "M_FORBIDDEN" or not self.bot:
@@ -381,11 +384,11 @@ class IntentAPI:
except MatrixRequestError as e2:
raise IntentError(f"Failed to join room {room_id} as {self.mxid}", e2)
def ensure_registered(self):
async def ensure_registered(self):
if self.state_store.is_registered(self.mxid):
return
try:
self.client.register({"username": self.localpart})
await self.client.register({"username": self.localpart})
except MatrixRequestError as e:
if matrix_error_code(e) != "M_USER_IN_USE":
self.log.exception(f"Failed to register {self.mxid}!")
+92
View File
@@ -0,0 +1,92 @@
import json
from asyncio import sleep
from urllib.parse import quote
from matrix_client.api import MatrixHttpApi
from matrix_client.errors import MatrixError, MatrixRequestError
class AsyncHTTPAPI(MatrixHttpApi):
"""
Contains all raw matrix HTTP client-server API calls using asyncio and coroutines.
Examples
--------
.. code-block: python
async def main():
async with aiohttp.ClientSession() as session:
mapi = AsyncHTTPAPI("http://matrix.org", session)
resp = await mapi.get_room_id("#matrix:matrix.org")
print(resp)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
"""
def __init__(self, base_url, client_session, token=None, identity=None):
self.base_url = base_url
self.token = token
self.identity = identity
self.txn_id = 0
self.validate_cert = True
self.client_session = client_session
async def _send(self,
method,
path,
content=None,
query_params={},
headers={},
api_path="/_matrix/client/r0"):
if not content:
content = {}
method = method.upper()
if method not in ["GET", "PUT", "DELETE", "POST"]:
raise MatrixError("Unsupported HTTP method: %s" % method)
if "Content-Type" not in headers:
headers["Content-Type"] = "application/json"
if self.token:
query_params["access_token"] = self.token
endpoint = self.base_url + api_path + path
if headers["Content-Type"] == "application/json":
content = json.dumps(content)
while True:
request = self.client_session.request(
method,
endpoint,
params=query_params,
data=content,
headers=headers)
async with request as response:
if response.status < 200 or response.status >= 300:
raise MatrixRequestError(
code=response.status, content=await response.text())
if response.status == 429:
await sleep(response.json()['retry_after_ms'] / 1000)
else:
return await response.json()
async def get_display_name(self, user_id):
content = await self._send("GET", "/profile/%s/displayname" % user_id)
return content.get('displayname', None)
async def get_avatar_url(self, user_id):
content = await self._send("GET", "/profile/%s/avatar_url" % user_id)
return content.get('avatar_url', None)
async def get_room_id(self, room_alias):
"""Get room id from its alias
Args:
room_alias(str): The room alias name.
Returns:
Wanted room's id.
"""
content = await self._send(
"GET",
"/directory/room/{}".format(quote(room_alias)),
api_path="/_matrix/client/r0")
return content.get("room_id", None)