127 lines
3.5 KiB
Python
127 lines
3.5 KiB
Python
|
import argparse
|
||
|
import asyncio
|
||
|
import dataclasses
|
||
|
import json
|
||
|
import os.path
|
||
|
import sys
|
||
|
from pathlib import Path
|
||
|
import dacite
|
||
|
|
||
|
import aiohttp
|
||
|
import typing
|
||
|
import urllib.parse
|
||
|
from dataclasses import dataclass
|
||
|
from tqdm import tqdm
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class Args:
|
||
|
files: list[str]
|
||
|
tag: str
|
||
|
token: str
|
||
|
instance: str
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class EmojiDef:
|
||
|
url: str
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class PackMeta:
|
||
|
display_name: str
|
||
|
usage: list[str]
|
||
|
|
||
|
|
||
|
@dataclass
|
||
|
class EmojiPack:
|
||
|
images: dict[str, EmojiDef]
|
||
|
pack: PackMeta
|
||
|
|
||
|
|
||
|
class DataclassEncoder(json.JSONEncoder):
|
||
|
def default(self, o):
|
||
|
if dataclasses.is_dataclass(o):
|
||
|
return dataclasses.asdict(o)
|
||
|
return super().default(o)
|
||
|
|
||
|
|
||
|
async def upload_emoji(args: Args):
|
||
|
handle_urlenc: str = urllib.parse.quote(args.tag, safe="")
|
||
|
api_url: str = f"https://{args.instance}/_matrix/client/r0/user/{handle_urlenc}/account_data/im.ponies.user_emotes"
|
||
|
|
||
|
async with aiohttp.ClientSession(headers={
|
||
|
"Authorization": f"Bearer {args.token}"
|
||
|
}) as session:
|
||
|
print("Querying emotes at:", api_url)
|
||
|
existing_emotes = await session.get(api_url)
|
||
|
|
||
|
if existing_emotes.status != 200:
|
||
|
print("Failed to fetch emotes, status code", existing_emotes.status)
|
||
|
return
|
||
|
|
||
|
emotes: EmojiPack = dacite.from_dict(EmojiPack, await existing_emotes.json())
|
||
|
|
||
|
for file in tqdm(args.files):
|
||
|
base_file_name: str = urllib.parse.quote(os.path.basename(file), safe="")
|
||
|
file_upload_url: str = f"https://{args.instance}/_matrix/media/r0/upload?filename={base_file_name} -> "
|
||
|
|
||
|
try:
|
||
|
with open(file, "rb") as file_data:
|
||
|
print(file, "-> ", end="")
|
||
|
|
||
|
resp = await session.post(file_upload_url, data=file_data)
|
||
|
|
||
|
if resp.status == 200:
|
||
|
resp_body: dict[str, str] = await resp.json()
|
||
|
dest_uri: str = resp_body["content_uri"]
|
||
|
|
||
|
print(dest_uri)
|
||
|
|
||
|
shortcode = Path(file).stem
|
||
|
|
||
|
emotes.images[shortcode] = EmojiDef(url=dest_uri)
|
||
|
else:
|
||
|
print("Error:", resp)
|
||
|
|
||
|
except IOError as e:
|
||
|
print("\n\tUpload failed:", e, file=sys.stderr)
|
||
|
|
||
|
new_emotes_json: str = json.dumps(emotes, cls=DataclassEncoder)
|
||
|
print(new_emotes_json)
|
||
|
replaced = await session.put(api_url, data=new_emotes_json)
|
||
|
|
||
|
if replaced.status == 200:
|
||
|
print("Success! Refresh your client.")
|
||
|
else:
|
||
|
print("Error:", replaced)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
parser = argparse.ArgumentParser(
|
||
|
prog="matrix-emoji-upload",
|
||
|
description="Batch upload emojis to Matrix")
|
||
|
|
||
|
parser.add_argument("files",
|
||
|
metavar="F",
|
||
|
type=str,
|
||
|
nargs="+",
|
||
|
help="a list of files to upload")
|
||
|
|
||
|
parser.add_argument("--tag",
|
||
|
type=str,
|
||
|
required=True,
|
||
|
help="the Matrix handle of the user")
|
||
|
|
||
|
parser.add_argument("--token",
|
||
|
type=str,
|
||
|
required=True,
|
||
|
help="a Matrix account bearer token")
|
||
|
|
||
|
parser.add_argument("--instance",
|
||
|
type=str,
|
||
|
required=True,
|
||
|
help="a Matrix instance domain")
|
||
|
|
||
|
asyncio.run(upload_emoji(typing.cast(Args, parser.parse_args())))
|