Use new wrapper for creating background tasks

This commit is contained in:
Tulir Asokan
2023-02-11 22:41:15 +02:00
parent be6f6bbfac
commit f7b694c9e4
8 changed files with 27 additions and 24 deletions
+4 -3
View File
@@ -75,6 +75,7 @@ from telethon.tl.types import (
from mautrix.appservice import AppService from mautrix.appservice import AppService
from mautrix.errors import MatrixError from mautrix.errors import MatrixError
from mautrix.types import PresenceState, UserID from mautrix.types import PresenceState, UserID
from mautrix.util import background_task
from mautrix.util.logging import TraceLogger from mautrix.util.logging import TraceLogger
from mautrix.util.opt_prometheus import Counter, Histogram from mautrix.util.opt_prometheus import Counter, Histogram
@@ -241,7 +242,7 @@ class AbstractUser(ABC):
async def _telethon_update_error_callback(self, err: Exception) -> None: async def _telethon_update_error_callback(self, err: Exception) -> None:
if isinstance(err, (UnauthorizedError, AuthKeyError)): if isinstance(err, (UnauthorizedError, AuthKeyError)):
asyncio.create_task(self.on_signed_out(err)) background_task.create(self.on_signed_out(err))
return return
if self.config["telegram.exit_on_update_error"]: if self.config["telegram.exit_on_update_error"]:
self.log.critical(f"Stopping due to update handling error {type(err).__name__}") self.log.critical(f"Stopping due to update handling error {type(err).__name__}")
@@ -325,7 +326,7 @@ class AbstractUser(ABC):
async def _update(self, update: TypeUpdate) -> None: async def _update(self, update: TypeUpdate) -> None:
if isinstance(update, UpdateShort): if isinstance(update, UpdateShort):
update = update.update update = update.update
asyncio.create_task(self._handle_entity_updates(getattr(update, "_entities", {}))) background_task.create(self._handle_entity_updates(getattr(update, "_entities", {})))
if isinstance( if isinstance(
update, update,
( (
@@ -625,7 +626,7 @@ class AbstractUser(ABC):
await portal.delete_telegram_user(self.tgid, sender=None) await portal.delete_telegram_user(self.tgid, sender=None)
elif chan := getattr(update, "mau_channel", None): elif chan := getattr(update, "mau_channel", None):
if not portal.mxid: if not portal.mxid:
asyncio.create_task(self._delayed_create_channel(chan)) background_task.create(self._delayed_create_channel(chan))
else: else:
self.log.debug("Updating channel info with data fetched by Telethon") self.log.debug("Updating channel info with data fetched by Telethon")
await portal.update_info(self, chan) await portal.update_info(self, chan)
+3 -2
View File
@@ -21,6 +21,7 @@ import asyncio
from telethon.tl.types import ChannelForbidden, ChatForbidden from telethon.tl.types import ChannelForbidden, ChatForbidden
from mautrix.types import EventID, RoomID from mautrix.types import EventID, RoomID
from mautrix.util import background_task
from ... import portal as po from ... import portal as po
from ...types import TelegramID from ...types import TelegramID
@@ -184,7 +185,7 @@ async def confirm_bridge(evt: CommandEvent) -> EventID | None:
if not ok: if not ok:
return None return None
elif coro: elif coro:
asyncio.create_task(coro) background_task.create(coro)
await evt.reply("Cleaning up previous portal room...") await evt.reply("Cleaning up previous portal room...")
elif portal.mxid: elif portal.mxid:
evt.sender.command_status = None evt.sender.command_status = None
@@ -251,7 +252,7 @@ async def _locked_confirm_bridge(
await portal.save() await portal.save()
await portal.update_bridge_info() await portal.update_bridge_info()
asyncio.create_task(portal.update_matrix_room(user, entity, levels=levels)) background_task.create(portal.update_matrix_room(user, entity, levels=levels))
await warn_missing_power(levels, evt) await warn_missing_power(levels, evt)
+2 -4
View File
@@ -22,7 +22,6 @@ import io
from telethon.errors import ( from telethon.errors import (
AccessTokenExpiredError, AccessTokenExpiredError,
AccessTokenInvalidError, AccessTokenInvalidError,
FirstNameInvalidError,
FloodWaitError, FloodWaitError,
PasswordHashInvalidError, PasswordHashInvalidError,
PhoneCodeExpiredError, PhoneCodeExpiredError,
@@ -31,14 +30,12 @@ from telethon.errors import (
PhoneNumberBannedError, PhoneNumberBannedError,
PhoneNumberFloodError, PhoneNumberFloodError,
PhoneNumberInvalidError, PhoneNumberInvalidError,
PhoneNumberOccupiedError,
PhoneNumberUnoccupiedError, PhoneNumberUnoccupiedError,
SessionPasswordNeededError, SessionPasswordNeededError,
) )
from telethon.tl.types import User from telethon.tl.types import User
from mautrix.client import Client from mautrix.client import Client
from mautrix.errors import MForbidden
from mautrix.types import ( from mautrix.types import (
EventID, EventID,
ImageInfo, ImageInfo,
@@ -47,6 +44,7 @@ from mautrix.types import (
TextMessageEventContent, TextMessageEventContent,
UserID, UserID,
) )
from mautrix.util import background_task
from mautrix.util.format_duration import format_duration as fmt_duration from mautrix.util.format_duration import format_duration as fmt_duration
from ... import user as u from ... import user as u
@@ -368,7 +366,7 @@ async def _finish_sign_in(evt: CommandEvent, user: User, login_as: u.User = None
f"[{existing_user.displayname}] (https://matrix.to/#/{existing_user.mxid})" f"[{existing_user.displayname}] (https://matrix.to/#/{existing_user.mxid})"
" was logged out from the account." " was logged out from the account."
) )
asyncio.create_task(login_as.post_login(user, first_login=True)) background_task.create(login_as.post_login(user, first_login=True))
evt.sender.command_status = None evt.sender.command_status = None
name = f"@{user.username}" if user.username else f"+{user.phone}" name = f"@{user.username}" if user.username else f"+{user.phone}"
if login_as != evt.sender: if login_as != evt.sender:
+10 -10
View File
@@ -171,7 +171,7 @@ from mautrix.types import (
UserID, UserID,
VideoInfo, VideoInfo,
) )
from mautrix.util import magic, variation_selector from mautrix.util import background_task, magic, variation_selector
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate from mautrix.util.simple_template import SimpleTemplate
@@ -727,7 +727,7 @@ class Portal(DBPortal, BasePortal):
self.log.exception(f"Failed to get entity through {user.tgid} for update") self.log.exception(f"Failed to get entity through {user.tgid} for update")
return self.mxid return self.mxid
update = self.update_matrix_room(user, entity) update = self.update_matrix_room(user, entity)
asyncio.create_task(update) background_task.create(update)
await self.invite_to_matrix(invites or []) await self.invite_to_matrix(invites or [])
return self.mxid return self.mxid
async with self._room_create_lock: async with self._room_create_lock:
@@ -1525,11 +1525,11 @@ class Portal(DBPortal, BasePortal):
) )
if self.peer_type == "channel": if self.peer_type == "channel":
if not self.megagroup: if not self.megagroup:
asyncio.create_task( background_task.create(
self._try_handle_read_for_sponsored_msg(user, event_id, timestamp) self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)
) )
else: else:
asyncio.create_task(self._poll_telegram_reactions(user)) background_task.create(self._poll_telegram_reactions(user))
async def _preproc_kick_ban( async def _preproc_kick_ban(
self, user: u.User | p.Puppet, source: u.User self, user: u.User | p.Puppet, source: u.User
@@ -1964,7 +1964,7 @@ class Portal(DBPortal, BasePortal):
message_type=msgtype, message_type=msgtype,
) )
await self._send_delivery_receipt(event_id) await self._send_delivery_receipt(event_id)
asyncio.create_task(self._send_message_status(event_id, err=None)) background_task.create(self._send_message_status(event_id, err=None))
if response.ttl_period: if response.ttl_period:
await self._mark_disappearing( await self._mark_disappearing(
event_id=event_id, event_id=event_id,
@@ -2257,7 +2257,7 @@ class Portal(DBPortal, BasePortal):
EventType.ROOM_REDACTION, EventType.ROOM_REDACTION,
) )
await self._send_delivery_receipt(redaction_event_id) await self._send_delivery_receipt(redaction_event_id)
asyncio.create_task(self._send_message_status(redaction_event_id, err=None)) background_task.create(self._send_message_status(redaction_event_id, err=None))
async def _handle_matrix_reaction_deletion( async def _handle_matrix_reaction_deletion(
self, deleter: u.User, event_id: EventID, tg_space: TelegramID self, deleter: u.User, event_id: EventID, tg_space: TelegramID
@@ -2370,7 +2370,7 @@ class Portal(DBPortal, BasePortal):
EventType.REACTION, EventType.REACTION,
) )
await self._send_delivery_receipt(reaction_event_id) await self._send_delivery_receipt(reaction_event_id)
asyncio.create_task(self._send_message_status(reaction_event_id, err=None)) background_task.create(self._send_message_status(reaction_event_id, err=None))
async def _handle_matrix_reaction( async def _handle_matrix_reaction(
self, self,
@@ -2586,7 +2586,7 @@ class Portal(DBPortal, BasePortal):
return return
if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None: if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None:
asyncio.create_task( background_task.create(
self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions) self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
) )
sender_id = sender.tgid if sender else self.tgid sender_id = sender.tgid if sender else self.tgid
@@ -3419,7 +3419,7 @@ class Portal(DBPortal, BasePortal):
await intent.redact(self.mxid, event_id) await intent.redact(self.mxid, event_id)
return return
if isinstance(evt, Message) and evt.reactions: if isinstance(evt, Message) and evt.reactions:
asyncio.create_task( background_task.create(
self.try_handle_telegram_reactions( self.try_handle_telegram_reactions(
source, dbm.tgid, evt.reactions, dbm=dbm, timestamp=evt.date source, dbm.tgid, evt.reactions, dbm=dbm, timestamp=evt.date
) )
@@ -3440,7 +3440,7 @@ class Portal(DBPortal, BasePortal):
dm = DisappearingMessage(self.mxid, event_id, seconds, expiration_ts=expires_at * 1000) dm = DisappearingMessage(self.mxid, event_id, seconds, expiration_ts=expires_at * 1000)
await dm.insert() await dm.insert()
if expires_at: if expires_at:
asyncio.create_task(self._disappear_event(dm)) background_task.create(self._disappear_event(dm))
async def _create_room_on_action( async def _create_room_on_action(
self, source: au.AbstractUser, action: TypeMessageAction self, source: au.AbstractUser, action: TypeMessageAction
+2 -1
View File
@@ -59,6 +59,7 @@ from mautrix.bridge import BaseUser, async_getter_lock
from mautrix.client import Client from mautrix.client import Client
from mautrix.errors import MatrixRequestError, MNotFound from mautrix.errors import MatrixRequestError, MNotFound
from mautrix.types import PushActionType, PushRuleKind, PushRuleScope, RoomID, RoomTagInfo, UserID from mautrix.types import PushActionType, PushRuleKind, PushRuleScope, RoomID, RoomTagInfo, UserID
from mautrix.util import background_task
from mautrix.util.bridge_state import BridgeState, BridgeStateEvent from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
from mautrix.util.opt_prometheus import Gauge from mautrix.util.opt_prometheus import Gauge
@@ -259,7 +260,7 @@ class User(DBUser, AbstractUser, BaseUser):
else: else:
# Authenticated, run post login # Authenticated, run post login
self.log.debug(f"Ensuring post_login() for {self.name}") self.log.debug(f"Ensuring post_login() for {self.name}")
asyncio.create_task(self.post_login()) background_task.create(self.post_login())
return self return self
# Not authenticated, delete data if necessary # Not authenticated, delete data if necessary
if delete_unless_authenticated and self.client is not None: if delete_unless_authenticated and self.client is not None:
+2 -1
View File
@@ -38,6 +38,7 @@ from telethon.errors import (
) )
from mautrix.bridge import InvalidAccessToken, OnlyLoginSelf from mautrix.bridge import InvalidAccessToken, OnlyLoginSelf
from mautrix.util import background_task
from mautrix.util.format_duration import format_duration from mautrix.util.format_duration import format_duration
from ...commands.telegram.auth import enter_password from ...commands.telegram.auth import enter_password
@@ -199,7 +200,7 @@ class AuthAPI(abc.ABC):
existing_user = await User.get_by_tgid(user_info.id) existing_user = await User.get_by_tgid(user_info.id)
if existing_user and existing_user != user: if existing_user and existing_user != user:
await existing_user.log_out() await existing_user.log_out()
asyncio.create_task(user.post_login(user_info, first_login=True)) background_task.create(user.post_login(user_info, first_login=True))
if user.command_status and user.command_status["action"] == "Login": if user.command_status and user.command_status["action"] == "Login":
user.command_status = None user.command_status = None
@@ -32,6 +32,7 @@ from mautrix.appservice import AppService
from mautrix.client import Client from mautrix.client import Client
from mautrix.errors import IntentError, MatrixRequestError from mautrix.errors import IntentError, MatrixRequestError
from mautrix.types import UserID from mautrix.types import UserID
from mautrix.util import background_task
from ...commands.portal.util import get_initial_state, user_has_power_level from ...commands.portal.util import get_initial_state, user_has_power_level
from ...portal import Portal from ...portal import Portal
@@ -227,7 +228,7 @@ class ProvisioningAPI(AuthAPI):
portal.photo_id = "" portal.photo_id = ""
await portal.save() await portal.save()
asyncio.create_task(portal.update_matrix_room(user, entity, levels=levels)) background_task.create(portal.update_matrix_room(user, entity, levels=levels))
return web.Response(status=202, body="{}") return web.Response(status=202, body="{}")
@@ -348,7 +349,7 @@ class ProvisioningAPI(AuthAPI):
self.log.exception("Failed to disconnect chat") self.log.exception("Failed to disconnect chat")
return self.get_error_response(500, "exception", "Failed to disconnect chat") return self.get_error_response(500, "exception", "Failed to disconnect chat")
else: else:
asyncio.create_task(coro) background_task.create(coro)
return web.json_response({}, status=200 if sync else 202) return web.json_response({}, status=200 if sync else 202)
async def get_user_info(self, request: web.Request) -> web.Response: async def get_user_info(self, request: web.Request) -> web.Response:
+1 -1
View File
@@ -3,7 +3,7 @@ python-magic>=0.4,<0.5
commonmark>=0.8,<0.10 commonmark>=0.8,<0.10
aiohttp>=3,<4 aiohttp>=3,<4
yarl>=1,<2 yarl>=1,<2
mautrix>=0.19.3,<0.20 mautrix>=0.19.4,<0.20
#telethon>=1.25.4,<1.27 #telethon>=1.25.4,<1.27
tulir-telethon==1.28.0a1 tulir-telethon==1.28.0a1
asyncpg>=0.20,<0.28 asyncpg>=0.20,<0.28