Add high performance batch imports

This commit is contained in:
PrivateGER 2023-05-25 23:49:52 +02:00 committed by PrivateGER
parent 8d08bf3ee4
commit 5ae89a69bc
3 changed files with 50 additions and 32 deletions

View File

@ -97,8 +97,14 @@ export default hasConfig ? {
filter: constructedFilters filter: constructedFilters
}); });
}, },
ingestNote: (note : Note) => { ingestNote: (note: Note | Note[]) => {
logger.info("Indexing note in MeiliSearch: " + note.id); if (note instanceof Note) {
note = [note];
}
let indexingBatch: MeilisearchNote[] = [];
note.forEach(note => {
let attachmentType = ""; let attachmentType = "";
if (note.attachedFileTypes.length > 0) { if (note.attachedFileTypes.length > 0) {
@ -115,18 +121,24 @@ export default hasConfig ? {
} }
} }
return posts.addDocuments([ indexingBatch.push({
{
id: note.id.toString(), id: note.id.toString(),
text: note.text, text: note.text ? note.text : "",
userId: note.userId, userId: note.userId,
userHost: note.userHost, userHost: note.userHost ? note.userHost : "",
channelId: note.channelId, channelId: note.channelId ? note.channelId : "",
mediaAttachment: attachmentType, mediaAttachment: attachmentType,
userName: note.user?.username, userName: note.user?.username ? note.user.username : "",
createdAt: note.createdAt.getTime() / 1000 // division by 1000 is necessary because Node returns in ms-accuracy createdAt: note.createdAt.getTime() / 1000 // division by 1000 is necessary because Node returns in ms-accuracy
} }
]); )
});
let indexingIDs = indexingBatch.map(note => note.id);
logger.info("Indexing notes in MeiliSearch: " + indexingIDs.join(","));
return posts.addDocuments(indexingBatch);
}, },
serverStats: async () => { serverStats: async () => {
let health : Health = await client.health(); let health : Health = await client.health();

View File

@ -4,7 +4,8 @@ import { queueLogger } from "../../logger.js";
import { Notes } from "@/models/index.js"; import { Notes } from "@/models/index.js";
import { MoreThan } from "typeorm"; import { MoreThan } from "typeorm";
import { index } from "@/services/note/create.js"; import { index } from "@/services/note/create.js";
import { Note } from "@/models/entities/note.js"; import {Note} from "@/models/entities/note.js";
import meilisearch from "../../../db/meilisearch.js";
const logger = queueLogger.createSubLogger("index-all-notes"); const logger = queueLogger.createSubLogger("index-all-notes");
@ -58,11 +59,16 @@ export default async function indexAllNotes(
for (let i = 0; i < notes.length; i += batch) { for (let i = 0; i < notes.length; i += batch) {
const chunk = notes.slice(i, i + batch); const chunk = notes.slice(i, i + batch);
await Promise.all(chunk.map((note) => index(note)));
if (meilisearch) {
await meilisearch.ingestNote(chunk)
}
await Promise.all(chunk.map((note) => index(note, true)));
indexedCount += chunk.length; indexedCount += chunk.length;
const pct = (indexedCount / total) * 100; const pct = (indexedCount / total) * 100;
job.update({ indexedCount, cursor, total }); job.update({indexedCount, cursor, total});
job.progress(+pct.toFixed(1)); job.progress(+pct.toFixed(1));
logger.info(`Indexed notes ${indexedCount}/${total ? total : "?"}`); logger.info(`Indexed notes ${indexedCount}/${total ? total : "?"}`);
} }

View File

@ -749,7 +749,7 @@ async function insertNote(
} }
} }
export async function index(note: Note): Promise<void> { export async function index(note: Note, reindexing: boolean): Promise<void> {
if (!note.text) return; if (!note.text) return;
if (config.elasticsearch && es) { if (config.elasticsearch && es) {
@ -778,7 +778,7 @@ export async function index(note: Note): Promise<void> {
); );
} }
if (meilisearch) { if (meilisearch && !reindexing) {
await meilisearch.ingestNote(note); await meilisearch.ingestNote(note);
} }
} }