From 38d4d347135d37ef873b43b9e7227e7963d0bc59 Mon Sep 17 00:00:00 2001 From: Namekuji Date: Sat, 1 Jul 2023 00:50:46 -0400 Subject: [PATCH] refactor: use redis-semaphore for mutex across workers --- packages/backend/package.json | 1 + packages/backend/src/services/note/create.ts | 89 ++++++++------------ pnpm-lock.yaml | 17 ++++ 3 files changed, 53 insertions(+), 54 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index f7d19d85b2..b584a56910 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "ratelimiter": "3.4.1", "re2": "1.19.0", "redis-lock": "0.1.4", + "redis-semaphore": "5.3.1", "reflect-metadata": "0.1.13", "rename": "1.0.4", "rndstr": "1.0.0", diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 369b3e4f77..6d64bec50e 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -69,6 +69,7 @@ import { getActiveWebhooks } from "@/misc/webhook-cache.js"; import { shouldSilenceInstance } from "@/misc/should-block-instance.js"; import meilisearch from "../../db/meilisearch.js"; import { redisClient } from "@/db/redis.js"; +import { Mutex } from "redis-semaphore"; const mutedWordsCache = new Cache< { userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[] @@ -461,63 +462,43 @@ export default async ( } if (!dontFederateInitially) { - if (Users.isLocalUser(user)) { - // Publish if the post is local - publishNotesStream(note); - } else { - const relays = await getCachedRelays(); - // Some relays (e.g., aode-relay) deliver posts by boosting them as - // Announce activities. In that case, user is the relay's actor. - const boostedByRelay = - !!user.inbox && - relays.map((relay) => relay.inbox).includes(user.inbox); + let publishKey: string; + let noteToPublish: Note; + const relays = await getCachedRelays(); - if (boostedByRelay && data.renote && data.renote.userHost) { - /* A relay boosted a remote post. */ - // Use Redis transaction for atomicity - const key = `publishedNote:${data.renote.id}`; - await redisClient.watch(key); - const exists = await redisClient.exists(key); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - transaction.set(key, 1, "EX", 30); - // Execute the transaction - await transaction.exec((err, _replies) => { - // Publish after setting the key in Redis - if (!err && boostedByRelay && data.renote) { - publishNotesStream(data.renote); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); - } - } else { - // Use Redis transaction for atomicity - const key = `publishedNote:${note.id}`; - await redisClient.watch(key); - const exists = await redisClient.exists(key); - if (exists === 0) { - // Start the transaction - const transaction = redisClient.multi(); - transaction.set(key, 1, "EX", 30); - if (note.renoteId) { - // Prevent other threads from publishing this boosting post - transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30); - } - // Execute the transaction - await transaction.exec((err, _replies) => { - // Publish after setting the key in Redis - if (!err) { - publishNotesStream(note); - } - }); - } else { - // Abort the transaction - redisClient.unwatch(); + // Some relays (e.g., aode-relay) deliver posts by boosting them as + // Announce activities. In that case, user is the relay's actor. + const boostedByRelay = + !!user.inbox && + relays.map((relay) => relay.inbox).includes(user.inbox); + + if (boostedByRelay && data.renote && data.renote.userHost) { + publishKey = `publishedNote:${data.renote.id}`; + noteToPublish = data.renote; + } else { + publishKey = `publishedNote:${note.id}`; + noteToPublish = note; + } + + const lock = new Mutex(redisClient, "publishedNote:lock"); + await lock.acquire(); + try { + const exists = (await redisClient.exists(publishKey)) > 0; + if (!exists) { + await redisClient.set(publishKey, 1, "EX", 30); + if (noteToPublish.renoteId) { + // Prevents other threads from publishing the boosting post + await redisClient.set( + `publishedNote:${noteToPublish.renoteId}`, + 1, + "EX", + 30, + ); } + publishNotesStream(noteToPublish); } + } finally { + lock.release(); } } if (note.replyId != null) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 760931858b..1002a6f958 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -339,6 +339,9 @@ importers: redis-lock: specifier: 0.1.4 version: 0.1.4 + redis-semaphore: + specifier: 5.3.1 + version: 5.3.1(ioredis@5.3.2) reflect-metadata: specifier: 0.1.13 version: 0.1.13 @@ -2666,6 +2669,7 @@ packages: engines: {node: '>=10'} cpu: [arm64] os: [android] + requiresBuild: true dependencies: '@swc/wasm': 1.2.130 @@ -2772,6 +2776,7 @@ packages: /@swc/wasm@1.2.130: resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==} + requiresBuild: true /@syuilo/aiscript@0.11.1: resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==} @@ -12831,6 +12836,18 @@ packages: dependencies: redis-errors: 1.2.0 + /redis-semaphore@5.3.1(ioredis@5.3.2): + resolution: {integrity: sha512-oUpxxfxSbh5eT0mvVpz2d4Qlg2CsaoQkeo80/v6CU2l97zO0u6NPgc9/zQZa9KGR3/93b0igtSct3hEFh8Ei8w==} + engines: {node: '>= 14.17.0'} + peerDependencies: + ioredis: ^4.1.0 || ^5 + dependencies: + debug: 4.3.4(supports-color@8.1.1) + ioredis: 5.3.2 + transitivePeerDependencies: + - supports-color + dev: false + /redis@4.6.7: resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==} dependencies: