Merge pull request '[PR]: Fix indexing causing Postgres error loop' (#10462) from PrivateGER/calckey:fix/indexing-hang into develop

Reviewed-on: https://codeberg.org/calckey/calckey/pulls/10462
This commit is contained in:
Kainoa Kanter 2023-07-11 00:51:32 +00:00
commit 225fa2976b
1 changed files with 11 additions and 9 deletions

View File

@ -1,4 +1,5 @@
import type Bull from "bull"; import type Bull from "bull";
import type { DoneCallback } from "bull";
import { queueLogger } from "../../logger.js"; import { queueLogger } from "../../logger.js";
import { Notes } from "@/models/index.js"; import { Notes } from "@/models/index.js";
@ -11,7 +12,7 @@ const logger = queueLogger.createSubLogger("index-all-notes");
export default async function indexAllNotes( export default async function indexAllNotes(
job: Bull.Job<Record<string, unknown>>, job: Bull.Job<Record<string, unknown>>,
done: () => void, done: DoneCallback,
): Promise<void> { ): Promise<void> {
logger.info("Indexing all notes..."); logger.info("Indexing all notes...");
@ -20,7 +21,7 @@ export default async function indexAllNotes(
let total: number = (job.data.total as number) ?? 0; let total: number = (job.data.total as number) ?? 0;
let running = true; let running = true;
const take = 100000; const take = 10000;
const batch = 100; const batch = 100;
while (running) { while (running) {
logger.info( logger.info(
@ -41,13 +42,14 @@ export default async function indexAllNotes(
}, },
relations: ["user"], relations: ["user"],
}); });
} catch (e) { } catch (e: any) {
logger.error(`Failed to query notes ${e}`); logger.error(`Failed to query notes ${e}`);
continue; done(e);
break;
} }
if (notes.length === 0) { if (notes.length === 0) {
job.progress(100); await job.progress(100);
running = false; running = false;
break; break;
} }
@ -55,7 +57,7 @@ export default async function indexAllNotes(
try { try {
const count = await Notes.count(); const count = await Notes.count();
total = count; total = count;
job.update({ indexedCount, cursor, total }); await job.update({indexedCount, cursor, total});
} catch (e) {} } catch (e) {}
for (let i = 0; i < notes.length; i += batch) { for (let i = 0; i < notes.length; i += batch) {
@ -69,12 +71,12 @@ export default async function indexAllNotes(
indexedCount += chunk.length; indexedCount += chunk.length;
const pct = (indexedCount / total) * 100; const pct = (indexedCount / total) * 100;
job.update({ indexedCount, cursor, total }); await job.update({ indexedCount, cursor, total });
job.progress(+pct.toFixed(1)); await job.progress(+pct.toFixed(1));
logger.info(`Indexed notes ${indexedCount}/${total ? total : "?"}`); logger.info(`Indexed notes ${indexedCount}/${total ? total : "?"}`);
} }
cursor = notes[notes.length - 1].id; cursor = notes[notes.length - 1].id;
job.update({ indexedCount, cursor, total }); await job.update({ indexedCount, cursor, total });
if (notes.length < take) { if (notes.length < take) {
running = false; running = false;