diff --git a/.config/example.yml b/.config/example.yml index 02661b7fbf..b7b56f2287 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -72,6 +72,16 @@ redis: # user: # pass: +# ┌─────────────────────┐ +#───┘ Sonic configuration └───────────────────────────────────── + +#sonic: +# host: localhost +# port: 1491 +# auth: SecretPassword +# collection: notes +# bucket: default + # ┌───────────────┐ #───┘ ID generation └─────────────────────────────────────────── diff --git a/.gitignore b/.gitignore index 52139614c2..5e1d4a26d0 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,7 @@ ormconfig.json packages/backend/assets/instance.css packages/backend/assets/sounds/None.mp3 +!packages/backend/src/db # blender backups *.blend1 diff --git a/packages/backend/package.json b/packages/backend/package.json index ce6efde9c2..e7bec1a220 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -112,6 +112,7 @@ "seedrandom": "^3.0.5", "semver": "7.3.8", "sharp": "0.31.3", + "sonic-channel": "^1.3.1", "speakeasy": "2.0.0", "stringz": "2.1.0", "summaly": "2.7.0", diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index ed9b0ece08..a7cdc89cf2 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -32,6 +32,13 @@ export type Source = { pass?: string; index?: string; }; + sonic: { + host: string; + port: number; + auth?: string; + collection?: string; + bucket?: string; + }; proxy?: string; proxySmtp?: string; diff --git a/packages/backend/src/db/sonic.ts b/packages/backend/src/db/sonic.ts new file mode 100644 index 0000000000..6c4d28f703 --- /dev/null +++ b/packages/backend/src/db/sonic.ts @@ -0,0 +1,51 @@ +import * as SonicChannel from "sonic-channel"; +import { dbLogger } from "./logger.js"; + +import config from "@/config/index.js"; + +const logger = dbLogger.createSubLogger("sonic", "gray", false); + +logger.info("Connecting to Sonic"); + +const handlers = (type: string): SonicChannel.Handlers => ( + { + connected: () => { + logger.succ(`Connected to Sonic ${type}`); + }, + disconnected: (error) => { + logger.warn(`Disconnected from Sonic ${type}, error: ${error}`); + }, + error: (error) => { + logger.warn(`Sonic ${type} error: ${error}`); + }, + retrying: () => { + logger.info(`Sonic ${type} retrying`); + }, + timeout: () => { + logger.warn(`Sonic ${type} timeout`); + }, + } +) + +const hasConfig = + config.sonic + && ( config.sonic.host + || config.sonic.port + || config.sonic.auth + ) + +const host = hasConfig ? config.sonic.host ?? "localhost" : ""; +const port = hasConfig ? config.sonic.port ?? 1491 : 0; +const auth = hasConfig ? config.sonic.auth ?? "SecretPassword" : ""; +const collection = hasConfig ? config.sonic.collection ?? "main" : ""; +const bucket = hasConfig ? config.sonic.bucket ?? "default" : ""; + +export default hasConfig + ? { + search: new SonicChannel.Search({host, port, auth}).connect(handlers("search")), + ingest: new SonicChannel.Ingest({host, port, auth}).connect(handlers("ingest")), + + collection, + bucket, + } + : null; diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index c40b3c6aeb..c387efe927 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -13,6 +13,7 @@ import processDb from "./processors/db/index.js"; import processObjectStorage from "./processors/object-storage/index.js"; import processSystemQueue from "./processors/system/index.js"; import processWebhookDeliver from "./processors/webhook-deliver.js"; +import processBackground from "./processors/background/index.js"; import { endedPollNotification } from "./processors/ended-poll-notification.js"; import { queueLogger } from "./logger.js"; import { getJobInfo } from "./get-job-info.js"; @@ -24,6 +25,7 @@ import { objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue, + backgroundQueue, } from "./queues.js"; import type { ThinUser } from "./types.js"; @@ -418,6 +420,17 @@ export function createCleanRemoteFilesJob() { ); } +export function createIndexAllNotesJob(data = {}) { + return backgroundQueue.add( + "indexAllNotes", + data, + { + removeOnComplete: true, + removeOnFail: true, + }, + ); +} + export function webhookDeliver( webhook: Webhook, type: typeof webhookEventTypes[number], @@ -454,6 +467,7 @@ export default function () { webhookDeliverQueue.process(64, processWebhookDeliver); processDb(dbQueue); processObjectStorage(objectStorageQueue); + processBackground(backgroundQueue); systemQueue.add( "tickCharts", diff --git a/packages/backend/src/queue/processors/background/index-all-notes.ts b/packages/backend/src/queue/processors/background/index-all-notes.ts new file mode 100644 index 0000000000..f032758864 --- /dev/null +++ b/packages/backend/src/queue/processors/background/index-all-notes.ts @@ -0,0 +1,76 @@ +import type Bull from "bull"; + +import { queueLogger } from "../../logger.js"; +import { Notes } from "@/models/index.js"; +import { MoreThan } from "typeorm"; +import { index } from "@/services/note/create.js" +import { Note } from "@/models/entities/note.js"; + +const logger = queueLogger.createSubLogger("index-all-notes"); + +export default async function indexAllNotes( + job: Bull.Job>, + done: ()=>void, +): Promise { + logger.info("Indexing all notes..."); + + let cursor: string|null = job.data.cursor as string ?? null; + let indexedCount: number = job.data.indexedCount as number ?? 0; + let total: number = job.data.total as number ?? 0; + + let running = true; + const take = 50000; + const batch = 100; + while (running) { + logger.info(`Querying for ${take} notes ${indexedCount}/${total ? total : '?'} at ${cursor}`); + + let notes: Note[] = []; + try { + notes = await Notes.find({ + where: { + ...(cursor ? { id: MoreThan(cursor) } : {}), + }, + take: take, + order: { + id: 1, + }, + }); + } catch (e) { + logger.error(`Failed to query notes ${e}`); + continue; + } + + if (notes.length === 0) { + job.progress(100); + running = false; + break; + } + + try { + const count = await Notes.count(); + total = count; + job.update({ indexedCount, cursor, total }) + } catch (e) { + } + + for (let i = 0; i < notes.length; i += batch) { + const chunk = notes.slice(i, i + batch); + await Promise.all(chunk.map(note => index(note))); + + indexedCount += chunk.length; + const pct = (indexedCount / total)*100; + job.update({ indexedCount, cursor, total }) + job.progress(+(pct.toFixed(1))); + logger.info(`Indexed notes ${indexedCount}/${total ? total : '?'}`); + } + cursor = notes[notes.length - 1].id; + job.update({ indexedCount, cursor, total }) + + if (notes.length < take) { + running = false; + } + } + + done(); + logger.info("All notes have been indexed."); +} diff --git a/packages/backend/src/queue/processors/background/index.ts b/packages/backend/src/queue/processors/background/index.ts new file mode 100644 index 0000000000..cf96b67ef6 --- /dev/null +++ b/packages/backend/src/queue/processors/background/index.ts @@ -0,0 +1,15 @@ +import type Bull from "bull"; +import indexAllNotes from "./index-all-notes.js"; + +const jobs = { + indexAllNotes, +} as Record< + string, + Bull.ProcessCallbackFunction> +>; + +export default function (q: Bull.Queue) { + for (const [k, v] of Object.entries(jobs)) { + q.process(k, 16, v); + } +} diff --git a/packages/backend/src/queue/queues.ts b/packages/backend/src/queue/queues.ts index 12d9d66207..6d7fffcb30 100644 --- a/packages/backend/src/queue/queues.ts +++ b/packages/backend/src/queue/queues.ts @@ -27,6 +27,7 @@ export const webhookDeliverQueue = initializeQueue( "webhookDeliver", 64, ); +export const backgroundQueue = initializeQueue>("bg"); export const queues = [ systemQueue, @@ -36,4 +37,5 @@ export const queues = [ dbQueue, objectStorageQueue, webhookDeliverQueue, + backgroundQueue, ]; diff --git a/packages/backend/src/server/api/endpoints.ts b/packages/backend/src/server/api/endpoints.ts index bfafbaa62e..ba0e721b9e 100644 --- a/packages/backend/src/server/api/endpoints.ts +++ b/packages/backend/src/server/api/endpoints.ts @@ -51,6 +51,7 @@ import * as ep___admin_relays_list from "./endpoints/admin/relays/list.js"; import * as ep___admin_relays_remove from "./endpoints/admin/relays/remove.js"; import * as ep___admin_resetPassword from "./endpoints/admin/reset-password.js"; import * as ep___admin_resolveAbuseUserReport from "./endpoints/admin/resolve-abuse-user-report.js"; +import * as ep___admin_search_indexAll from "./endpoints/admin/search/index-all.js"; import * as ep___admin_sendEmail from "./endpoints/admin/send-email.js"; import * as ep___admin_serverInfo from "./endpoints/admin/server-info.js"; import * as ep___admin_showModerationLogs from "./endpoints/admin/show-moderation-logs.js"; @@ -393,6 +394,7 @@ const eps = [ ["admin/relays/remove", ep___admin_relays_remove], ["admin/reset-password", ep___admin_resetPassword], ["admin/resolve-abuse-user-report", ep___admin_resolveAbuseUserReport], + ["admin/search/index-all", ep___admin_search_indexAll], ["admin/send-email", ep___admin_sendEmail], ["admin/server-info", ep___admin_serverInfo], ["admin/show-moderation-logs", ep___admin_showModerationLogs], diff --git a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts index ecd67d8936..4a437c3d12 100644 --- a/packages/backend/src/server/api/endpoints/admin/queue/stats.ts +++ b/packages/backend/src/server/api/endpoints/admin/queue/stats.ts @@ -3,6 +3,7 @@ import { inboxQueue, dbQueue, objectStorageQueue, + backgroundQueue, } from "@/queue/queues.js"; import define from "../../../define.js"; @@ -37,6 +38,11 @@ export const meta = { nullable: false, ref: "QueueCount", }, + backgroundQueue: { + optional: false, + nullable: false, + ref: "QueueCount", + }, }, }, } as const; @@ -52,11 +58,13 @@ export default define(meta, paramDef, async (ps) => { const inboxJobCounts = await inboxQueue.getJobCounts(); const dbJobCounts = await dbQueue.getJobCounts(); const objectStorageJobCounts = await objectStorageQueue.getJobCounts(); + const backgroundJobCounts = await backgroundQueue.getJobCounts(); return { deliver: deliverJobCounts, inbox: inboxJobCounts, db: dbJobCounts, objectStorage: objectStorageJobCounts, + backgroundQueue: backgroundJobCounts, }; }); diff --git a/packages/backend/src/server/api/endpoints/admin/search/index-all.ts b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts new file mode 100644 index 0000000000..135b48eccd --- /dev/null +++ b/packages/backend/src/server/api/endpoints/admin/search/index-all.ts @@ -0,0 +1,28 @@ +import define from "../../../define.js"; +import { createIndexAllNotesJob } from "@/queue/index.js"; + +export const meta = { + tags: ["admin"], + + requireCredential: true, + requireModerator: true, +} as const; + +export const paramDef = { + type: "object", + properties: { + cursor: { + type: "string", + format: "misskey:id", + nullable: true, + default: null, + }, + }, + required: [], +} as const; + +export default define(meta, paramDef, async (ps, _me) => { + createIndexAllNotesJob({ + cursor: ps.cursor ?? undefined, + }); +}); diff --git a/packages/backend/src/server/api/endpoints/notes/search.ts b/packages/backend/src/server/api/endpoints/notes/search.ts index ce60436db8..5e431d4f7d 100644 --- a/packages/backend/src/server/api/endpoints/notes/search.ts +++ b/packages/backend/src/server/api/endpoints/notes/search.ts @@ -1,7 +1,9 @@ import { In } from "typeorm"; import { Notes } from "@/models/index.js"; +import { Note } from "@/models/entities/note.js"; import config from "@/config/index.js"; import es from "../../../../db/elasticsearch.js"; +import sonic from "../../../../db/sonic.js"; import define from "../../define.js"; import { makePaginationQuery } from "../../common/make-pagination-query.js"; import { generateVisibilityQuery } from "../../common/generate-visibility-query.js"; @@ -59,7 +61,7 @@ export const paramDef = { } as const; export default define(meta, paramDef, async (ps, me) => { - if (es == null) { + if (es == null && sonic == null) { const query = makePaginationQuery( Notes.createQueryBuilder("note"), ps.sinceId, @@ -92,9 +94,82 @@ export default define(meta, paramDef, async (ps, me) => { if (me) generateMutedUserQuery(query, me); if (me) generateBlockedUserQuery(query, me); - const notes = await query.take(ps.limit).getMany(); + const notes: Note[] = await query.take(ps.limit).getMany(); return await Notes.packMany(notes, me); + } else if (sonic) { + let start = 0; + const chunkSize = 100; + + // Use sonic to fetch and step through all search results that could match the requirements + const ids = []; + while (true) { + const results = await sonic.search.query( + sonic.collection, + sonic.bucket, + ps.query, + { + limit: chunkSize, + offset: start, + }, + ); + + start += chunkSize; + + if (results.length === 0) { + break; + } + + const res = results + .map((k) => JSON.parse(k)) + .filter((key) => { + if (ps.userId && key.userId !== ps.userId) { + return false; + } + if (ps.channelId && key.channelId !== ps.channelId) { + return false; + } + if (ps.sinceId && key.id <= ps.sinceId) { + return false; + } + if (ps.untilId && key.id >= ps.untilId) { + return false; + } + return true; + }) + .map((key) => key.id); + + ids.push(...res); + } + + // Sort all the results by note id DESC (newest first) + ids.sort((a, b) => b - a); + + // Fetch the notes from the database until we have enough to satisfy the limit + start = 0; + const found = []; + while (found.length < ps.limit && start < ids.length) { + const chunk = ids.slice(start, start + chunkSize); + const notes: Note[] = await Notes.find({ + where: { + id: In(chunk), + }, + order: { + id: "DESC", + }, + }); + + // The notes are checked for visibility and muted/blocked users when packed + found.push(...await Notes.packMany(notes, me)); + start += chunkSize; + } + + // If we have more results than the limit, trim them + if (found.length > ps.limit) { + found.length = ps.limit; + } + + return found; } else { const userQuery = ps.userId != null diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 968aed880f..6c7fd9ad57 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -1,5 +1,6 @@ import * as mfm from "mfm-js"; import es from "../../db/elasticsearch.js"; +import sonic from "../../db/sonic.js"; import { publishMainStream, publishNotesStream, @@ -588,7 +589,7 @@ export default async ( } // Register to search database - index(note); + await index(note); }); async function renderNoteOrRenoteActivity(data: Option, note: Note) { @@ -728,18 +729,34 @@ async function insertNote( } } -function index(note: Note) { - if (note.text == null || config.elasticsearch == null) return; +export async function index(note: Note): Promise { + if (!note.text) return; - es!.index({ - index: config.elasticsearch.index || "misskey_note", - id: note.id.toString(), - body: { - text: normalizeForSearch(note.text), - userId: note.userId, - userHost: note.userHost, - }, - }); + if (config.elasticsearch && es) { + es.index({ + index: config.elasticsearch.index || "misskey_note", + id: note.id.toString(), + body: { + text: normalizeForSearch(note.text), + userId: note.userId, + userHost: note.userHost, + }, + }); + } + + if (sonic) { + await sonic.ingest.push( + sonic.collection, + sonic.bucket, + JSON.stringify({ + id: note.id, + userId: note.userId, + userHost: note.userHost, + channelId: note.channelId, + }), + note.text, + ); + } } async function notifyToWatchersOfRenotee( diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index de12004772..37bc6c8054 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -196,6 +196,7 @@ importers: seedrandom: ^3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: ^1.3.1 speakeasy: 2.0.0 strict-event-emitter-types: 2.0.0 stringz: 2.1.0 @@ -310,6 +311,7 @@ importers: seedrandom: 3.0.5 semver: 7.3.8 sharp: 0.31.3 + sonic-channel: 1.3.1 speakeasy: 2.0.0 stringz: 2.1.0 summaly: 2.7.0 @@ -11594,6 +11596,11 @@ packages: smart-buffer: 4.2.0 dev: false + /sonic-channel/1.3.1: + resolution: {integrity: sha512-+K4IZVFE7Tf2DB4EFZ23xo7a/+gJaiOHhFzXVZpzkX6Rs/rvf4YbSxnEGdYw8mrTcjtpG+jLVQEhP8sNTtN5VA==} + engines: {node: '>= 6.0.0'} + dev: false + /sort-keys-length/1.0.1: resolution: {integrity: sha512-GRbEOUqCxemTAk/b32F2xa8wDTs+Z1QHOkbhJDQTvv/6G3ZkbJ+frYWsTcc7cBB3Fu4wy4XlLCuNtJuMn7Gsvw==} engines: {node: '>=0.10.0'}