// calckey/packages/backend/src/queue/index.ts
import type httpSignature from "@peertube/http-signature";
import { v4 as uuid } from "uuid";
import config from "@/config/index.js";
import type { DriveFile } from "@/models/entities/drive-file.js";
import type { IActivity } from "@/remote/activitypub/type.js";
import type { Webhook, webhookEventTypes } from "@/models/entities/webhook.js";
import { envOption } from "../env.js";
import processDeliver from "./processors/deliver.js";
import processInbox from "./processors/inbox.js";
import processDb from "./processors/db/index.js";
import processObjectStorage from "./processors/object-storage/index.js";
import processSystemQueue from "./processors/system/index.js";
import processWebhookDeliver from "./processors/webhook-deliver.js";
import { endedPollNotification } from "./processors/ended-poll-notification.js";
import { queueLogger } from "./logger.js";
import { getJobInfo } from "./get-job-info.js";
import {
systemQueue,
dbQueue,
deliverQueue,
inboxQueue,
objectStorageQueue,
endedPollNotificationQueue,
webhookDeliverQueue,
} from "./queues.js";
import type { ThinUser } from "./types.js";
/**
 * Convert a thrown value into a plain object that can be attached to a log entry.
 *
 * For objects (including `Error` instances) the `stack`, `message` and `name`
 * properties are copied as-is. Any other value (`undefined`, `null`, strings,
 * numbers, ...) is stringified into `message` instead of throwing, so this
 * function is safe to call from event listeners that may receive no error.
 *
 * @param e - The value to render, normally an `Error`.
 * @returns An object with `stack`, `message` and `name` keys, in that order.
 */
function renderError(e: unknown): {
  stack?: string;
  message?: string;
  name?: string;
} {
  if (typeof e === "object" && e !== null) {
    const { stack, message, name } = e as Partial<Error>;
    return { stack, message, name };
  }

  // Not an object: there is nothing to read properties from, keep what we can.
  return { stack: undefined, message: String(e), name: undefined };
}
// Sub-loggers derived from the shared queue logger, one per queue whose
// events are logged in this module.
const systemLogger = queueLogger.createSubLogger("system");
const deliverLogger = queueLogger.createSubLogger("deliver");
const webhookLogger = queueLogger.createSubLogger("webhook");
const inboxLogger = queueLogger.createSubLogger("inbox");
const dbLogger = queueLogger.createSubLogger("db");
const objectStorageLogger = queueLogger.createSubLogger("objectStorage");
// Log the lifecycle events of the system queue.
systemQueue
  .on("waiting", (jobId) => systemLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) => systemLogger.debug(`active id=${job.id}`))
  .on("completed", (job, result) =>
    systemLogger.debug(`completed(${result}) id=${job.id}`),
  )
  .on("failed", (job, err) =>
    systemLogger.warn(`failed(${err}) id=${job.id}`, {
      job,
      e: renderError(err),
    }),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    systemLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) => systemLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the deliver queue, including the job's destination.
deliverQueue
  .on("waiting", (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) =>
    deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
  )
  .on("completed", (job, result) =>
    deliverLogger.debug(
      `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
    ),
  )
  .on("failed", (job, err) =>
    deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    deliverLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) =>
    deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
  );
// Log the lifecycle events of the inbox queue; failures and stalls also report
// the id of the activity carried by the job ("none" when the job has no activity).
inboxQueue
  .on("waiting", (jobId) => inboxLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`))
  .on("completed", (job, result) =>
    inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`),
  )
  .on("failed", (job, err) =>
    inboxLogger.warn(
      `failed(${err}) ${getJobInfo(job)} activity=${
        job.data.activity ? job.data.activity.id : "none"
      }`,
      { job, e: renderError(err) },
    ),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    inboxLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) =>
    inboxLogger.warn(
      `stalled ${getJobInfo(job)} activity=${
        job.data.activity ? job.data.activity.id : "none"
      }`,
    ),
  );
// Log the lifecycle events of the db queue.
dbQueue
  .on("waiting", (jobId) => dbLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) => dbLogger.debug(`active id=${job.id}`))
  .on("completed", (job, result) =>
    dbLogger.debug(`completed(${result}) id=${job.id}`),
  )
  .on("failed", (job, err) =>
    dbLogger.warn(`failed(${err}) id=${job.id}`, { job, e: renderError(err) }),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    dbLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) => dbLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the object-storage queue.
objectStorageQueue
  .on("waiting", (jobId) => objectStorageLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) => objectStorageLogger.debug(`active id=${job.id}`))
  .on("completed", (job, result) =>
    objectStorageLogger.debug(`completed(${result}) id=${job.id}`),
  )
  .on("failed", (job, err) =>
    objectStorageLogger.warn(`failed(${err}) id=${job.id}`, {
      job,
      e: renderError(err),
    }),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    objectStorageLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) => objectStorageLogger.warn(`stalled id=${job.id}`));
// Log the lifecycle events of the webhook delivery queue, including the job's destination.
webhookDeliverQueue
  .on("waiting", (jobId) => webhookLogger.debug(`waiting id=${jobId}`))
  .on("active", (job) =>
    webhookLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`),
  )
  .on("completed", (job, result) =>
    webhookLogger.debug(
      `completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`,
    ),
  )
  .on("failed", (job, err) =>
    webhookLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`),
  )
  // NOTE(review): assumes a Bull queue, whose queue-level "error" event passes
  // the Error as the first argument and has no job attached.
  .on("error", (err: Error) =>
    webhookLogger.error(`error ${err}`, { e: renderError(err) }),
  )
  .on("stalled", (job) =>
    webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`),
  );
/**
 * Enqueue a job on the deliver queue.
 *
 * @param user - Sender; only its `id` is stored in the job payload.
 * @param content - Payload to deliver. Nothing is enqueued when it is `null` or `undefined`.
 * @param to - Destination, stored as `to` in the job payload. Nothing is enqueued when it is `null` or `undefined`.
 * @returns `null` when `content` or `to` is missing, otherwise the result of `deliverQueue.add`.
 */
export function deliver(user: ThinUser, content: unknown, to: string | null) {
  if (content == null) return null;
  if (to == null) return null;

  const data = {
    user: {
      id: user.id,
    },
    content,
    to,
  };

  return deliverQueue.add(data, {
    // Falls back to 12 attempts when the configured value is unset (or 0).
    attempts: config.deliverJobMaxAttempts || 12,
    timeout: 1 * 60 * 1000, // 1min
    backoff: {
      // Named backoff strategy; presumably registered where the queue is created.
      type: "apBackoff",
    },
    removeOnComplete: true,
    removeOnFail: true,
  });
}
/**
 * Enqueue a received activity, together with its parsed HTTP signature, on the inbox queue.
 *
 * @param activity - The activity to process.
 * @param signature - The parsed HTTP signature that accompanied the activity.
 * @returns The result of `inboxQueue.add`.
 */
export function inbox(
  activity: IActivity,
  signature: httpSignature.IParsedSignature,
) {
  const data = {
    activity,
    signature,
  };

  return inboxQueue.add(data, {
    // Falls back to 8 attempts when the configured value is unset (or 0).
    attempts: config.inboxJobMaxAttempts || 8,
    timeout: 5 * 60 * 1000, // 5min
    backoff: {
      // Named backoff strategy; presumably registered where the queue is created.
      type: "apBackoff",
    },
    removeOnComplete: true,
    removeOnFail: true,
  });
}
/**
 * Enqueue a "deleteDriveFiles" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createDeleteDriveFilesJob(user: ThinUser) {
  return dbQueue.add(
    "deleteDriveFiles",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportCustomEmojis" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportCustomEmojisJob(user: ThinUser) {
  return dbQueue.add(
    "exportCustomEmojis",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportNotes" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportNotesJob(user: ThinUser) {
  return dbQueue.add(
    "exportNotes",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportFollowing" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param excludeMuting - Stored in the job payload as `excludeMuting`; defaults to `false`.
 * @param excludeInactive - Stored in the job payload as `excludeInactive`; defaults to `false`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportFollowingJob(
  user: ThinUser,
  excludeMuting = false,
  excludeInactive = false,
) {
  return dbQueue.add(
    "exportFollowing",
    {
      user,
      excludeMuting,
      excludeInactive,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportMute" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportMuteJob(user: ThinUser) {
  return dbQueue.add(
    "exportMute",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportBlocking" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportBlockingJob(user: ThinUser) {
  return dbQueue.add(
    "exportBlocking",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "exportUserLists" job for the given user on the db queue.
 *
 * @param user - User stored in the job payload.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createExportUserListsJob(user: ThinUser) {
  return dbQueue.add(
    "exportUserLists",
    { user },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "importFollowing" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param fileId - Id of the drive file stored in the job payload as `fileId`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createImportFollowingJob(
  user: ThinUser,
  fileId: DriveFile["id"],
) {
  return dbQueue.add(
    "importFollowing",
    {
      user,
      fileId,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "importMuting" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param fileId - Id of the drive file stored in the job payload as `fileId`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createImportMutingJob(user: ThinUser, fileId: DriveFile["id"]) {
  return dbQueue.add(
    "importMuting",
    {
      user,
      fileId,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "importBlocking" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param fileId - Id of the drive file stored in the job payload as `fileId`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createImportBlockingJob(
  user: ThinUser,
  fileId: DriveFile["id"],
) {
  return dbQueue.add(
    "importBlocking",
    {
      user,
      fileId,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "importUserLists" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param fileId - Id of the drive file stored in the job payload as `fileId`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createImportUserListsJob(
  user: ThinUser,
  fileId: DriveFile["id"],
) {
  return dbQueue.add(
    "importUserLists",
    {
      user,
      fileId,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue an "importCustomEmojis" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param fileId - Id of the drive file stored in the job payload as `fileId`.
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createImportCustomEmojisJob(
  user: ThinUser,
  fileId: DriveFile["id"],
) {
  return dbQueue.add(
    "importCustomEmojis",
    {
      user,
      fileId,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue a "deleteAccount" job on the db queue.
 *
 * @param user - User stored in the job payload.
 * @param opts - Optional settings; `opts.soft` is copied into the job payload as `soft`
 *   (it is `undefined` when not provided).
 * @returns The result of `dbQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createDeleteAccountJob(
  user: ThinUser,
  opts: { soft?: boolean } = {},
) {
  return dbQueue.add(
    "deleteAccount",
    {
      user,
      soft: opts.soft,
    },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue a "deleteFile" job on the object-storage queue.
 *
 * @param key - Stored in the job payload as `key`.
 * @returns The result of `objectStorageQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createDeleteObjectStorageFileJob(key: string) {
  return objectStorageQueue.add(
    "deleteFile",
    { key },
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue a "cleanRemoteFiles" job, with an empty payload, on the object-storage queue.
 *
 * @returns The result of `objectStorageQueue.add`. The job is removed from the queue on completion and on failure.
 */
export function createCleanRemoteFilesJob() {
  return objectStorageQueue.add(
    "cleanRemoteFiles",
    {},
    {
      removeOnComplete: true,
      removeOnFail: true,
    },
  );
}
/**
 * Enqueue a webhook delivery job.
 *
 * The job payload carries the event `type` and `content`, the webhook's id,
 * owner id, target URL (as `to`) and secret, the enqueue time in milliseconds
 * (`Date.now()`), and a freshly generated UUID as `eventId`.
 *
 * @param webhook - Webhook the event is delivered to.
 * @param type - One of the webhook event types.
 * @param content - Event payload.
 * @returns The result of `webhookDeliverQueue.add`. The job is tried up to 4 times with a
 *   1-minute timeout and is removed from the queue on completion and on failure.
 */
export function webhookDeliver(
  webhook: Webhook,
  type: typeof webhookEventTypes[number],
  content: unknown,
) {
  const data = {
    type,
    content,
    webhookId: webhook.id,
    userId: webhook.userId,
    to: webhook.url,
    secret: webhook.secret,
    createdAt: Date.now(),
    eventId: uuid(),
  };

  return webhookDeliverQueue.add(data, {
    attempts: 4,
    timeout: 1 * 60 * 1000, // 1min
    backoff: {
      // Named backoff strategy; presumably registered where the queue is created.
      type: "apBackoff",
    },
    removeOnComplete: true,
    removeOnFail: true,
  });
}
/**
 * Start the queue processors and register the repeating system jobs.
 *
 * Does nothing when `envOption.onlyServer` is set.
 */
export default function () {
  if (envOption.onlyServer) return;

  // Attach processors. Concurrency falls back to the defaults below when the
  // configured value is unset (or 0).
  deliverQueue.process(config.deliverJobConcurrency || 128, processDeliver);
  inboxQueue.process(config.inboxJobConcurrency || 16, processInbox);
  endedPollNotificationQueue.process(endedPollNotification);
  webhookDeliverQueue.process(64, processWebhookDeliver);
  processDb(dbQueue);
  processObjectStorage(objectStorageQueue);

  // Repeating system jobs, scheduled with cron expressions.

  // At minute 55 of every hour.
  systemQueue.add(
    "tickCharts",
    {},
    {
      repeat: { cron: "55 * * * *" },
      removeOnComplete: true,
    },
  );

  // Daily at 00:00.
  systemQueue.add(
    "resyncCharts",
    {},
    {
      repeat: { cron: "0 0 * * *" },
      removeOnComplete: true,
    },
  );

  // Daily at 00:00.
  systemQueue.add(
    "cleanCharts",
    {},
    {
      repeat: { cron: "0 0 * * *" },
      removeOnComplete: true,
    },
  );

  // Daily at 00:00.
  systemQueue.add(
    "clean",
    {},
    {
      repeat: { cron: "0 0 * * *" },
      removeOnComplete: true,
    },
  );

  // Every 5 minutes.
  systemQueue.add(
    "checkExpiredMutings",
    {},
    {
      repeat: { cron: "*/5 * * * *" },
      removeOnComplete: true,
    },
  );

  processSystemQueue(systemQueue);
}
/**
 * Request a clean of the "delayed" jobs of the deliver and inbox queues
 * (grace period 0), logging how many jobs were cleaned the first time each
 * queue reports a "cleaned" event.
 *
 * The clean calls are not awaited; this function returns immediately.
 */
export function destroy() {
  deliverQueue.once("cleaned", (jobs, status) => {
    deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
  });
  deliverQueue.clean(0, "delayed");

  inboxQueue.once("cleaned", (jobs, status) => {
    inboxLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
  });
  inboxQueue.clean(0, "delayed");
}