server: avoid adding suspended instances to deliver queue
This should reduce the performance hit when adding large numbers of instances to the deliver queue by making the check for suspended and dead instances a bulk operation. Changelog: Changed Reviewed-on: https://akkoma.dev/FoundKeyGang/FoundKey/pulls/215
This commit is contained in:
parent
dc7b8aa6e4
commit
5a691f7e98
|
@ -1,50 +1,25 @@
|
||||||
import { URL } from 'node:url';
|
import { URL } from 'node:url';
|
||||||
import Bull from 'bull';
|
|
||||||
import request from '@/remote/activitypub/request.js';
|
import request from '@/remote/activitypub/request.js';
|
||||||
import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js';
|
import { registerOrFetchInstanceDoc } from '@/services/register-or-fetch-instance-doc.js';
|
||||||
import Logger from '@/services/logger.js';
|
import Logger from '@/services/logger.js';
|
||||||
import { Instances } from '@/models/index.js';
|
import { Instances } from '@/models/index.js';
|
||||||
import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
|
import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
|
||||||
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
|
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
|
||||||
import { fetchMeta } from '@/misc/fetch-meta.js';
|
|
||||||
import { toPuny } from '@/misc/convert-host.js';
|
import { toPuny } from '@/misc/convert-host.js';
|
||||||
import { Cache } from '@/misc/cache.js';
|
|
||||||
import { Instance } from '@/models/entities/instance.js';
|
|
||||||
import { DeliverJobData } from '../types.js';
|
|
||||||
import { StatusError } from '@/misc/fetch.js';
|
import { StatusError } from '@/misc/fetch.js';
|
||||||
|
import { shouldSkipInstance } from '@/misc/skipped-instances.js';
|
||||||
|
import type { DeliverJobData } from '@/queue/types.js';
|
||||||
|
import type Bull from 'bull';
|
||||||
|
|
||||||
const logger = new Logger('deliver');
|
const logger = new Logger('deliver');
|
||||||
|
|
||||||
let latest: string | null = null;
|
let latest: string | null = null;
|
||||||
|
|
||||||
const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
|
|
||||||
|
|
||||||
export default async (job: Bull.Job<DeliverJobData>) => {
|
export default async (job: Bull.Job<DeliverJobData>) => {
|
||||||
const { host } = new URL(job.data.to);
|
const { host } = new URL(job.data.to);
|
||||||
|
const puny = toPuny(host);
|
||||||
|
|
||||||
// ブロックしてたら中断
|
if (await shouldSkipInstance(puny)) return 'skip';
|
||||||
const meta = await fetchMeta();
|
|
||||||
if (meta.blockedHosts.includes(toPuny(host))) {
|
|
||||||
return 'skip (blocked)';
|
|
||||||
}
|
|
||||||
|
|
||||||
if (meta.privateMode && !meta.allowedHosts.includes(toPuny(host))) {
|
|
||||||
return 'skip (not allowed)';
|
|
||||||
}
|
|
||||||
|
|
||||||
// isSuspendedなら中断
|
|
||||||
let suspendedHosts = suspendedHostsCache.get(null);
|
|
||||||
if (suspendedHosts == null) {
|
|
||||||
suspendedHosts = await Instances.find({
|
|
||||||
where: {
|
|
||||||
isSuspended: true,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
suspendedHostsCache.set(null, suspendedHosts);
|
|
||||||
}
|
|
||||||
if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
|
|
||||||
return 'skip (suspended)';
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
|
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import { Users, Followings } from '@/models/index.js';
|
|
||||||
import { ILocalUser, IRemoteUser, User } from '@/models/entities/user.js';
|
|
||||||
import { deliver } from '@/queue/index.js';
|
|
||||||
import { IsNull, Not } from 'typeorm';
|
import { IsNull, Not } from 'typeorm';
|
||||||
|
import { Users, Followings } from '@/models/index.js';
|
||||||
|
import type { ILocalUser, IRemoteUser, User } from '@/models/entities/user.js';
|
||||||
|
import { deliver } from '@/queue/index.js';
|
||||||
|
import { skippedInstances } from '@/misc/skipped-instances.js';
|
||||||
|
|
||||||
//#region types
|
//#region types
|
||||||
interface IRecipe {
|
interface IRecipe {
|
||||||
|
@ -117,10 +118,21 @@ export default class DeliverManager {
|
||||||
// check that they actually have an inbox
|
// check that they actually have an inbox
|
||||||
&& recipe.to.inbox != null,
|
&& recipe.to.inbox != null,
|
||||||
)
|
)
|
||||||
.forEach(recipe => inboxes.add(recipe.to.inbox!));
|
.forEach(recipe => inboxes.add(recipe.to.inbox!));
|
||||||
|
|
||||||
|
const instancesToSkip = await skippedInstances(
|
||||||
|
// get (unique) list of hosts
|
||||||
|
Array.from(new Set(
|
||||||
|
Array.from(inboxes)
|
||||||
|
.map(inbox => new URL(inbox).host),
|
||||||
|
)),
|
||||||
|
);
|
||||||
|
|
||||||
// deliver
|
// deliver
|
||||||
for (const inbox of inboxes) {
|
for (const inbox of inboxes) {
|
||||||
|
// skip instances as indicated
|
||||||
|
if (instancesToSkip.includes(new URL(inbox).host)) continue;
|
||||||
|
|
||||||
deliver(this.actor, this.activity, inbox);
|
deliver(this.actor, this.activity, inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue