diff --git a/packages/backend/package.json b/packages/backend/package.json index b584a56910..6f63441023 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -34,6 +34,7 @@ "@koa/cors": "3.4.3", "@koa/multer": "3.0.2", "@koa/router": "9.0.1", + "@msgpack/msgpack": "3.0.0-beta2", "@peertube/http-signature": "1.7.0", "@redocly/openapi-core": "1.0.0-beta.120", "@sinonjs/fake-timers": "9.1.2", @@ -43,7 +44,6 @@ "ajv": "8.12.0", "archiver": "5.3.1", "argon2": "^0.30.3", - "async-mutex": "^0.4.0", "autobind-decorator": "2.4.0", "autolinker": "4.0.0", "autwh": "0.1.0", diff --git a/packages/backend/src/misc/cache.ts b/packages/backend/src/misc/cache.ts index 9abebc91cb..d790313d18 100644 --- a/packages/backend/src/misc/cache.ts +++ b/packages/backend/src/misc/cache.ts @@ -1,43 +1,75 @@ +import { redisClient } from "@/db/redis.js"; +import { nativeRandomStr } from "native-utils/built/index.js"; +import { encode, decode } from "@msgpack/msgpack"; +import { ChainableCommander } from "ioredis"; + export class Cache { - public cache: Map; - private lifetime: number; + private ttl: number; + private fingerprint: string; - constructor(lifetime: Cache["lifetime"]) { - this.cache = new Map(); - this.lifetime = lifetime; + constructor(ttl: number) { + this.ttl = ttl; + this.fingerprint = `cache:${nativeRandomStr(32)}`; } - public set(key: string | null, value: T): void { - this.cache.set(key, { - date: Date.now(), - value, - }); + private prefixedKey(key: string | null): string { + return key ? `${this.fingerprint}:${key}` : this.fingerprint; } - public get(key: string | null): T | undefined { - const cached = this.cache.get(key); - if (cached == null) return undefined; - if (Date.now() - cached.date > this.lifetime) { - this.cache.delete(key); - return undefined; + public async set(key: string | null, value: T, transaction?: ChainableCommander): Promise { + const _key = this.prefixedKey(key); + const _value = Buffer.from(encode(value)); + const commander = transaction ?? redisClient; + if (this.ttl === Infinity) { + await commander.set(_key, _value); + } else { + await commander.set(_key, _value, "PX", this.ttl); } - return cached.value; } - public delete(key: string | null) { - this.cache.delete(key); + public async get(key: string | null): Promise { + const _key = this.prefixedKey(key); + const cached = await redisClient.getBuffer(_key); + if (cached === null) return undefined; + + return decode(cached) as T; + } + + public async getAll(): Promise> { + const keys = await redisClient.keys(`${this.fingerprint}*`); + const map = new Map(); + if (keys.length === 0) { + return map; + } + const values = await redisClient.mgetBuffer(keys); + + for (const [i, key] of keys.entries()) { + const val = values[i]; + if (val !== null) { + map.set(key, decode(val) as T); + } + } + + return map; + } + + public async delete(...keys: (string | null)[]): Promise { + if (keys.length > 0) { + const _keys = keys.map(this.prefixedKey); + await redisClient.del(_keys); + } } /** - * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します - * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします + * Returns if cached value exists. Otherwise, calls fetcher and caches. + * Overwrites cached value if invalidated by the optional validator. */ public async fetch( key: string | null, fetcher: () => Promise, validator?: (cachedValue: T) => boolean, ): Promise { - const cachedValue = this.get(key); + const cachedValue = await this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { @@ -52,20 +84,20 @@ export class Cache { // Cache MISS const value = await fetcher(); - this.set(key, value); + await this.set(key, value); return value; } /** - * キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します - * optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします + * Returns if cached value exists. Otherwise, calls fetcher and caches if the fetcher returns a value. + * Overwrites cached value if invalidated by the optional validator. */ public async fetchMaybe( key: string | null, fetcher: () => Promise, validator?: (cachedValue: T) => boolean, ): Promise { - const cachedValue = this.get(key); + const cachedValue = await this.get(key); if (cachedValue !== undefined) { if (validator) { if (validator(cachedValue)) { @@ -81,7 +113,7 @@ export class Cache { // Cache MISS const value = await fetcher(); if (value !== undefined) { - this.set(key, value); + await this.set(key, value); } return value; } diff --git a/packages/backend/src/misc/emoji-meta.ts b/packages/backend/src/misc/emoji-meta.ts index fd9d9baa5c..45364bdcbc 100644 --- a/packages/backend/src/misc/emoji-meta.ts +++ b/packages/backend/src/misc/emoji-meta.ts @@ -1,9 +1,10 @@ import probeImageSize from "probe-image-size"; -import { Mutex, withTimeout } from "async-mutex"; +import { Mutex } from "redis-semaphore"; import { FILE_TYPE_BROWSERSAFE } from "@/const.js"; import Logger from "@/services/logger.js"; import { Cache } from "./cache.js"; +import { redisClient } from "@/db/redis.js"; export type Size = { width: number; @@ -11,23 +12,30 @@ export type Size = { }; const cache = new Cache(1000 * 60 * 10); // once every 10 minutes for the same url -const mutex = withTimeout(new Mutex(), 1000); +const logger = new Logger("emoji"); export async function getEmojiSize(url: string): Promise { - const logger = new Logger("emoji"); + let attempted = true; - await mutex.runExclusive(() => { - const attempted = cache.get(url); - if (!attempted) { - cache.set(url, true); - } else { - logger.warn(`Attempt limit exceeded: ${url}`); - throw new Error("Too many attempts"); - } - }); + const lock = new Mutex(redisClient, "getEmojiSize"); + await lock.acquire(); try { - logger.info(`Retrieving emoji size from ${url}`); + attempted = (await cache.get(url)) === true; + if (!attempted) { + await cache.set(url, true); + } + } finally { + await lock.release(); + } + + if (attempted) { + logger.warn(`Attempt limit exceeded: ${url}`); + throw new Error("Too many attempts"); + } + + try { + logger.debug(`Retrieving emoji size from ${url}`); const { width, height, mime } = await probeImageSize(url, { timeout: 5000, }); diff --git a/packages/backend/src/misc/populate-emojis.ts b/packages/backend/src/misc/populate-emojis.ts index 7aee4ec253..ce25dd5594 100644 --- a/packages/backend/src/misc/populate-emojis.ts +++ b/packages/backend/src/misc/populate-emojis.ts @@ -7,6 +7,7 @@ import { isSelfHost, toPunyNullable } from "./convert-host.js"; import { decodeReaction } from "./reaction-lib.js"; import config from "@/config/index.js"; import { query } from "@/prelude/url.js"; +import { redisClient } from "@/db/redis.js"; const cache = new Cache(1000 * 60 * 60 * 12); @@ -75,7 +76,7 @@ export async function populateEmoji( if (emoji && !(emoji.width && emoji.height)) { emoji = await queryOrNull(); - cache.set(cacheKey, emoji); + await cache.set(cacheKey, emoji); } if (emoji == null) return null; @@ -150,7 +151,7 @@ export async function prefetchEmojis( emojis: { name: string; host: string | null }[], ): Promise { const notCachedEmojis = emojis.filter( - (emoji) => cache.get(`${emoji.name} ${emoji.host}`) == null, + async (emoji) => !(await cache.get(`${emoji.name} ${emoji.host}`)), ); const emojisQuery: any[] = []; const hosts = new Set(notCachedEmojis.map((e) => e.host)); @@ -169,7 +170,9 @@ export async function prefetchEmojis( select: ["name", "host", "originalUrl", "publicUrl"], }) : []; + const trans = redisClient.multi(); for (const emoji of _emojis) { - cache.set(`${emoji.name} ${emoji.host}`, emoji); + cache.set(`${emoji.name} ${emoji.host}`, emoji, trans); } + await trans.exec(); } diff --git a/packages/backend/src/remote/activitypub/models/person.ts b/packages/backend/src/remote/activitypub/models/person.ts index f8208e6d7b..c541e9ae50 100644 --- a/packages/backend/src/remote/activitypub/models/person.ts +++ b/packages/backend/src/remote/activitypub/models/person.ts @@ -135,14 +135,14 @@ export async function fetchPerson( ): Promise { if (typeof uri !== "string") throw new Error("uri is not string"); - const cached = uriPersonCache.get(uri); + const cached = await uriPersonCache.get(uri); if (cached) return cached; // Fetch from the database if the URI points to this server if (uri.startsWith(`${config.url}/`)) { const id = uri.split("/").pop(); const u = await Users.findOneBy({ id }); - if (u) uriPersonCache.set(uri, u); + if (u) await uriPersonCache.set(uri, u); return u; } @@ -150,7 +150,7 @@ export async function fetchPerson( const exist = await Users.findOneBy({ uri }); if (exist) { - uriPersonCache.set(uri, exist); + await uriPersonCache.set(uri, exist); return exist; } //#endregion diff --git a/packages/backend/src/services/chart/charts/active-users.ts b/packages/backend/src/services/chart/charts/active-users.ts index 7a0c45cfaf..15317e68b0 100644 --- a/packages/backend/src/services/chart/charts/active-users.ts +++ b/packages/backend/src/services/chart/charts/active-users.ts @@ -25,12 +25,12 @@ export default class ActiveUsersChart extends Chart { return {}; } - public async read(user: { + public read(user: { id: User["id"]; host: null; createdAt: User["createdAt"]; - }): Promise { - await this.commit({ + }) { + this.commit({ read: [user.id], registeredWithinWeek: Date.now() - user.createdAt.getTime() < week ? [user.id] : [], diff --git a/packages/backend/src/services/instance-actor.ts b/packages/backend/src/services/instance-actor.ts index 50ce227eba..9240f31073 100644 --- a/packages/backend/src/services/instance-actor.ts +++ b/packages/backend/src/services/instance-actor.ts @@ -9,7 +9,7 @@ const ACTOR_USERNAME = "instance.actor" as const; const cache = new Cache(Infinity); export async function getInstanceActor(): Promise { - const cached = cache.get(null); + const cached = await cache.get(null); if (cached) return cached; const user = (await Users.findOneBy({ @@ -18,11 +18,11 @@ export async function getInstanceActor(): Promise { })) as ILocalUser | undefined; if (user) { - cache.set(null, user); + await cache.set(null, user); return user; } else { const created = (await createSystemUser(ACTOR_USERNAME)) as ILocalUser; - cache.set(null, created); + await cache.set(null, created); return created; } } diff --git a/packages/backend/src/services/register-or-fetch-instance-doc.ts b/packages/backend/src/services/register-or-fetch-instance-doc.ts index 4c3570e907..ddb9ce2413 100644 --- a/packages/backend/src/services/register-or-fetch-instance-doc.ts +++ b/packages/backend/src/services/register-or-fetch-instance-doc.ts @@ -9,25 +9,25 @@ const cache = new Cache(1000 * 60 * 60); export async function registerOrFetchInstanceDoc( host: string, ): Promise { - host = toPuny(host); + const _host = toPuny(host); - const cached = cache.get(host); + const cached = await cache.get(_host); if (cached) return cached; - const index = await Instances.findOneBy({ host }); + const index = await Instances.findOneBy({ host: _host }); if (index == null) { const i = await Instances.insert({ id: genId(), - host, + host: _host, caughtAt: new Date(), lastCommunicatedAt: new Date(), }).then((x) => Instances.findOneByOrFail(x.identifiers[0])); - cache.set(host, i); + await cache.set(_host, i); return i; } else { - cache.set(host, index); + await cache.set(_host, index); return index; } } diff --git a/packages/backend/src/services/relay.ts b/packages/backend/src/services/relay.ts index bec4b1f86b..2325f76c69 100644 --- a/packages/backend/src/services/relay.ts +++ b/packages/backend/src/services/relay.ts @@ -90,7 +90,7 @@ async function updateRelaysCache() { const relays = await Relays.findBy({ status: "accepted", }); - relaysCache.set(null, relays); + await relaysCache.set(null, relays); } export async function relayRejected(id: string) { diff --git a/packages/backend/src/services/user-cache.ts b/packages/backend/src/services/user-cache.ts index 9492448554..373fb86869 100644 --- a/packages/backend/src/services/user-cache.ts +++ b/packages/backend/src/services/user-cache.ts @@ -6,7 +6,7 @@ import type { import { User } from "@/models/entities/user.js"; import { Users } from "@/models/index.js"; import { Cache } from "@/misc/cache.js"; -import { subscriber } from "@/db/redis.js"; +import { redisClient, subscriber } from "@/db/redis.js"; export const userByIdCache = new Cache(Infinity); export const localUserByNativeTokenCache = new Cache( @@ -22,13 +22,12 @@ subscriber.on("message", async (_, data) => { const { type, body } = obj.message; switch (type) { case "localUserUpdated": { - userByIdCache.delete(body.id); - localUserByIdCache.delete(body.id); - localUserByNativeTokenCache.cache.forEach((v, k) => { - if (v.value?.id === body.id) { - localUserByNativeTokenCache.delete(k); - } - }); + await userByIdCache.delete(body.id); + await localUserByIdCache.delete(body.id); + const toDelete = Array.from(await localUserByNativeTokenCache.getAll()) + .filter((v) => v[1]?.id === body.id) + .map((v) => v[0]); + await localUserByNativeTokenCache.delete(...toDelete); break; } case "userChangeSuspendedState": @@ -36,15 +35,17 @@ subscriber.on("message", async (_, data) => { case "userChangeModeratorState": case "remoteUserUpdated": { const user = await Users.findOneByOrFail({ id: body.id }); - userByIdCache.set(user.id, user); - for (const [k, v] of uriPersonCache.cache.entries()) { - if (v.value?.id === user.id) { - uriPersonCache.set(k, user); + await userByIdCache.set(user.id, user); + const trans = redisClient.multi(); + for (const [k, v] of (await uriPersonCache.getAll()).entries()) { + if (v?.id === user.id) { + await uriPersonCache.set(k, user, trans); } } + await trans.exec(); if (Users.isLocalUser(user)) { - localUserByNativeTokenCache.set(user.token, user); - localUserByIdCache.set(user.id, user); + await localUserByNativeTokenCache.set(user.token, user); + await localUserByIdCache.set(user.id, user); } break; } @@ -52,8 +53,8 @@ subscriber.on("message", async (_, data) => { const user = (await Users.findOneByOrFail({ id: body.id, })) as ILocalUser; - localUserByNativeTokenCache.delete(body.oldToken); - localUserByNativeTokenCache.set(body.newToken, user); + await localUserByNativeTokenCache.delete(body.oldToken); + await localUserByNativeTokenCache.set(body.newToken, user); break; } default: diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1002a6f958..560bb55a37 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -105,6 +105,9 @@ importers: '@koa/router': specifier: 9.0.1 version: 9.0.1 + '@msgpack/msgpack': + specifier: 3.0.0-beta2 + version: 3.0.0-beta2 '@peertube/http-signature': specifier: 1.7.0 version: 1.7.0 @@ -132,9 +135,6 @@ importers: argon2: specifier: ^0.30.3 version: 0.30.3 - async-mutex: - specifier: ^0.4.0 - version: 0.4.0 autobind-decorator: specifier: 2.4.0 version: 2.4.0 @@ -786,7 +786,7 @@ importers: version: 2.30.0 emojilib: specifier: github:thatonecalculator/emojilib - version: github.com/thatonecalculator/emojilib/542fcc1a25003afad78f3248ceee8ac6980ddeb8 + version: github.com/thatonecalculator/emojilib/06944984a61ee799b7083894258f5fa318d932d1 escape-regexp: specifier: 0.0.1 version: 0.0.1 @@ -2277,6 +2277,11 @@ packages: os-filter-obj: 2.0.0 dev: true + /@msgpack/msgpack@3.0.0-beta2: + resolution: {integrity: sha512-y+l1PNV0XDyY8sM3YtuMLK5vE3/hkfId+Do8pLo/OPxfxuFAUwcGz3oiiUuV46/aBpwTzZ+mRWVMtlSKbradhw==} + engines: {node: '>= 14'} + dev: false + /@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.2: resolution: {integrity: sha512-9bfjwDxIDWmmOKusUcqdS4Rw+SETlp9Dy39Xui9BEGEk19dDwH0jhipwFzEff/pFg95NKymc6TOTbRKcWeRqyQ==} cpu: [arm64] @@ -4496,12 +4501,6 @@ packages: stream-exhaust: 1.0.2 dev: true - /async-mutex@0.4.0: - resolution: {integrity: sha512-eJFZ1YhRR8UN8eBLoNzcDPcy/jqjsg6I1AP+KvWQX80BqOSW1oJPJXDylPUEeMr2ZQvHgnQ//Lp6f3RQ1zI7HA==} - dependencies: - tslib: 2.6.0 - dev: false - /async-settle@1.0.0: resolution: {integrity: sha512-VPXfB4Vk49z1LHHodrEQ6Xf7W4gg1w0dAPROHngx7qgDjqmIQ+fXmwgGXTW/ITLai0YLSvWepJOP9EVpMnEAcw==} engines: {node: '>= 0.10'} @@ -15772,8 +15771,8 @@ packages: url-polyfill: 1.1.12 dev: true - github.com/thatonecalculator/emojilib/542fcc1a25003afad78f3248ceee8ac6980ddeb8: - resolution: {tarball: https://codeload.github.com/thatonecalculator/emojilib/tar.gz/542fcc1a25003afad78f3248ceee8ac6980ddeb8} + github.com/thatonecalculator/emojilib/06944984a61ee799b7083894258f5fa318d932d1: + resolution: {tarball: https://codeload.github.com/thatonecalculator/emojilib/tar.gz/06944984a61ee799b7083894258f5fa318d932d1} name: emojilib version: 3.0.10 dev: true