fix: multiple boost publication by relay

This commit is contained in:
Namekuji 2023-06-30 17:44:36 -04:00
parent ee5a08efdc
commit 44bf99e0c1
No known key found for this signature in database
GPG Key ID: 1D62332C07FBA532
1 changed files with 51 additions and 47 deletions

View File

@ -461,57 +461,61 @@ export default async (
} }
if (!dontFederateInitially) { if (!dontFederateInitially) {
const relays = await getCachedRelays(); if (!note.userHost) {
// Some relays (e.g., aode-relay) deliver posts by boosting them as
// Announce activities. In that case, user is the relay's actor.
const boostedByRelay =
!!user.inbox &&
relays.map((relay) => relay.inbox).includes(user.inbox);
if (!note.uri) {
// Publish if the post is local // Publish if the post is local
publishNotesStream(note); publishNotesStream(note);
} else if (boostedByRelay && data.renote?.uri) { } else {
// Use Redis transaction for atomicity const relays = await getCachedRelays();
await redisClient.watch(`publishedNote:${data.renote.uri}`); // Some relays (e.g., aode-relay) deliver posts by boosting them as
const exists = await redisClient.exists( // Announce activities. In that case, user is the relay's actor.
`publishedNote:${data.renote.uri}`, const boostedByRelay = relays
); .map((relay) => new URL(relay.inbox).host)
if (exists === 0) { .includes(note.userHost);
// Start the transaction
const transaction = redisClient.multi(); if (boostedByRelay && data.renote) {
const key = `publishedNote:${data.renote.uri}`; // Use Redis transaction for atomicity
transaction.set(key, 1, "EX", 30); const key = `publishedNote:${data.renote.id}`;
// Execute the transaction await redisClient.watch(key);
transaction.exec((err, replies) => { const exists = await redisClient.exists(key);
// Publish after setting the key in Redis if (exists === 0) {
if (!err && data.renote) { // Start the transaction
publishNotesStream(data.renote); const transaction = redisClient.multi();
} transaction.set(key, 1, "EX", 30);
}); // Execute the transaction
await transaction.exec((err, _replies) => {
// Publish after setting the key in Redis
if (!err && boostedByRelay && data.renote) {
publishNotesStream(data.renote);
}
});
} else {
// Abort the transaction
redisClient.unwatch();
}
} else { } else {
// Abort the transaction // Use Redis transaction for atomicity
redisClient.unwatch(); const key = `publishedNote:${note.id}`;
} await redisClient.watch(key);
} else if (!boostedByRelay && note.uri) { const exists = await redisClient.exists(key);
// Use Redis transaction for atomicity if (exists === 0) {
await redisClient.watch(`publishedNote:${note.uri}`); // Start the transaction
const exists = await redisClient.exists(`publishedNote:${note.uri}`); const transaction = redisClient.multi();
if (exists === 0) { transaction.set(key, 1, "EX", 30);
// Start the transaction if (note.renoteId) {
const transaction = redisClient.multi(); // Prevent other threads from publishing this boosting post
const key = `publishedNote:${note.uri}`; transaction.set(`publishedNote:${note.renoteId}`, 1, "EX", 30);
transaction.set(key, 1, "EX", 30);
// Execute the transaction
transaction.exec((err, replies) => {
// Publish after setting the key in Redis
if (!err) {
publishNotesStream(note);
} }
}); // Execute the transaction
} else { await transaction.exec((err, _replies) => {
// Abort the transaction // Publish after setting the key in Redis
redisClient.unwatch(); if (!err) {
publishNotesStream(note);
}
});
} else {
// Abort the transaction
redisClient.unwatch();
}
} }
} }
} }