Add support for new reaction stuff
* Custom emojis in reactions * Premium users can react 3 times to a single message * Reactions to recent messages are now polled on read receipt
This commit is contained in:
+190
-93
@@ -16,6 +16,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, List, Union, cast
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from html import escape as escape_html
|
||||
from sqlite3 import IntegrityError
|
||||
@@ -52,6 +53,7 @@ from telethon.tl.functions.messages import (
|
||||
EditChatTitleRequest,
|
||||
ExportChatInviteRequest,
|
||||
GetMessageReactionsListRequest,
|
||||
GetMessagesReactionsRequest,
|
||||
MigrateChatRequest,
|
||||
SendReactionRequest,
|
||||
SetTypingRequest,
|
||||
@@ -102,6 +104,8 @@ from telethon.tl.types import (
|
||||
Photo,
|
||||
PhotoEmpty,
|
||||
ReactionCount,
|
||||
ReactionCustomEmoji,
|
||||
ReactionEmoji,
|
||||
SendMessageCancelAction,
|
||||
SendMessageTypingAction,
|
||||
SponsoredMessage,
|
||||
@@ -113,11 +117,13 @@ from telethon.tl.types import (
|
||||
TypeMessage,
|
||||
TypeMessageAction,
|
||||
TypePeer,
|
||||
TypeReaction,
|
||||
TypeUser,
|
||||
TypeUserFull,
|
||||
TypeUserProfilePhoto,
|
||||
UpdateChannelUserTyping,
|
||||
UpdateChatUserTyping,
|
||||
UpdateMessageReactions,
|
||||
UpdateNewMessage,
|
||||
UpdateUserTyping,
|
||||
User,
|
||||
@@ -181,6 +187,7 @@ from .db import (
|
||||
Message as DBMessage,
|
||||
Portal as DBPortal,
|
||||
Reaction as DBReaction,
|
||||
TelegramFile as DBTelegramFile,
|
||||
)
|
||||
from .tgclient import MautrixTelegramClient
|
||||
from .types import TelegramID
|
||||
@@ -204,6 +211,8 @@ UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTy
|
||||
TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
|
||||
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]
|
||||
|
||||
REACTION_POLL_MIN_INTERVAL = 20
|
||||
|
||||
|
||||
class BridgingError(Exception):
|
||||
pass
|
||||
@@ -261,6 +270,8 @@ class Portal(DBPortal, BasePortal):
|
||||
_sponsored_seen: dict[UserID, bool]
|
||||
_new_messages_after_sponsored: bool
|
||||
|
||||
_prev_reaction_poll: dict[UserID, float]
|
||||
|
||||
_msg_conv: putil.TelegramMessageConverter
|
||||
|
||||
def __init__(
|
||||
@@ -331,6 +342,8 @@ class Portal(DBPortal, BasePortal):
|
||||
self._new_messages_after_sponsored = True
|
||||
self._bridging_blocked_at_runtime = False
|
||||
|
||||
self._prev_reaction_poll = defaultdict(lambda: 0.0)
|
||||
|
||||
self._msg_conv = putil.TelegramMessageConverter(self)
|
||||
|
||||
# region Properties
|
||||
@@ -1440,8 +1453,13 @@ class Portal(DBPortal, BasePortal):
|
||||
await user.client.send_read_acknowledge(
|
||||
self.peer, max_id=message.tgid, clear_mentions=True, clear_reactions=True
|
||||
)
|
||||
if self.peer_type == "channel" and not self.megagroup:
|
||||
asyncio.create_task(self._try_handle_read_for_sponsored_msg(user, event_id, timestamp))
|
||||
if self.peer_type == "channel":
|
||||
if not self.megagroup:
|
||||
asyncio.create_task(
|
||||
self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)
|
||||
)
|
||||
else:
|
||||
asyncio.create_task(self._poll_telegram_reactions(user))
|
||||
|
||||
async def _preproc_kick_ban(
|
||||
self, user: u.User | p.Puppet, source: u.User
|
||||
@@ -2212,12 +2230,31 @@ class Portal(DBPortal, BasePortal):
|
||||
await real_deleter.client.delete_messages(self.peer, [message.tgid])
|
||||
|
||||
async def handle_matrix_reaction(
|
||||
self, user: u.User, target_event_id: EventID, reaction: str, reaction_event_id: EventID
|
||||
self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
|
||||
) -> None:
|
||||
emoji_id = emoji
|
||||
reaction = ReactionEmoji(emoticon=variation_selector.remove(emoji))
|
||||
if emoji.startswith("mxc://"):
|
||||
db_reaction = await DBTelegramFile.find_by_mxc(ContentURI(emoji))
|
||||
if not db_reaction or not db_reaction.id.isdecimal():
|
||||
self.log.debug(f"Dropping unknown reaction {emoji} by {user.mxid}")
|
||||
if not self.has_bot:
|
||||
await self.main_intent.redact(
|
||||
self.mxid, reaction_event_id, reason="Unrecognized custom emoji"
|
||||
)
|
||||
await self._send_bridge_error(
|
||||
user,
|
||||
Exception("Unrecognized custom emoji"),
|
||||
reaction_event_id,
|
||||
EventType.REACTION,
|
||||
)
|
||||
return
|
||||
reaction = ReactionCustomEmoji(document_id=int(db_reaction.id))
|
||||
emoji_id = db_reaction.id
|
||||
try:
|
||||
async with self.reaction_lock(target_event_id):
|
||||
await self._handle_matrix_reaction(
|
||||
user, target_event_id, reaction, reaction_event_id
|
||||
user, target_event_id, emoji_id, reaction, reaction_event_id
|
||||
)
|
||||
except IgnoredMessageError as e:
|
||||
self.log.debug(str(e))
|
||||
@@ -2244,7 +2281,12 @@ class Portal(DBPortal, BasePortal):
|
||||
asyncio.create_task(self._send_message_status(reaction_event_id, err=None))
|
||||
|
||||
async def _handle_matrix_reaction(
|
||||
self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
|
||||
self,
|
||||
user: u.User,
|
||||
target_event_id: EventID,
|
||||
emoji_id: str,
|
||||
reaction: TypeReaction,
|
||||
reaction_event_id: EventID,
|
||||
) -> None:
|
||||
tg_space = self.tgid if self.peer_type == "channel" else user.tgid
|
||||
msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space)
|
||||
@@ -2259,23 +2301,34 @@ class Portal(DBPortal, BasePortal):
|
||||
elif msg.edit_index != 0:
|
||||
raise IgnoredMessageError(f"Ignoring Matrix reaction to edit event {target_event_id}")
|
||||
|
||||
emoji = variation_selector.remove(emoji)
|
||||
existing_react = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
|
||||
await user.client(SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=emoji))
|
||||
if existing_react:
|
||||
puppet = await user.get_puppet()
|
||||
await puppet.intent_for(self).redact(existing_react.mx_room, existing_react.mxid)
|
||||
existing_react.mxid = reaction_event_id
|
||||
existing_react.reaction = emoji
|
||||
await existing_react.save()
|
||||
else:
|
||||
await DBReaction(
|
||||
mxid=reaction_event_id,
|
||||
mx_room=self.mxid,
|
||||
msg_mxid=msg.mxid,
|
||||
tg_sender=user.tgid,
|
||||
reaction=emoji,
|
||||
).save()
|
||||
existing_reacts = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
|
||||
new_tg_reactions: list[TypeReaction] = []
|
||||
reactions_to_remove: list[DBReaction] = []
|
||||
max_reactions = 3 if user.is_premium else 1
|
||||
max_reactions -= 1 # Leave one reaction of space for the new reaction
|
||||
for db_reaction in existing_reacts:
|
||||
if db_reaction.reaction == emoji_id:
|
||||
raise IgnoredMessageError("Ignoring duplicate Matrix reaction")
|
||||
if len(new_tg_reactions) < max_reactions:
|
||||
new_tg_reactions.append(db_reaction.telegram)
|
||||
else:
|
||||
reactions_to_remove.append(db_reaction)
|
||||
new_tg_reactions.append(reaction)
|
||||
|
||||
await user.client(
|
||||
SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=new_tg_reactions)
|
||||
)
|
||||
puppet = await user.get_puppet()
|
||||
for db_reaction in reactions_to_remove:
|
||||
await db_reaction.delete()
|
||||
await puppet.intent_for(self).redact(db_reaction.mx_room, db_reaction.mxid)
|
||||
await DBReaction(
|
||||
mxid=reaction_event_id,
|
||||
mx_room=self.mxid,
|
||||
msg_mxid=msg.mxid,
|
||||
tg_sender=user.tgid,
|
||||
reaction=emoji_id,
|
||||
).save()
|
||||
|
||||
async def _update_telegram_power_level(
|
||||
self, sender: u.User, user_id: TelegramID, level: int
|
||||
@@ -2681,35 +2734,46 @@ class Portal(DBPortal, BasePortal):
|
||||
return False
|
||||
|
||||
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
|
||||
if len(counts) == 1:
|
||||
item = counts[0]
|
||||
reactions = []
|
||||
for item in counts:
|
||||
if item.count == 2:
|
||||
return [
|
||||
reactions += [
|
||||
MessagePeerReaction(reaction=item.reaction, peer_id=PeerUser(self.tgid)),
|
||||
MessagePeerReaction(
|
||||
reaction=item.reaction, peer_id=PeerUser(self.tg_receiver)
|
||||
),
|
||||
]
|
||||
elif item.count == 1:
|
||||
return [
|
||||
reactions.append(
|
||||
MessagePeerReaction(
|
||||
reaction=item.reaction,
|
||||
peer_id=PeerUser(self.tg_receiver if item.chosen else self.tgid),
|
||||
),
|
||||
]
|
||||
elif len(counts) == 2:
|
||||
item1, item2 = counts
|
||||
return [
|
||||
MessagePeerReaction(
|
||||
reaction=item1.reaction,
|
||||
peer_id=PeerUser(self.tg_receiver if item1.chosen else self.tgid),
|
||||
),
|
||||
MessagePeerReaction(
|
||||
reaction=item2.reaction,
|
||||
peer_id=PeerUser(self.tg_receiver if item2.chosen else self.tgid),
|
||||
),
|
||||
]
|
||||
return []
|
||||
peer_id=PeerUser(self.tg_receiver if item.chosen_order else self.tgid),
|
||||
)
|
||||
)
|
||||
return reactions
|
||||
|
||||
async def _poll_telegram_reactions(self, source: au.AbstractUser) -> None:
|
||||
now = time.monotonic()
|
||||
if self._prev_reaction_poll[source.mxid] + REACTION_POLL_MIN_INTERVAL > now:
|
||||
self.log.trace(
|
||||
f"Not polling reactions through {source.mxid}, "
|
||||
f"last poll was less than {REACTION_POLL_MIN_INTERVAL} seconds ago"
|
||||
)
|
||||
return
|
||||
self._prev_reaction_poll[source.mxid] = now
|
||||
self.log.debug(f"Polling reactions for recent messages through {source.mxid}")
|
||||
messages = await DBMessage.find_recent(self.mxid, source.tgid)
|
||||
message_ids = [message.tgid for message in messages]
|
||||
updates = await source.client(GetMessagesReactionsRequest(peer=self.peer, id=message_ids))
|
||||
for user in updates.users:
|
||||
user: User
|
||||
puppet = await p.Puppet.get_by_tgid(TelegramID(user.id))
|
||||
await puppet.update_info(source, user)
|
||||
for upd in updates.updates:
|
||||
if isinstance(upd, UpdateMessageReactions):
|
||||
await self.handle_telegram_reactions(source, TelegramID(upd.msg_id), upd.reactions)
|
||||
else:
|
||||
self.log.warning(f"Unexpected update type {type(upd)} in get reactions response")
|
||||
|
||||
async def try_handle_telegram_reactions(
|
||||
self,
|
||||
@@ -2732,9 +2796,15 @@ class Portal(DBPortal, BasePortal):
|
||||
dbm: DBMessage | None = None,
|
||||
timestamp: datetime | None = None,
|
||||
) -> None:
|
||||
if self.peer_type == "channel" and not self.megagroup:
|
||||
total_count = sum(item.count for item in data.results)
|
||||
recent_reactions = data.recent_reactions or []
|
||||
if total_count > 0 and not recent_reactions and not data.can_see_list:
|
||||
# We don't know who reacted in a channel, so we can't bridge it properly either
|
||||
return
|
||||
if self.peer_type == "channel" and not self.megagroup:
|
||||
# This should never happen with the previous if
|
||||
self.log.warning(f"Can see reaction list in channel ({data!s})")
|
||||
# return
|
||||
|
||||
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
|
||||
if dbm is None:
|
||||
@@ -2742,69 +2812,109 @@ class Portal(DBPortal, BasePortal):
|
||||
if dbm is None:
|
||||
return
|
||||
|
||||
total_count = sum(item.count for item in data.results)
|
||||
recent_reactions = data.recent_reactions or []
|
||||
if not recent_reactions and total_count > 0:
|
||||
if not recent_reactions or len(recent_reactions) < total_count:
|
||||
if self.peer_type == "user":
|
||||
recent_reactions = self._split_dm_reaction_counts(data.results)
|
||||
elif source.is_bot:
|
||||
# Can't fetch exact reaction senders as a bot
|
||||
return
|
||||
else:
|
||||
# TODO this doesn't work for some reason
|
||||
return
|
||||
# resp = await source.client(
|
||||
# GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=20)
|
||||
# )
|
||||
# recent_reactions = resp.reactions
|
||||
# TODO should calls to this be limited?
|
||||
resp = await source.client(
|
||||
GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=100)
|
||||
)
|
||||
recent_reactions = resp.reactions
|
||||
|
||||
async with self.reaction_lock(dbm.mxid):
|
||||
await self._handle_telegram_reactions_locked(
|
||||
dbm, recent_reactions, total_count, timestamp=timestamp
|
||||
source, dbm, recent_reactions, total_count, timestamp=timestamp
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _reactions_filter(lst: list[TypeReaction], existing: DBReaction) -> bool:
|
||||
if not lst:
|
||||
return False
|
||||
for reaction in lst:
|
||||
if isinstance(reaction, ReactionCustomEmoji) and existing.reaction == str(
|
||||
reaction.document_id
|
||||
):
|
||||
lst.remove(reaction)
|
||||
return True
|
||||
elif isinstance(reaction, ReactionEmoji) and existing.reaction == reaction.emoticon:
|
||||
lst.remove(reaction)
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
async def _get_reaction_limit(sender: TelegramID) -> int:
|
||||
puppet = await p.Puppet.get_by_tgid(sender, create=False)
|
||||
if puppet and puppet.is_premium:
|
||||
return 3
|
||||
return 1
|
||||
|
||||
async def _handle_telegram_reactions_locked(
|
||||
self,
|
||||
source: au.AbstractUser,
|
||||
msg: DBMessage,
|
||||
reaction_list: list[MessagePeerReaction],
|
||||
total_count: int,
|
||||
timestamp: datetime | None = None,
|
||||
) -> None:
|
||||
reactions = {
|
||||
p.Puppet.get_id_from_peer(reaction.peer_id): reaction.reaction
|
||||
for reaction in reaction_list
|
||||
if isinstance(reaction.peer_id, (PeerUser, PeerChannel))
|
||||
}
|
||||
is_full = len(reactions) == total_count
|
||||
reactions: dict[TelegramID, list[TypeReaction]] = {}
|
||||
custom_emoji_ids: list[int] = []
|
||||
for reaction in reaction_list:
|
||||
if isinstance(reaction.peer_id, (PeerUser, PeerChannel)) and isinstance(
|
||||
reaction.reaction, (ReactionEmoji, ReactionCustomEmoji)
|
||||
):
|
||||
reactions.setdefault(p.Puppet.get_id_from_peer(reaction.peer_id), []).append(
|
||||
reaction.reaction
|
||||
)
|
||||
if isinstance(reaction.reaction, ReactionCustomEmoji):
|
||||
custom_emoji_ids.append(reaction.reaction.document_id)
|
||||
is_full = len(reaction_list) == total_count
|
||||
custom_emojis = await util.transfer_custom_emojis_to_matrix(source, custom_emoji_ids)
|
||||
|
||||
existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)
|
||||
|
||||
removed: list[DBReaction] = []
|
||||
changed: list[tuple[DBReaction, str]] = []
|
||||
for existing_reaction in existing_reactions:
|
||||
new_reaction = reactions.get(existing_reaction.tg_sender)
|
||||
if new_reaction is None:
|
||||
if is_full:
|
||||
sender_id = existing_reaction.tg_sender
|
||||
new_reactions = reactions.get(sender_id)
|
||||
if self._reactions_filter(new_reactions, existing_reaction):
|
||||
if new_reactions is not None and len(new_reactions) == 0:
|
||||
reactions.pop(sender_id)
|
||||
else:
|
||||
if is_full or (
|
||||
new_reactions is not None
|
||||
and len(new_reactions) == await self._get_reaction_limit(sender_id)
|
||||
):
|
||||
removed.append(existing_reaction)
|
||||
# else: assume the reaction is still there, too much effort to fetch it
|
||||
elif new_reaction == existing_reaction.reaction:
|
||||
reactions.pop(existing_reaction.tg_sender)
|
||||
else:
|
||||
changed.append((existing_reaction, new_reaction))
|
||||
|
||||
for sender, new_emoji in reactions.items():
|
||||
self.log.debug(f"Bridging reaction {new_emoji} by {sender} to {msg.tgid}")
|
||||
puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
|
||||
mxid = await puppet.intent_for(self).react(
|
||||
msg.mx_room, msg.mxid, variation_selector.add(new_emoji), timestamp=timestamp
|
||||
)
|
||||
await DBReaction(
|
||||
mxid=mxid,
|
||||
mx_room=msg.mx_room,
|
||||
msg_mxid=msg.mxid,
|
||||
tg_sender=sender,
|
||||
reaction=new_emoji,
|
||||
).save()
|
||||
new_reaction: TypeReaction
|
||||
for sender, new_reactions in reactions.items():
|
||||
for new_reaction in new_reactions:
|
||||
if isinstance(new_reaction, ReactionEmoji):
|
||||
emoji_id = new_reaction.emoticon
|
||||
matrix_reaction = variation_selector.add(new_reaction.emoticon)
|
||||
elif isinstance(new_reaction, ReactionCustomEmoji):
|
||||
emoji_id = str(new_reaction.document_id)
|
||||
matrix_reaction = custom_emojis[new_reaction.document_id].mxc
|
||||
else:
|
||||
self.log.warning("Unknown reaction type %s", type(new_reaction))
|
||||
continue
|
||||
self.log.debug(f"Bridging reaction {emoji_id} by {sender} to {msg.tgid}")
|
||||
puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
|
||||
mxid = await puppet.intent_for(self).react(
|
||||
msg.mx_room, msg.mxid, matrix_reaction, timestamp=timestamp
|
||||
)
|
||||
await DBReaction(
|
||||
mxid=mxid,
|
||||
mx_room=msg.mx_room,
|
||||
msg_mxid=msg.mxid,
|
||||
tg_sender=sender,
|
||||
reaction=emoji_id,
|
||||
).save()
|
||||
for removed_reaction in removed:
|
||||
self.log.debug(
|
||||
f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} "
|
||||
@@ -2813,19 +2923,6 @@ class Portal(DBPortal, BasePortal):
|
||||
puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender)
|
||||
await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid)
|
||||
await removed_reaction.delete()
|
||||
for changed_reaction, new_emoji in changed:
|
||||
self.log.debug(
|
||||
f"Updating reaction {changed_reaction.reaction} -> {new_emoji} "
|
||||
f"by {changed_reaction.tg_sender} to {msg.tgid}"
|
||||
)
|
||||
puppet = await p.Puppet.get_by_tgid(changed_reaction.tg_sender)
|
||||
intent = puppet.intent_for(self)
|
||||
await intent.redact(changed_reaction.mx_room, changed_reaction.mxid)
|
||||
changed_reaction.mxid = await intent.react(
|
||||
msg.mx_room, msg.mxid, variation_selector.add(new_emoji), timestamp=timestamp
|
||||
)
|
||||
changed_reaction.reaction = new_emoji
|
||||
await changed_reaction.save()
|
||||
|
||||
async def handle_telegram_message(
|
||||
self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
|
||||
|
||||
Reference in New Issue
Block a user