Start migrating to mautrix-python

This commit is contained in:
Tulir Asokan
2019-07-13 00:23:46 +03:00
parent e0d3c940f8
commit 8d4a9dc231
14 changed files with 263 additions and 797 deletions
+33 -172
View File
@@ -13,157 +13,33 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Any, Dict, Optional, Tuple
from ruamel.yaml import YAML
from typing import Any, Dict, List, NamedTuple
from ruamel.yaml.comments import CommentedMap
import random
import string
import os
yaml: YAML = YAML()
yaml.indent(4)
from mautrix.types import UserID
from mautrix.client import Client
from mautrix.bridge.config import BaseBridgeConfig, ConfigUpdateHelper
Permissions = NamedTuple("Permissions", relaybot=bool, user=bool, puppeting=bool,
matrix_puppeting=bool, admin=bool, level=str)
class DictWithRecursion:
_data: CommentedMap
def __init__(self, data: Optional[CommentedMap] = None) -> None:
self._data = data or CommentedMap()
@staticmethod
def _parse_key(key: str) -> Tuple[str, Optional[str]]:
if '.' not in key:
return key, None
key, next_key = key.split('.', 1)
if len(key) > 0 and key[0] == "[":
end_index = next_key.index("]")
key = key[1:] + "." + next_key[:end_index]
next_key = next_key[end_index + 2:] if len(next_key) > end_index + 1 else None
return key, next_key
def _recursive_get(self, data: CommentedMap, key: str, default_value: Any) -> Any:
key, next_key = self._parse_key(key)
if next_key is not None:
next_data = data.get(key, CommentedMap())
return self._recursive_get(next_data, next_key, default_value)
return data.get(key, default_value)
def get(self, key: str, default_value: Any, allow_recursion: bool = True) -> Any:
if allow_recursion and '.' in key:
return self._recursive_get(self._data, key, default_value)
return self._data.get(key, default_value)
def __getitem__(self, key: str) -> Any:
return self.get(key, None)
def __contains__(self, key: str) -> bool:
return self[key] is not None
def _recursive_set(self, data: CommentedMap, key: str, value: Any) -> None:
key, next_key = self._parse_key(key)
if next_key is not None:
if key not in data:
data[key] = CommentedMap()
next_data = data.get(key, CommentedMap())
return self._recursive_set(next_data, next_key, value)
data[key] = value
def set(self, key: str, value: Any, allow_recursion: bool = True) -> None:
if allow_recursion and '.' in key:
self._recursive_set(self._data, key, value)
return
self._data[key] = value
def __setitem__(self, key: str, value: Any) -> None:
self.set(key, value)
def _recursive_del(self, data: CommentedMap, key: str) -> None:
key, next_key = self._parse_key(key)
if next_key is not None:
if key not in data:
return
next_data = data[key]
return self._recursive_del(next_data, next_key)
try:
del data[key]
del data.ca.items[key]
except KeyError:
pass
def delete(self, key: str, allow_recursion: bool = True) -> None:
if allow_recursion and '.' in key:
self._recursive_del(self._data, key)
return
try:
del self._data[key]
del self._data.ca.items[key]
except KeyError:
pass
def __delitem__(self, key: str) -> None:
self.delete(key)
class Config(DictWithRecursion):
path: str
registration_path: str
base_path: str
_registration: Optional[Dict[str, Any]]
_overrides: Dict[str, Any]
def __init__(self, path: str, registration_path: str, base_path: str,
overrides: Dict[str, Any] = None) -> None:
super().__init__()
self.path = path
self.registration_path = registration_path
self.base_path = base_path
self._registration = None
self._overrides = overrides or {}
class Config(BaseBridgeConfig):
def __getitem__(self, key: str) -> Any:
try:
return self._overrides[f"MAUTRIX_TELEGRAM_{key.replace('.', '_').upper()}"]
return os.environ[f"MAUTRIX_TELEGRAM_{key.replace('.', '_').upper()}"]
except KeyError:
return super().__getitem__(key)
def load(self) -> None:
with open(self.path, 'r') as stream:
self._data = yaml.load(stream)
def load_base(self) -> Optional[DictWithRecursion]:
try:
with open(self.base_path, 'r') as stream:
return DictWithRecursion(yaml.load(stream))
except OSError:
pass
return None
def save(self) -> None:
with open(self.path, 'w') as stream:
yaml.dump(self._data, stream)
if self._registration and self.registration_path:
with open(self.registration_path, 'w') as stream:
yaml.dump(self._registration, stream)
@staticmethod
def _new_token() -> str:
return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(64))
def update(self) -> None:
base = self.load_base()
if not base:
return
def copy(from_path, to_path=None) -> None:
if from_path in self:
base[to_path or from_path] = self[from_path]
def copy_dict(from_path, to_path=None, override_existing_map=True) -> None:
if from_path in self:
to_path = to_path or from_path
if override_existing_map or to_path not in base:
base[to_path] = CommentedMap()
for key, value in self[from_path].items():
base[to_path][key] = value
def do_update(self, helper: ConfigUpdateHelper) -> None:
copy, copy_dict, base = helper
copy("homeserver.address")
copy("homeserver.domain")
@@ -309,58 +185,43 @@ class Config(DictWithRecursion):
else:
copy("logging")
self._data = base._data
self.save()
def _get_permissions(self, key: str) -> Tuple[bool, bool, bool, bool, bool, bool]:
def _get_permissions(self, key: str) -> Permissions:
level = self["bridge.permissions"].get(key, "")
admin = level == "admin"
matrix_puppeting = level == "full" or admin
puppeting = level == "puppeting" or matrix_puppeting
user = level == "user" or puppeting
relaybot = level == "relaybot" or user
return relaybot, user, puppeting, matrix_puppeting, admin, level
return Permissions(relaybot, user, puppeting, matrix_puppeting, admin, level)
def get_permissions(self, mxid: str) -> Tuple[bool, bool, bool, bool, bool, bool]:
permissions = self["bridge.permissions"] or {}
def get_permissions(self, mxid: UserID) -> Permissions:
permissions = self["bridge.permissions"]
if mxid in permissions:
return self._get_permissions(mxid)
homeserver = mxid[mxid.index(":") + 1:]
_, homeserver = Client.parse_user_id(mxid)
if homeserver in permissions:
return self._get_permissions(homeserver)
return self._get_permissions("*")
def generate_registration(self) -> None:
@property
def namespaces(self) -> Dict[str, List[Dict[str, Any]]]:
homeserver = self["homeserver.domain"]
username_format = self.get("bridge.username_template",
"telegram_{userid}").format(userid=".+")
alias_format = self.get("bridge.alias_template",
"telegram_{groupname}").format(groupname=".+")
username_format = self["bridge.username_template"].format(userid=".+")
alias_format = self["bridge.alias_template"].format(groupname=".+")
group_id = ({"group_id": self["appservice.community_id"]}
if self["appservice.community_id"] else {})
self.set("appservice.as_token", self._new_token())
self.set("appservice.hs_token", self._new_token())
self._registration = {
"id": self["appservice.id"] or "telegram",
"as_token": self["appservice.as_token"],
"hs_token": self["appservice.hs_token"],
"namespaces": {
"users": [{
"exclusive": True,
"regex": f"@{username_format}:{homeserver}"
}],
"aliases": [{
"exclusive": True,
"regex": f"#{alias_format}:{homeserver}"
}]
},
"url": self["appservice.address"],
"sender_localpart": self["appservice.bot_username"],
"rate_limited": False
return {
"users": [{
"exclusive": True,
"regex": f"@{username_format}:{homeserver}",
**group_id,
}],
"aliases": [{
"exclusive": True,
"regex": f"#{alias_format}:{homeserver}",
}]
}
if self["appservice.community_id"]:
self._registration["namespaces"]["users"][0]["group_id"] = self[
"appservice.community_id"]