Improve Matrix -> Telegram formatter. Fixes #34

This commit is contained in:
Tulir Asokan
2018-02-23 15:53:55 +02:00
parent 74a0a3b621
commit a26f2c2c36
+113 -67
View File
@@ -17,6 +17,7 @@
from html import unescape from html import unescape
from html.parser import HTMLParser from html.parser import HTMLParser
from collections import deque from collections import deque
import math
import re import re
import logging import logging
@@ -32,6 +33,10 @@ log = logging.getLogger("mau.fmt.mx")
class MatrixParser(HTMLParser): class MatrixParser(HTMLParser):
mention_regex = re.compile("https://matrix.to/#/(@.+:.+)") mention_regex = re.compile("https://matrix.to/#/(@.+:.+)")
room_regex = re.compile("https://matrix.to/#/(#.+:.+)") room_regex = re.compile("https://matrix.to/#/(#.+:.+)")
block_tags = ("br", "p", "pre", "blockquote",
"ol", "ul", "li",
"h1", "h2", "h3", "h4", "h5", "h6"
"div", "hr", "table")
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@@ -41,112 +46,151 @@ class MatrixParser(HTMLParser):
self._list_counter = 0 self._list_counter = 0
self._open_tags = deque() self._open_tags = deque()
self._open_tags_meta = deque() self._open_tags_meta = deque()
self._previous_ended_line = True self._line_is_new = True
self._list_entry_is_new = False
def _parse_url(self, url, args):
mention = self.mention_regex.match(url)
if mention:
mxid = mention.group(1)
user = pu.Puppet.get_by_mxid(mxid, create=False)
if not user:
user = u.User.get_by_mxid(mxid, create=False)
if not user:
return None, None
if user.username:
entity_type = MessageEntityMention
url = f"@{user.username}"
else:
entity_type = MessageEntityMentionName
args["user_id"] = user.tgid
return url, entity_type
room = self.room_regex.match(url)
if room:
username = po.Portal.get_username_from_mx_alias(room.group(1))
portal = po.Portal.find_by_username(username)
if portal and portal.username:
return f"@{portal.username}", MessageEntityMention
if url.startswith("mailto:"):
return MessageEntityEmail, url[len("mailto:"):]
if self.get_starttag_text() == url:
return MessageEntityUrl, url
else:
args["url"] = url
return MessageEntityTextUrl, None
def handle_starttag(self, tag, attrs): def handle_starttag(self, tag, attrs):
self._open_tags.appendleft(tag) self._open_tags.appendleft(tag)
self._open_tags_meta.appendleft(0) self._open_tags_meta.appendleft(0)
attrs = dict(attrs) attrs = dict(attrs)
entity_type = None entity_type = None
args = {} args = {}
if tag == "strong" or tag == "b": if tag in ("strong", "b"):
entity_type = MessageEntityBold entity_type = MessageEntityBold
elif tag == "em" or tag == "i": elif tag in ("em", "i"):
entity_type = MessageEntityItalic entity_type = MessageEntityItalic
elif tag == "code": elif tag == "code":
try: try:
pre = self._building_entities["pre"] pre = self._building_entities["pre"]
try: try:
# Pre tag and language found, add language to MessageEntityPre
pre.language = attrs["class"][len("language-"):] pre.language = attrs["class"][len("language-"):]
except KeyError: except KeyError:
# Pre tag found, but language not found, keep pre as-is
pass pass
except KeyError: except KeyError:
# No pre tag found, this is inline code
entity_type = MessageEntityCode entity_type = MessageEntityCode
elif tag == "pre": elif tag == "pre":
entity_type = MessageEntityPre entity_type = MessageEntityPre
args["language"] = "" args["language"] = ""
elif tag == "li":
self._list_entry_is_new = True
elif tag == "a": elif tag == "a":
try: try:
url = attrs["href"] url = attrs["href"]
except KeyError: except KeyError:
return return
mention = self.mention_regex.match(url) entity_type, url = self._parse_url(url, args)
room = self.room_regex.match(url)
if mention:
mxid = mention.group(1)
user = pu.Puppet.get_by_mxid(mxid, create=False)
if not user:
user = u.User.get_by_mxid(mxid, create=False)
if not user:
return
if user.username:
entity_type = MessageEntityMention
url = f"@{user.username}"
else:
entity_type = MessageEntityMentionName
args["user_id"] = user.tgid
elif room:
username = po.Portal.get_username_from_mx_alias(room.group(1))
portal = po.Portal.find_by_username(username)
if portal and portal.username:
url = f"@{portal.username}"
entity_type = MessageEntityMention
elif url.startswith("mailto:"):
url = url[len("mailto:"):]
entity_type = MessageEntityEmail
else:
if self.get_starttag_text() == url:
entity_type = MessageEntityUrl
else:
entity_type = MessageEntityTextUrl
args["url"] = url
url = None
self._open_tags_meta.popleft() self._open_tags_meta.popleft()
self._open_tags_meta.appendleft(url) self._open_tags_meta.appendleft(url)
if tag in self.block_tags:
self._newline()
if entity_type and tag not in self._building_entities: if entity_type and tag not in self._building_entities:
offset = len(self.text) offset = len(self.text)
self._building_entities[tag] = entity_type(offset=offset, length=0, **args) self._building_entities[tag] = entity_type(offset=offset, length=0, **args)
def _list_depth(self): @property
depth = 0 def _list_indent(self):
for tag in self._open_tags: indent = 0
if tag == "ol" or tag == "ul": first_skipped = False
depth += 1 for index, tag in enumerate(self._open_tags):
return depth if not first_skipped and tag in ("ol", "ul"):
# The first list level isn't indented, so skip it.
first_skipped = True
continue
if tag == "ol":
n = self._open_tags_meta[index]
extra_length_for_long_index = (int(math.log(n, 10)) - 1) * 3
indent += 4 + extra_length_for_long_index
elif tag == "ul":
indent += 3
return indent
def _newline(self, allow_multi=False):
if self._line_is_new or allow_multi:
return
self.text += "\n"
self._line_is_new = True
for entity in self._building_entities.values():
entity.length += 1
def handle_data(self, text): def handle_data(self, text):
text = unescape(text) text = unescape(text)
previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else "" previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else ""
list_format_offset = 0 extra_offset = 0
if previous_tag == "a": if previous_tag == "a":
url = self._open_tags_meta[0] url = self._open_tags_meta[0]
if url: if url:
text = url text = url
elif len(self._open_tags) > 1 and self._previous_ended_line and previous_tag == "li": list_entry_handled_once = False
list_type = self._open_tags[1] # In order to maintain order of things like blockquotes in lists or lists in blockquotes,
indent = (self._list_depth() - 1) * 4 * " " # we can't just have ifs/elses and we need to actually loop through the open tags in order.
text = text.strip("\n") for index, tag in enumerate(self._open_tags):
if len(text) == 0: if tag == "blockquote" and self._line_is_new:
return text = f"> {text}"
elif list_type == "ul": extra_offset += 2
text = f"{indent}* {text}" elif tag == "li" and not list_entry_handled_once:
list_format_offset = len(indent) + 2 list_type_index = index + 1
elif list_type == "ol": list_type = self._open_tags[list_type_index]
n = self._open_tags_meta[1] indent = self._list_indent * " " if self._line_is_new else ""
n += 1 if list_type == "ol":
self._open_tags_meta[1] = n n = self._open_tags_meta[list_type_index]
text = f"{indent}{n}. {text}" if self._list_entry_is_new:
list_format_offset = len(indent) + 3 n += 1
self._open_tags_meta[list_type_index] = n
prefix = f"{n}. "
else:
prefix = int(math.log(n, 10)) * 3 * " " + 4 * " "
else:
prefix = "* " if self._list_entry_is_new else 3 * " "
if not self._list_entry_is_new and not self._line_is_new:
prefix = ""
extra_offset = len(indent) + len(prefix)
text = indent + prefix + text
self._list_entry_is_new = False
list_entry_handled_once = True
for tag, entity in self._building_entities.items(): for tag, entity in self._building_entities.items():
entity.length += len(text.strip("\n")) entity.length += len(text) - extra_offset
entity.offset += list_format_offset entity.offset += extra_offset
if text.endswith("\n"):
self._previous_ended_line = True
else:
self._previous_ended_line = False
self._line_is_new = False
self.text += text self.text += text
def handle_endtag(self, tag): def handle_endtag(self, tag):
@@ -155,8 +199,10 @@ class MatrixParser(HTMLParser):
self._open_tags_meta.popleft() self._open_tags_meta.popleft()
except IndexError: except IndexError:
pass pass
if (tag == "ul" or tag == "ol") and self.text.endswith("\n"):
self.text = self.text[:-1] if tag in self.block_tags:
self._newline(allow_multi=tag == "br")
entity = self._building_entities.pop(tag, None) entity = self._building_entities.pop(tag, None)
if entity: if entity:
self.entities.append(entity) self.entities.append(entity)
@@ -165,8 +211,8 @@ class MatrixParser(HTMLParser):
def matrix_to_telegram(html): def matrix_to_telegram(html):
try: try:
parser = MatrixParser() parser = MatrixParser()
parser.feed(add_surrogates(html)) parser.feed(add_surrogates(html.replace("\n", "")))
return remove_surrogates(parser.text), parser.entities return remove_surrogates(parser.text.strip()), parser.entities
except Exception: except Exception:
log.exception("Failed to convert Matrix format:\nhtml=%s", html) log.exception("Failed to convert Matrix format:\nhtml=%s", html)