From 706b4ae6020d8096a438f2b9492d8bf7c0215999 Mon Sep 17 00:00:00 2001 From: Kaity A Date: Sun, 19 Mar 2023 08:26:47 +0000 Subject: [PATCH] Add sonic full-text search support (#9714) This pull request adds support for the [sonic](https://github.com/valeriansaliou/sonic) full text indexing server into Calckey. In addition to this, a stateful endpoint has been added that will completely (re-)index all notes into any (elasticsearch and/or sonic) indexing server defined in your config at `/api/admin/search/index-all`. It can (optionally) take input data to define the starting point, such as: ``` {"cursor": "9beg3lx6ad"} ``` Currently if both sonic and elasticsearch are defined in the config, sonic will take precedence for searching, but both indexes will continue to be updated for new note creations. Future enhancements may include the ability to choose which indexer to use (or combine multiple). Co-authored-by: Kaitlyn Allan Reviewed-on: https://codeberg.org/calckey/calckey/pulls/9714 Co-authored-by: Kaity A Co-committed-by: Kaity A --- .config/example.yml | 10 +++ .gitignore | 1 + packages/backend/package.json | 1 + packages/backend/src/config/types.ts | 7 ++ packages/backend/src/db/sonic.ts | 51 ++++++++++++ packages/backend/src/queue/index.ts | 14 ++++ .../processors/background/index-all-notes.ts | 76 ++++++++++++++++++ .../src/queue/processors/background/index.ts | 15 ++++ packages/backend/src/queue/queues.ts | 2 + packages/backend/src/server/api/endpoints.ts | 2 + .../server/api/endpoints/admin/queue/stats.ts | 8 ++ .../api/endpoints/admin/search/index-all.ts | 28 +++++++ .../src/server/api/endpoints/notes/search.ts | 79 ++++++++++++++++++- packages/backend/src/services/note/create.ts | 41 +++++++--- pnpm-lock.yaml | 7 ++ 15 files changed, 328 insertions(+), 14 deletions(-) create mode 100644 packages/backend/src/db/sonic.ts create mode 100644 packages/backend/src/queue/processors/background/index-all-notes.ts create mode 100644 packages/backend/src/queue/processors/background/index.ts create mode 100644 packages/backend/src/server/api/endpoints/admin/search/index-all.ts diff --git a/.config/example.yml b/.config/example.yml index 02661b7fbf..b7b56f2287 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -72,6 +72,16 @@ redis: # user: # pass: +# ┌─────────────────────┐ +#───┘ Sonic configuration └───────────────────────────────────── + +#sonic: +# host: localhost +# port: 1491 +# auth: SecretPassword +# collection: notes +# bucket: default + # ┌───────────────┐ #───┘ ID generation └─────────────────────────────────────────── diff --git a/.gitignore b/.gitignore index 52139614c2..5e1d4a26d0 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,7 @@ ormconfig.json packages/backend/assets/instance.css packages/backend/assets/sounds/None.mp3 +!packages/backend/src/db # blender backups *.blend1 diff --git a/packages/backend/package.json b/packages/backend/package.json index ce6efde9c2..e7bec1a220 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "seedrandom": "^3.0.5", "semver": "7.3.8", "sharp": "0.31.3", + "sonic-channel": "^1.3.1", "speakeasy": "2.0.0", "stringz": "2.1.0", "summaly": "2.7.0", diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index ed9b0ece08..a7cdc89cf2 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -32,6 +32,13 @@ export type Source = { pass?: string; index?: string; }; + sonic: { + host: string; + port: number; + auth?: string; + collection?: string; + bucket?: string; + }; proxy?: string; proxySmtp?: string; diff --git a/packages/backend/src/db/sonic.ts b/packages/backend/src/db/sonic.ts new file mode 100644 index 0000000000..6c4d28f703 --- /dev/null +++ b/packages/backend/src/db/sonic.ts @@ -0,0 +1,51 @@ +import * as SonicChannel from "sonic-channel"; +import { dbLogger } from "./logger.js"; + +import config from "@/config/index.js"; + +const logger = dbLogger.createSubLogger("sonic", "gray", false); + +logger.info("Connecting to Sonic"); + +const handlers = (type: string): SonicChannel.Handlers => ( + { + connected: () => { + logger.succ(`Connected to Sonic ${type}`); + }, + disconnected: (error) => { + logger.warn(`Disconnected from Sonic ${type}, error: ${error}`); + }, + error: (error) => { + logger.warn(`Sonic ${type} error: ${error}`); + }, + retrying: () => { + logger.info(`Sonic ${type} retrying`); + }, + timeout: () => { + logger.warn(`Sonic ${type} timeout`); + }, + } +) + +const hasConfig = + config.sonic + && ( config.sonic.host + || config.sonic.port + || config.sonic.auth + ) + +const host = hasConfig ? config.sonic.host ?? "localhost" : ""; +const port = hasConfig ? config.sonic.port ?? 1491 : 0; +const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : ""; +const collection = hasConfig ? config.sonic.collection ?? "main" : ""; +const bucket = hasConfig ? config.sonic.bucket ?? "default" : ""; + +export default hasConfig + ? { + search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")), + ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")), + + collection, + bucket, + } + : null; diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index c40b3c6aeb..c387efe927 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js"; import processObjectStorage from "./processors/object-storage/index.js"; import processSystemQueue from "./processors/system/index.js"; import processWebhookDeliver from "./processors/webhook-deliver.js"; +import processBackground from "./processors/background/index.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { queueLogger } from "./logger.js"; import { getJobInfo } from "./get-job-info.js"; @@ -24,6 +25,7 @@ import { objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue, + backgroundQueue, } from "./queues.js"; import type { ThinUser } from "./types.js"; @@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() { ); } +export function createIndexAllNotesJob(data = {}) { + return backgroundQueue.add( + "indexAllNotes", + data, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + export function webhookDeliver( webhook: Webhook, type: typeof webhookEventTypes[number], @@ -454,6 +467,7 @@ export default function () { webhookDeliverQueue.process(64, processWebhookDeliver); processDb(dbQueue); processObjectStorage(objectStorageQueue); + processBackground(backgroundQueue); systemQueue.add( "tickCharts", diff --git a/packages/backend/src/queue/processors/background/index-all-notes.ts b/packages/backend/src/queue/processors/background/index-all-notes.ts new file mode 100644 index 0000000000..f032758864 --- /dev/null +++ b/packages/backend/src/queue/processors/background/index-all-notes.ts @@ -0,0 +1,76 @@ +import type Bull from "bull"; + +import { queueLogger } from "../../logger.js"; +import { Notes } from "@/models/index.js"; +import { MoreThan } from "typeorm"; +import { index } from "@/services/note/create.js" +import { Note } from "@/models/entities/note.js"; + +const logger = queueLogger.createSubLogger("index-all-notes"); + +export default async function indexAllNotes( + job: Bull.Job>, + done: ()=>void, +): Promise { + logger.info("Indexing all notes..."); + + let cursor: string|null = job.data.cursor as string ?? null; + let indexedCount: number = job.data.indexedCount as number ?? 0; + let total: number = job.data.total as number ?? 0; + + let running = true; + const take = 50000; + const batch = 100; + while (running) { + logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`); + + let notes: Note[] = []; + try { + notes = await Notes.find({ + where: { + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: take, + order: { + id: 1, + }, + }); + } catch (e) { + logger.error(`Failed to query notes ${e}`); + continue; + } + + if (notes.length === 0) { + job.progress(100); + running = false; + break; + } + + try { + const count = await Notes.count(); + total = count; + job.update({ indexedCount, cursor, total }) + } catch (e) { + } + + for (let i = 0; i < notes.length; i += batch) { + const chunk = notes.slice(i, i + batch); + await Promise.all(chunk.map(note => index(note))); + + indexedCount += chunk.length; + const pct = (indexedCount / total)*100; + job.update({ indexedCount, cursor, total }) + job.progress(+(pct.toFixed(1))); + logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`); + } + cursor = notes[notes.length - 1].id; + job.update({ indexedCount, cursor, total }) + + if (notes.length < take) { + running = false; + } + } + + done(); + logger.info("All notes have been indexed."); +} diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts new file mode 100644 index 0000000000..cf96b67ef6 --- /dev/null +++ b/packages/backend/src/queue/processors/background/index.ts @@ -0,0 +1,15 @@ +import type Bull from "bull"; +import indexAllNotes from "./index-all-notes.js"; + +const jobs = { + indexAllNotes, +} as Record< + string, + Bull.ProcessCallbackFunction> +>; + +export default function (q: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + q.process(k, 16, v); + } +} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 12d9d66207..6d7fffcb30 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue( "webhookDeliver", 64, ); +export const backgroundQueue = initializeQueue>("bg"); export const queues = [ systemQueue, @@ -36,4 +37,5 @@ export const queues = [ dbQueue, objectStorageQueue, webhookDeliverQueue, + backgroundQueue, ]; diff --git a/packages/backend/src/server/api/endpoints.ts b/packages/backend/src/server/api/endpoints.ts index bfafbaa62e..ba0e721b9e 100644 --- a/packages/backend/src/server/api/endpoints.ts +++ b/packages/backend/src/server/api/endpoints.ts @@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js"; import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js"; import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js"; import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js"; +import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js"; import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js"; import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js"; import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js"; @@ -393,6 +394,7 @@ const eps = [ ["admin/relays/remove", ep___admin_relays_remove], ["admin/reset-password", ep___admin_resetPassword], ["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport], + ["admin/search/index-all", ep___admin_search_indexAll], ["admin/send-email", ep___admin_sendEmail], ["admin/server-info", ep___admin_serverInfo], ["admin/show-moderation-logs", ep___admin_showModerationLogs], diff --git a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts index ecd67d8936..4a437c3d12 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts @@ -3,6 +3,7 @@ import { inboxQueue, dbQueue, objectStorageQueue, + backgroundQueue, } from "@/queue/queues.js"; import define from "../../../define.js"; @@ -37,6 +38,11 @@ export const meta = { nullable: false, ref: "QueueCount", }, + backgroundQueue: { + optional: false, + nullable: false, + ref: "QueueCount", + }, }, }, } as const; @@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => { const inboxJobCounts = await inboxQueue.getJobCounts(); const dbJobCounts = await dbQueue.getJobCounts(); const objectStorageJobCounts = await objectStorageQueue.getJobCounts(); + const backgroundJobCounts = await backgroundQueue.getJobCounts(); return { deliver: deliverJobCounts, inbox: inboxJobCounts, db: dbJobCounts, objectStorage: objectStorageJobCounts, + backgroundQueue: backgroundJobCounts, }; }); diff --git a/packages/backend/src/server/api/endpoints/admin/search/index-all.ts b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts new file mode 100644 index 0000000000..135b48eccd --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts @@ -0,0 +1,28 @@ +import define from "../../../define.js"; +import { createIndexAllNotesJob } from "@/queue/index.js"; + +export const meta = { + tags: ["admin"], + + requireCredential: true, + requireModerator: true, +} as const; + +export const paramDef = { + type: "object", + properties: { + cursor: { + type: "string", + format: "misskey:id", + nullable: true, + default: null, + }, + }, + required: [], +} as const; + +export default define(meta, paramDef, async (ps, _me) => { + createIndexAllNotesJob({ + cursor: ps.cursor ?? undefined, + }); +}); diff --git a/packages/backend/src/server/api/endpoints/notes/search.ts b/packages/backend/src/server/api/endpoints/notes/search.ts index ce60436db8..5e431d4f7d 100644 --- a/packages/backend/src/server/api/endpoints/notes/search.ts +++ b/packages/backend/src/server/api/endpoints/notes/search.ts @@ -1,7 +1,9 @@ import { In } from "typeorm"; import { Notes } from "@/models/index.js"; +import { Note } from "@/models/entities/note.js"; import config from "@/config/index.js"; import es from "../../../../db/elasticsearch.js"; +import sonic from "../../../../db/sonic.js"; import define from "../../define.js"; import { makePaginationQuery } from "../../common/make-pagination-query.js"; import { generateVisibilityQuery } from "../../common/generate-visibility-query.js"; @@ -59,7 +61,7 @@ export const paramDef = { } as const; export default define(meta, paramDef, async (ps, me) => { - if (es == null) { + if (es == null && sonic == null) { const query = makePaginationQuery( Notes.createQueryBuilder("note"), ps.sinceId, @@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => { if (me) generateMutedUserQuery(query, me); if (me) generateBlockedUserQuery(query, me); - const notes = await query.take(ps.limit).getMany(); + const notes: Note[] = await query.take(ps.limit).getMany(); return await Notes.packMany(notes, me); + } else if (sonic) { + let start = 0; + const chunkSize = 100; + + // Use sonic to fetch and step through all search results that could match the requirements + const ids = []; + while (true) { + const results = await sonic.search.query( + sonic.collection, + sonic.bucket, + ps.query, + { + limit: chunkSize, + offset: start, + }, + ); + + start += chunkSize; + + if (results.length === 0) { + break; + } + + const res = results + .map((k) => JSON.parse(k)) + .filter((key) => { + if (ps.userId && key.userId !== ps.userId) { + return false; + } + if (ps.channelId && key.channelId !== ps.channelId) { + return false; + } + if (ps.sinceId && key.id <= ps.sinceId) { + return false; + } + if (ps.untilId && key.id >= ps.untilId) { + return false; + } + return true; + }) + .map((key) => key.id); + + ids.push(...res); + } + + // Sort all the results by note id DESC (newest first) + ids.sort((a, b) => b - a); + + // Fetch the notes from the database until we have enough to satisfy the limit + start = 0; + const found = []; + while (found.length < ps.limit && start < ids.length) { + const chunk = ids.slice(start, start + chunkSize); + const notes: Note[] = await Notes.find({ + where: { + id: In(chunk), + }, + order: { + id: "DESC", + }, + }); + + // The notes are checked for visibility and muted/blocked users when packed + found.push(...await Notes.packMany(notes, me)); + start += chunkSize; + } + + // If we have more results than the limit, trim them + if (found.length > ps.limit) { + found.length = ps.limit; + } + + return found; } else { const userQuery = ps.userId != null diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 968aed880f..6c7fd9ad57 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -1,5 +1,6 @@ import * as mfm from "mfm-js"; import es from "../../db/elasticsearch.js"; +import sonic from "../../db/sonic.js"; import { publishMainStream, publishNotesStream, @@ -588,7 +589,7 @@ export default async ( } // Register to search database - index(note); + await index(note); }); async function renderNoteOrRenoteActivity(data: Option, note: Note) { @@ -728,18 +729,34 @@ async function insertNote( } } -function index(note: Note) { - if (note.text == null || config.elasticsearch == null) return; +export async function index(note: Note): Promise { + if (!note.text) return; - es!.index({ - index: config.elasticsearch.index || "misskey_note", - id: note.id.toString(), - body: { - text: normalizeForSearch(note.text), - userId: note.userId, - userHost: note.userHost, - }, - }); + if (config.elasticsearch && es) { + es.index({ + index: config.elasticsearch.index || "misskey_note", + id: note.id.toString(), + body: { + text: normalizeForSearch(note.text), + userId: note.userId, + userHost: note.userHost, + }, + }); + } + + if (sonic) { + await sonic.ingest.push( + sonic.collection, + sonic.bucket, + JSON.stringify({ + id: note.id, + userId: note.userId, + userHost: note.userHost, + channelId: note.channelId, + }), + note.text, + ); + } } async function notifyToWatchersOfRenotee( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index de12004772..37bc6c8054 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -196,6 +196,7 @@ importers: seedrandom: ^3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: ^1.3.1 speakeasy: 2.0.0 strict-event-emitter-types: 2.0.0 stringz: 2.1.0 @@ -310,6 +311,7 @@ importers: seedrandom: 3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: 1.3.1 speakeasy: 2.0.0 stringz: 2.1.0 summaly: 2.7.0 @@ -11594,6 +11596,11 @@ packages: smart-buffer: 4.2.0 dev: false + /sonic-channel/1.3.1: + resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==} + engines: {node: '>= 6.0.0'} + dev: false + /sort-keys-length/1.0.1: resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==} engines: {node: '>=0.10.0'}