Moved converters to other file, added methods for video and gif, which supports resize.
XXX: videos don't want to be played by riot, i don't know why...
This commit is contained in:
@@ -14,7 +14,7 @@
|
|||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
from typing import Optional, Tuple, Union, Dict
|
from typing import Optional, Tuple, Union, Dict
|
||||||
from io import BytesIO, StringIO
|
from io import BytesIO
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
import asyncio
|
import asyncio
|
||||||
@@ -30,6 +30,7 @@ from telethon.errors import (AuthBytesInvalidError, AuthKeyInvalidError, Locatio
|
|||||||
|
|
||||||
from mautrix.appservice import IntentAPI
|
from mautrix.appservice import IntentAPI
|
||||||
|
|
||||||
|
|
||||||
from ..tgclient import MautrixTelegramClient
|
from ..tgclient import MautrixTelegramClient
|
||||||
from ..db import TelegramFile as DBTelegramFile
|
from ..db import TelegramFile as DBTelegramFile
|
||||||
from ..util import sane_mimetypes
|
from ..util import sane_mimetypes
|
||||||
@@ -48,16 +49,7 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
VideoFileClip = random = string = os = mimetypes = None
|
VideoFileClip = random = string = os = mimetypes = None
|
||||||
|
|
||||||
try:
|
from .tgs_converter import convert_tgs
|
||||||
import cairosvg
|
|
||||||
from tgs.parsers.tgs import parse_tgs as tgs_importer
|
|
||||||
from tgs.exporters import svg as tgs_svg_exporter
|
|
||||||
# from tgs.exporters import gif as tgs_gif_exporter
|
|
||||||
except (ImportError, OSError):
|
|
||||||
cairosvg = None
|
|
||||||
tgs_importer = None
|
|
||||||
tgs_svg_exporter = None
|
|
||||||
# tgs_gif_exporter = None
|
|
||||||
|
|
||||||
log: logging.Logger = logging.getLogger("mau.util")
|
log: logging.Logger = logging.getLogger("mau.util")
|
||||||
|
|
||||||
@@ -83,32 +75,6 @@ def convert_image(file: bytes, source_mime: str = "image/webp", target_type: str
|
|||||||
return source_mime, file, None, None
|
return source_mime, file, None, None
|
||||||
|
|
||||||
|
|
||||||
def convert_tgs(file: bytes) -> Tuple[str, bytes, Optional[int], Optional[int]]:
|
|
||||||
if cairosvg and tgs_importer and tgs_svg_exporter:
|
|
||||||
try:
|
|
||||||
with BytesIO(file) as fi:
|
|
||||||
animation = tgs_importer(fi)
|
|
||||||
"""
|
|
||||||
It's possible to convert to gif, but out animation is too big (~500KB),
|
|
||||||
Convert to mp4 needs opencv2 to be installed...
|
|
||||||
TODO: Maybe should create config parameter
|
|
||||||
"""
|
|
||||||
with StringIO() as svg, BytesIO() as fo:
|
|
||||||
frame = int(animation.out_point * 0.3)
|
|
||||||
w, h = 256, 256
|
|
||||||
tgs_svg_exporter.export_svg(animation, svg, frame=frame)
|
|
||||||
svg.seek(0)
|
|
||||||
cairosvg.svg2png(file_obj=svg, write_to=fo, output_width=w, output_height=h)
|
|
||||||
out = fo.getvalue()
|
|
||||||
return "image/png", out, w, h
|
|
||||||
# Yep... some animations crash library...
|
|
||||||
except AttributeError:
|
|
||||||
log.exception("Error occurred while converting animated sticker")
|
|
||||||
else:
|
|
||||||
log.warning("Unable to convert animated sticker, install tgs and cairosvg packages")
|
|
||||||
return "application/gzip", file, None, None
|
|
||||||
|
|
||||||
|
|
||||||
def _temp_file_name(ext: str) -> str:
|
def _temp_file_name(ext: str) -> str:
|
||||||
return ("/tmp/mxtg-video-"
|
return ("/tmp/mxtg-video-"
|
||||||
+ "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
|
+ "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(10))
|
||||||
@@ -200,9 +166,9 @@ async def transfer_file_to_matrix(client: MautrixTelegramClient, intent: IntentA
|
|||||||
if not location_id:
|
if not location_id:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
db_file = DBTelegramFile.get(location_id)
|
#db_file = DBTelegramFile.get(location_id)
|
||||||
if db_file:
|
#if db_file:
|
||||||
return db_file
|
# return db_file
|
||||||
|
|
||||||
try:
|
try:
|
||||||
lock = transfer_locks[location_id]
|
lock = transfer_locks[location_id]
|
||||||
@@ -218,9 +184,9 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
|||||||
loc_id: str, location: TypeLocation,
|
loc_id: str, location: TypeLocation,
|
||||||
thumbnail: TypeThumbnail, is_sticker: bool
|
thumbnail: TypeThumbnail, is_sticker: bool
|
||||||
) -> Optional[DBTelegramFile]:
|
) -> Optional[DBTelegramFile]:
|
||||||
db_file = DBTelegramFile.get(loc_id)
|
#db_file = DBTelegramFile.get(loc_id)
|
||||||
if db_file:
|
#if db_file:
|
||||||
return db_file
|
# return db_file
|
||||||
|
|
||||||
try:
|
try:
|
||||||
file = await client.download_file(location)
|
file = await client.download_file(location)
|
||||||
@@ -235,9 +201,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
|||||||
|
|
||||||
image_converted = False
|
image_converted = False
|
||||||
if mime_type == "application/gzip" and is_sticker:
|
if mime_type == "application/gzip" and is_sticker:
|
||||||
mime_type, file, width, height = convert_tgs(file)
|
mime_type, file, width, height, thumbnail = convert_tgs(file, "gif", 128, 128)
|
||||||
image_converted = width is not None
|
image_converted = width is not None
|
||||||
thumbnail = None
|
|
||||||
|
|
||||||
if mime_type == "image/webp":
|
if mime_type == "image/webp":
|
||||||
new_mime_type, file, width, height = convert_image(
|
new_mime_type, file, width, height = convert_image(
|
||||||
|
|||||||
@@ -0,0 +1,148 @@
|
|||||||
|
# Generated by Netbeans
|
||||||
|
# Author: Eramde
|
||||||
|
# Date: 09.2019
|
||||||
|
|
||||||
|
from typing import Optional, Tuple, Union, Dict
|
||||||
|
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
LOG: logging.Logger = logging.getLogger("mau.util.tgs")
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
import cairosvg
|
||||||
|
from io import BytesIO, StringIO
|
||||||
|
from tgs.objects import Animation
|
||||||
|
from tgs.parsers.tgs import parse_tgs as tgs_importer
|
||||||
|
from tgs.exporters import svg as tgs_svg_exporter
|
||||||
|
|
||||||
|
def _tgs_to_png(animation: Animation, width: int = None,
|
||||||
|
height: int = None, frame: int = None) -> Tuple[bytes, Optional[bytes]]:
|
||||||
|
if not frame:
|
||||||
|
frame = int(animation.out_point * 0.3)
|
||||||
|
if not (width and height):
|
||||||
|
width = animation.width
|
||||||
|
height = animation.height
|
||||||
|
svg = StringIO()
|
||||||
|
tgs_svg_exporter.export_svg(animation, svg, frame=frame)
|
||||||
|
svg.seek(0)
|
||||||
|
fo = BytesIO()
|
||||||
|
cairosvg.svg2png(file_obj=svg, write_to=fo, output_width=width, output_height=height)
|
||||||
|
return fo.getvalue(), None
|
||||||
|
|
||||||
|
TGS_CONVERTERS = {"image": _tgs_to_png}
|
||||||
|
|
||||||
|
try:
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
def _tgs_to_gif(animation: Animation, width: int = None, height: int = None) \
|
||||||
|
-> Tuple[bytes, Optional[bytes]]:
|
||||||
|
"""
|
||||||
|
FIXME: copy-pasted from tgs.exporters.gif, because it's method don't resize images
|
||||||
|
"""
|
||||||
|
start = int(animation.in_point)
|
||||||
|
end = int(animation.out_point)
|
||||||
|
skip_frames = 5
|
||||||
|
frames = []
|
||||||
|
first_frame = None
|
||||||
|
for i in range(start, end + 1, skip_frames):
|
||||||
|
frame, _ = _tgs_to_png(animation, width, height, i)
|
||||||
|
if not first_frame:
|
||||||
|
first_frame = frame
|
||||||
|
image = Image.open(BytesIO(frame))
|
||||||
|
if image.mode not in ["RGBA", "RGBa"]:
|
||||||
|
image = image.convert("RGBA")
|
||||||
|
alpha = image.getchannel("A")
|
||||||
|
image = image.convert('P', palette=Image.ADAPTIVE, colors=255)
|
||||||
|
mask = Image.eval(alpha, lambda a: 255 if a <= 128 else 0)
|
||||||
|
image.paste(255, mask)
|
||||||
|
frames.append(image)
|
||||||
|
|
||||||
|
duration = 1000 / animation.frame_rate
|
||||||
|
fo = BytesIO()
|
||||||
|
frames[0].save(
|
||||||
|
fo,
|
||||||
|
format='GIF',
|
||||||
|
append_images=frames[1:],
|
||||||
|
save_all=True,
|
||||||
|
duration=duration,
|
||||||
|
loop=0,
|
||||||
|
transparency=255,
|
||||||
|
disposal=2,
|
||||||
|
)
|
||||||
|
return fo.getvalue(), first_frame
|
||||||
|
|
||||||
|
TGS_CONVERTERS.update({"gif": _tgs_to_gif})
|
||||||
|
except ImportError:
|
||||||
|
LOG.warn("Unable to create tgs to gif converter, install PIL")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import cv2
|
||||||
|
import numpy
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
from PIL import Image
|
||||||
|
|
||||||
|
def _tgs_to_video(animation: Animation, width: int = None, height: int = None) \
|
||||||
|
-> Tuple[bytes, Optional[bytes]]:
|
||||||
|
"""
|
||||||
|
FIXME: copy-pasted from tgs.exporters.video, because it's method don't resize images
|
||||||
|
"""
|
||||||
|
start = int(animation.in_point)
|
||||||
|
end = int(animation.out_point)
|
||||||
|
with tempfile.NamedTemporaryFile(mode="r+b", suffix=".mp4") as tmp:
|
||||||
|
video_tmp_file = tmp.name
|
||||||
|
video = None
|
||||||
|
first_frame = None
|
||||||
|
try:
|
||||||
|
video = cv2.VideoWriter(filename=video_tmp_file, apiPreference=cv2.CAP_ANY,
|
||||||
|
fourcc=cv2.VideoWriter_fourcc(*'vp09'),
|
||||||
|
fps=int(animation.frame_rate),
|
||||||
|
frameSize=(width or animation.width,
|
||||||
|
height or animation.height))
|
||||||
|
|
||||||
|
for i in range(start, end + 1):
|
||||||
|
frame, _ = _tgs_to_png(animation, width, height, i)
|
||||||
|
if not first_frame:
|
||||||
|
first_frame = frame
|
||||||
|
video.write(cv2.cvtColor(numpy.array(Image.open(BytesIO(frame))),
|
||||||
|
cv2.COLOR_RGB2BGR))
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if video:
|
||||||
|
video.release()
|
||||||
|
with open(video_tmp_file, "rb") as video_file:
|
||||||
|
out = video_file.read()
|
||||||
|
os.remove(video_tmp_file)
|
||||||
|
return out, first_frame
|
||||||
|
"""
|
||||||
|
It seems, that riot don't wont to play converted videos...
|
||||||
|
"""
|
||||||
|
TGS_CONVERTERS.update({"video": _tgs_to_video})
|
||||||
|
except ImportError:
|
||||||
|
LOG.warn("Unable to create tgs to video converter, "
|
||||||
|
"install PIL, numpy and opencv-python-headless")
|
||||||
|
|
||||||
|
except (ImportError, OSError):
|
||||||
|
LOG.exception("Unable to init tgs converters, possibly missing tgs and/or cairo libraries")
|
||||||
|
TGS_CONVERTERS = {}
|
||||||
|
|
||||||
|
TYPE_TO_MIME = {"image": "image/png", "gif": "image/gif", "video": "video/mp4"}
|
||||||
|
|
||||||
|
|
||||||
|
def convert_tgs(file: bytes, convert_to: str, width: int = None, height: int = None) \
|
||||||
|
-> Tuple[str, bytes, Optional[int], Optional[int], Optional[bytes]]:
|
||||||
|
if convert_to in TGS_CONVERTERS:
|
||||||
|
converter = TGS_CONVERTERS[convert_to]
|
||||||
|
mime = TYPE_TO_MIME[convert_to]
|
||||||
|
try:
|
||||||
|
animation = tgs_importer(BytesIO(file))
|
||||||
|
out, preview = converter(animation, width, height)
|
||||||
|
return mime, out, width or animation.width, height or animation.height, preview
|
||||||
|
# Yep... some animations crash library...
|
||||||
|
except AttributeError:
|
||||||
|
LOG.exception("Error occurred while converting animated sticker")
|
||||||
|
else:
|
||||||
|
LOG.warning(f"Unable to convert animated sticker, no converter for type {convert_to}")
|
||||||
|
return "application/gzip", file, None, None, None
|
||||||
Reference in New Issue
Block a user