Merge pull request '[PR]: fix: publication logic involving relays' (#10383) from nmkj/calckey:fix-boost-publication into develop

Reviewed-on: https://codeberg.org/calckey/calckey/pulls/10383
This commit is contained in:
Kainoa Kanter 2023-07-02 05:28:01 +00:00
commit 599fc3d7a3
3 changed files with 48 additions and 44 deletions

View File

@ -112,6 +112,7 @@
"ratelimiter": "3.4.1", "ratelimiter": "3.4.1",
"re2": "1.19.0", "re2": "1.19.0",
"redis-lock": "0.1.4", "redis-lock": "0.1.4",
"redis-semaphore": "5.3.1",
"reflect-metadata": "0.1.13", "reflect-metadata": "0.1.13",
"rename": "1.0.4", "rename": "1.0.4",
"rndstr": "1.0.0", "rndstr": "1.0.0",

View File

@ -69,6 +69,7 @@ import { getActiveWebhooks } from "@/misc/webhook-cache.js";
import { shouldSilenceInstance } from "@/misc/should-block-instance.js"; import { shouldSilenceInstance } from "@/misc/should-block-instance.js";
import meilisearch from "../../db/meilisearch.js"; import meilisearch from "../../db/meilisearch.js";
import { redisClient } from "@/db/redis.js"; import { redisClient } from "@/db/redis.js";
import { Mutex } from "redis-semaphore";
const mutedWordsCache = new Cache< const mutedWordsCache = new Cache<
{ userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[] { userId: UserProfile["userId"]; mutedWords: UserProfile["mutedWords"] }[]
@ -461,58 +462,43 @@ export default async (
} }
if (!dontFederateInitially) { if (!dontFederateInitially) {
let publishKey: string;
let noteToPublish: Note;
const relays = await getCachedRelays(); const relays = await getCachedRelays();
// Some relays (e.g., aode-relay) deliver posts by boosting them as // Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor. // Announce activities. In that case, user is the relay's actor.
const boostedByRelay = const boostedByRelay =
!!user.inbox && !!user.inbox &&
relays.map((relay) => relay.inbox).includes(user.inbox); relays.map((relay) => relay.inbox).includes(user.inbox);
if (!note.uri) { if (boostedByRelay && data.renote && data.renote.userHost) {
// Publish if the post is local publishKey = `publishedNote:${data.renote.id}`;
publishNotesStream(note); noteToPublish = data.renote;
} else if (boostedByRelay && data.renote?.uri) { } else {
// Use Redis transaction for atomicity publishKey = `publishedNote:${note.id}`;
await redisClient.watch(`publishedNote:${data.renote.uri}`); noteToPublish = note;
const exists = await redisClient.exists( }
`publishedNote:${data.renote.uri}`,
const lock = new Mutex(redisClient, "publishedNote");
await lock.acquire();
try {
const published = (await redisClient.get(publishKey)) !== null;
if (!published) {
await redisClient.set(publishKey, "done", "EX", 30);
if (noteToPublish.renoteId) {
// Prevents other threads from publishing the boosting post
await redisClient.set(
`publishedNote:${noteToPublish.renoteId}`,
"done",
"EX",
30,
); );
if (exists === 0) {
// Start the transaction
const transaction = redisClient.multi();
const key = `publishedNote:${data.renote.uri}`;
transaction.set(key, 1, "EX", 30);
// Execute the transaction
transaction.exec((err, replies) => {
// Publish after setting the key in Redis
if (!err && data.renote) {
publishNotesStream(data.renote);
} }
}); publishNotesStream(noteToPublish);
} else {
// Abort the transaction
redisClient.unwatch();
}
} else if (!boostedByRelay && note.uri) {
// Use Redis transaction for atomicity
await redisClient.watch(`publishedNote:${note.uri}`);
const exists = await redisClient.exists(`publishedNote:${note.uri}`);
if (exists === 0) {
// Start the transaction
const transaction = redisClient.multi();
const key = `publishedNote:${note.uri}`;
transaction.set(key, 1, "EX", 30);
// Execute the transaction
transaction.exec((err, replies) => {
// Publish after setting the key in Redis
if (!err) {
publishNotesStream(note);
}
});
} else {
// Abort the transaction
redisClient.unwatch();
} }
} finally {
await lock.release();
} }
} }
if (note.replyId != null) { if (note.replyId != null) {

View File

@ -339,6 +339,9 @@ importers:
redis-lock: redis-lock:
specifier: 0.1.4 specifier: 0.1.4
version: 0.1.4 version: 0.1.4
redis-semaphore:
specifier: 5.3.1
version: 5.3.1(ioredis@5.3.2)
reflect-metadata: reflect-metadata:
specifier: 0.1.13 specifier: 0.1.13
version: 0.1.13 version: 0.1.13
@ -2666,6 +2669,7 @@ packages:
engines: {node: '>=10'} engines: {node: '>=10'}
cpu: [arm64] cpu: [arm64]
os: [android] os: [android]
requiresBuild: true
dependencies: dependencies:
'@swc/wasm': 1.2.130 '@swc/wasm': 1.2.130
@ -2772,6 +2776,7 @@ packages:
/@swc/wasm@1.2.130: /@swc/wasm@1.2.130:
resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==} resolution: {integrity: sha512-rNcJsBxS70+pv8YUWwf5fRlWX6JoY/HJc25HD/F8m6Kv7XhJdqPPMhyX6TKkUBPAG7TWlZYoxa+rHAjPy4Cj3Q==}
requiresBuild: true
/@syuilo/aiscript@0.11.1: /@syuilo/aiscript@0.11.1:
resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==} resolution: {integrity: sha512-chwOIA3yLUKvOB0G611hjLArKTeOWNmTm3lHERSaDW1d+dS6do56naX6Lkwy2UpnwWC0qzeNSgg35elk6t2gZg==}
@ -12831,6 +12836,18 @@ packages:
dependencies: dependencies:
redis-errors: 1.2.0 redis-errors: 1.2.0
/redis-semaphore@5.3.1(ioredis@5.3.2):
resolution: {integrity: sha512-oUpxxfxSbh5eT0mvVpz2d4Qlg2CsaoQkeo80/v6CU2l97zO0u6NPgc9/zQZa9KGR3/93b0igtSct3hEFh8Ei8w==}
engines: {node: '>= 14.17.0'}
peerDependencies:
ioredis: ^4.1.0 || ^5
dependencies:
debug: 4.3.4(supports-color@8.1.1)
ioredis: 5.3.2
transitivePeerDependencies:
- supports-color
dev: false
/redis@4.6.7: /redis@4.6.7:
resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==} resolution: {integrity: sha512-KrkuNJNpCwRm5vFJh0tteMxW8SaUzkm5fBH7eL5hd/D0fAkzvapxbfGPP/r+4JAXdQuX7nebsBkBqA2RHB7Usw==}
dependencies: dependencies: