147 lines
4.4 KiB
Python
147 lines
4.4 KiB
Python
import argparse
|
|
import asyncio
|
|
import dataclasses
|
|
import json
|
|
import os.path
|
|
import sys
|
|
from pathlib import Path
|
|
import dacite
|
|
|
|
import aiohttp
|
|
import typing
|
|
import urllib.parse
|
|
from typing import Optional
|
|
from dataclasses import dataclass
|
|
from tqdm import tqdm
|
|
|
|
|
|
@dataclass
class Args:
    """Typed view of the command-line options consumed by ``upload_emoji``.

    The field names mirror the destinations argparse derives from the
    options declared in the ``__main__`` block of this file.
    """

    # Paths of the image files to upload (one or more).
    files: list[str]
    # Matrix handle of the user; selects the per-user emote pack when set.
    tag: Optional[str]
    # Matrix room ID; selects a room emote pack when ``tag`` is not set.
    room_id: Optional[str]
    # Room event ID the pack is stored in (the CLI defaults it to "").
    room_event_id: Optional[str]
    # Bearer token sent in the Authorization header.
    token: str
    # Domain of the Matrix instance, without scheme.
    instance: str
|
|
|
|
|
|
@dataclass
class EmojiDef:
    """A single emote entry inside an emote pack."""

    # Content URI of the uploaded image; ``upload_emoji`` fills it from the
    # ``content_uri`` field of the media upload response.
    url: str
|
|
|
|
|
|
@dataclass
class PackMeta:
    """Metadata block stored under the ``pack`` key of an emote pack."""

    # Name of the pack (presumably the human-readable title shown by
    # clients -- TODO confirm against the emote pack format).
    display_name: str
    # Optional list of usage strings; the accepted values are not visible
    # in this file.
    usage: Optional[list[str]]
|
|
|
|
|
|
@dataclass
class EmojiPack:
    """An emote pack as fetched from, and written back to, the server.

    ``upload_emoji`` builds an instance from the JSON body of the pack
    endpoint and serializes it back to JSON after adding new images.
    """

    # Emotes keyed by shortcode; ``upload_emoji`` uses the file stem as key.
    images: dict[str, EmojiDef]
    # Pack-level metadata.
    pack: PackMeta
|
|
|
|
|
|
class DataclassEncoder(json.JSONEncoder):
    """JSON encoder that serializes dataclass instances as plain dicts.

    Pass it as ``cls=DataclassEncoder`` to ``json.dumps``/``json.dump``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for *o*.

        Dataclass instances are converted with ``dataclasses.asdict`` (which
        recurses into nested dataclasses, lists and dicts). Everything else
        is delegated to ``json.JSONEncoder.default``, which raises
        ``TypeError`` for unsupported objects.
        """
        # is_dataclass() is also true for dataclass *types*, but asdict()
        # only accepts instances; excluding types lets a stray class fall
        # through to the standard "not JSON serializable" error.
        if dataclasses.is_dataclass(o) and not isinstance(o, type):
            return dataclasses.asdict(o)
        return super().default(o)
|
|
|
|
|
|
async def _upload_file(session: aiohttp.ClientSession, instance: str, file: str) -> Optional[str]:
    """Upload one file to the media repository of *instance*.

    Prints ``<file> -> <content uri>`` on success. Returns the content URI
    reported by the server, or ``None`` when the file could not be read or
    the server answered with a non-200 status.
    """
    file_name_urlenc: str = urllib.parse.quote(os.path.basename(file), safe="")
    # The URL must end right after the file name: anything appended here
    # becomes part of the ``filename`` query parameter.
    upload_url: str = f"https://{instance}/_matrix/media/r0/upload?filename={file_name_urlenc}"

    try:
        with open(file, "rb") as file_data:
            print(file, "-> ", end="")

            # ``async with`` releases the connection once the body is read.
            async with session.post(upload_url, data=file_data) as resp:
                if resp.status != 200:
                    print("Error:", resp)
                    return None

                resp_body: dict[str, str] = await resp.json()
                dest_uri: str = resp_body["content_uri"]
    except IOError as e:
        print("\n\tUpload failed:", e, file=sys.stderr)
        return None

    print(dest_uri)
    return dest_uri


async def upload_emoji(args: Args):
    """Upload the given image files and register them in an emote pack.

    The pack is the user's ``im.ponies.user_emotes`` account data when
    ``args.tag`` is set, otherwise the ``im.ponies.room_emotes`` state event
    of ``args.room_id`` (state key ``args.room_event_id``). The existing pack
    is fetched, every successfully uploaded file is added under its file
    stem as shortcode, and the merged pack is written back with a PUT.

    Nothing is raised for HTTP failures; they are reported on the console
    and the function returns early (fetch) or skips the file (upload).
    """
    client_api_base: str = f"https://{args.instance}/_matrix/client"

    if args.tag:
        handle_urlenc: str = urllib.parse.quote(args.tag, safe="")
        api_url: str = f"{client_api_base}/r0/user/{handle_urlenc}/account_data/im.ponies.user_emotes"
    elif args.room_id:
        room_urlenc: str = urllib.parse.quote(args.room_id, safe="")
        # room_event_id is Optional; quote() rejects None, so treat it as "".
        room_event_urlenc: str = urllib.parse.quote(args.room_event_id or "", safe="")
        api_url = f"{client_api_base}/r0/rooms/{room_urlenc}/state/im.ponies.room_emotes/{room_event_urlenc}"
    else:
        print("Neither the user nor room option have been selected", file=sys.stderr)
        return

    auth_headers = {"Authorization": f"Bearer {args.token}"}
    async with aiohttp.ClientSession(headers=auth_headers) as session:
        print("Querying emotes at:", api_url)

        async with session.get(api_url) as existing_emotes:
            if existing_emotes.status != 200:
                print("Failed to fetch emotes, status code", existing_emotes.status)
                return

            # NOTE(review): only the fields declared on EmojiPack are written
            # back below; confirm that dropping any other keys present in the
            # stored pack is acceptable.
            emotes: EmojiPack = dacite.from_dict(EmojiPack, await existing_emotes.json())

        for file in tqdm(args.files):
            dest_uri = await _upload_file(session, args.instance, file)
            if dest_uri is not None:
                shortcode = Path(file).stem
                emotes.images[shortcode] = EmojiDef(url=dest_uri)

        new_emotes_json: str = json.dumps(emotes, cls=DataclassEncoder)
        print(new_emotes_json)

        # The body is pre-serialized (custom encoder), so aiohttp would label
        # it text/plain; declare it as JSON explicitly.
        async with session.put(
            api_url,
            data=new_emotes_json,
            headers={"Content-Type": "application/json"},
        ) as replaced:
            if replaced.status == 200:
                print("Success! Refresh your client.")
            else:
                print("Error:", replaced)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: declare the options, parse them and hand the
    # result to upload_emoji() on a fresh event loop.
    cli = argparse.ArgumentParser(prog="matrix-emoji-upload", description="Batch upload emojis to Matrix")

    cli.add_argument("files", metavar="emoji_file", type=str, nargs="+", help="a list of files to upload")

    # The pack target is either a user or a room, never both.
    target = cli.add_mutually_exclusive_group()
    for flag, description in (
        ("--tag", "the Matrix handle of the user"),
        ("--room-id", "the Matrix room to upload the emoji in"),
    ):
        target.add_argument(flag, type=str, help=description)

    cli.add_argument(
        "--room-event-id",
        type=str,
        default="",
        help="the Matrix room event ID the pack is stored in",
    )

    # Mandatory connection settings.
    for flag, description in (
        ("--token", "a Matrix account bearer token"),
        ("--instance", "a Matrix instance domain"),
    ):
        cli.add_argument(flag, type=str, required=True, help=description)

    # parse_args() returns an argparse.Namespace whose attributes match the
    # Args fields; the cast only informs type checkers.
    parsed = cli.parse_args()
    asyncio.run(upload_emoji(typing.cast(Args, parsed)))
|