Make help message dynamic based on permissions

This commit is contained in:
Tulir Asokan
2018-07-10 14:17:03 +03:00
parent ecf0e262df
commit 7a4d29e1e4
8 changed files with 236 additions and 168 deletions
+90 -36
View File
@@ -14,45 +14,38 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Dict, Callable, Optional
from collections import namedtuple
import markdown
import logging
from telethon.errors import FloodWaitError
from ..util import format_duration
from ..context import Context
from .. import user as u
command_handlers = {}
command_handlers = {} # type: Dict[str, CommandHandler]
HelpSection = namedtuple("HelpSection", "name order description")
def command_handler(needs_auth=True, management_only=False, needs_puppeting=True,
needs_admin=False, name=None):
def decorator(func):
async def wrapper(evt):
if management_only and not evt.is_management:
return await evt.reply(f"`{evt.command}` is a restricted command:"
"you may only run it in management rooms.")
elif needs_auth and not await evt.sender.is_logged_in():
return await evt.reply("This command requires you to be logged in.")
elif needs_puppeting and not evt.sender.puppet_whitelisted:
return await evt.reply("This command requires puppeting privileges.")
elif needs_admin and not evt.sender.is_admin:
return await evt.reply("This command requires administrator privileges.")
return await func(evt)
command_handlers[name or func.__name__.replace("_", "-")] = wrapper
return wrapper
return decorator
SECTION_GENERAL = HelpSection("General", 0, "")
SECTION_AUTH = HelpSection("Authentication", 10, "")
SECTION_CREATING_PORTALS = HelpSection("Creating portals", 20, "")
SECTION_PORTAL_MANAGEMENT = HelpSection("Portal management", 30, "")
SECTION_MISC = HelpSection("Miscellaneous", 40, "")
SECTION_ADMIN = HelpSection("Administration", 50, "")
class CommandEvent:
def __init__(self, handler, room, sender, command, args, is_management, is_portal):
self.az = handler.az
self.log = handler.log
self.loop = handler.loop
self.tgbot = handler.tgbot
self.config = handler.config
self.command_prefix = handler.command_prefix
def __init__(self, processor: "CommandProcessor", room: str, sender: u.User, command: str,
args: List[str], is_management: bool, is_portal: bool):
self.az = processor.az
self.log = processor.log
self.loop = processor.loop
self.tgbot = processor.tgbot
self.config = processor.config
self.command_prefix = processor.command_prefix
self.room_id = room
self.sender = sender
self.command = command
@@ -60,7 +53,7 @@ class CommandEvent:
self.is_management = is_management
self.is_portal = is_portal
def reply(self, message, allow_html=False, render_markdown=True):
def reply(self, message: str, allow_html: bool = False, render_markdown: bool = True):
message = message.replace("$cmdprefix+sp ",
"" if self.is_management else f"{self.command_prefix} ")
message = message.replace("$cmdprefix", self.command_prefix)
@@ -73,17 +66,78 @@ class CommandEvent:
class CommandHandler:
def __init__(self, handler: Callable[[CommandEvent], None],
needs_auth: bool, needs_puppeting: bool, needs_admin: bool, management_only: bool,
name: str, help_text: str, help_args: str, help_section: HelpSection):
self._handler = handler
self.needs_auth = needs_auth
self.needs_puppeting = needs_puppeting
self.needs_admin = needs_admin
self.management_only = management_only
self.name = name
self._help_text = help_text
self._help_args = help_args
self.help_section = help_section
async def get_permission_error(self, evt: CommandEvent) -> Optional[str]:
if self.management_only and not evt.is_management:
return (f"`{evt.command}` is a restricted command: "
"you may only run it in management rooms.")
elif self.needs_puppeting and not evt.sender.puppet_whitelisted:
return "This command requires puppeting privileges."
elif self.needs_admin and not evt.sender.is_admin:
return "This command requires administrator privileges."
elif self.needs_auth and not await evt.sender.is_logged_in():
return "This command requires you to be logged in."
return None
def has_permission(self, is_management: bool, puppet_whitelisted: bool, is_admin: bool,
is_logged_in: bool) -> bool:
return ((not self.management_only or is_management) and
(not self.needs_puppeting or puppet_whitelisted) and
(not self.needs_admin or is_admin) and
(not self.needs_auth or is_logged_in))
async def __call__(self, evt: CommandEvent):
error = await self.get_permission_error(evt)
if error is not None:
return await evt.reply(error)
return await self._handler(evt)
@property
def has_help(self) -> bool:
return bool(self.help_section) and bool(self._help_text)
@property
def help(self) -> str:
return f"**{self.name}** {self._help_args} - {self._help_text}"
def command_handler(_func: Optional[Callable[[CommandEvent], None]] = None, *, needs_auth=True,
needs_puppeting=True, needs_admin=False, management_only=False,
name=None, help_text="", help_args="", help_section=None):
input_name = name
def decorator(func: Callable[[CommandEvent], None]):
name = input_name or func.__name__.replace("_", "-")
handler = CommandHandler(func, needs_auth, needs_puppeting, needs_admin, management_only,
name, help_text, help_args, help_section)
command_handlers[handler.name] = handler
return handler
return decorator if _func is None else decorator(_func)
class CommandProcessor:
log = logging.getLogger("mau.commands")
def __init__(self, context):
def __init__(self, context: Context):
self.az, self.db, self.config, self.loop, self.tgbot = context
self.command_prefix = self.config["bridge.command_prefix"]
# region Utility functions for handling commands
async def handle(self, room, sender, command, args, is_management, is_portal):
evt = CommandEvent(self, room, sender, command, args,
is_management, is_portal)
async def handle(self, room: str, sender: u.User, command: str, args: List[str],
is_management: bool, is_portal: bool):
evt = CommandEvent(self, room, sender, command, args, is_management, is_portal)
orig_command = command
command = command.lower()
try:
@@ -100,7 +154,7 @@ class CommandHandler:
except FloodWaitError as e:
return await evt.reply(f"Flood error: Please wait {format_duration(e.seconds)}")
except Exception:
self.log.exception("Fatal error handling command "
self.log.exception("Unhandled error while handling command "
f"{evt.command} {' '.join(args)} from {sender.mxid}")
return await evt.reply("Fatal error while handling command. "
return await evt.reply("Unhandled error while handling command. "
"Check logs for more details.")