diff --git a/src/following/distribute.ts b/src/following/distribute.ts new file mode 100644 index 0000000000..10ff988814 --- /dev/null +++ b/src/following/distribute.ts @@ -0,0 +1,42 @@ +import User, { pack as packUser } from '../models/user'; +import FollowingLog from '../models/following-log'; +import FollowedLog from '../models/followed-log'; +import event from '../publishers/stream'; +import notify from '../publishers/notify'; + +export default async (follower, followee) => Promise.all([ + // Increment following count + User.update(follower._id, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followee._id }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: followee.followersCount + 1 + }), + + followee.host === null && Promise.all([ + // Notify + notify(followee.id, follower.id, 'follow'), + + // Publish follow event + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)) + ]) +]); diff --git a/src/index.ts b/src/index.ts index 29c4f3431a..21fb2f5530 100644 --- a/src/index.ts +++ b/src/index.ts @@ -99,7 +99,7 @@ async function workerMain(opt) { if (!opt['only-server']) { // start processor - require('./processor').default(); + require('./queue').process(); } // Send a 'ready' message to parent process diff --git a/src/post/distribute.ts b/src/post/distribute.ts index 49c6eb22df..f748a620c0 100644 --- a/src/post/distribute.ts +++ b/src/post/distribute.ts @@ -1,11 +1,14 @@ +import Channel from '../models/channel'; +import ChannelWatching from '../models/channel-watching'; +import Following from '../models/following'; import Mute from '../models/mute'; import Post, { pack } from '../models/post'; import Watching from '../models/post-watching'; -import User from '../models/user'; -import stream from '../publishers/stream'; +import User, { isLocalUser } from '../models/user'; +import stream, { publishChannelStream } from '../publishers/stream'; import notify from '../publishers/notify'; import pushSw from '../publishers/push-sw'; -import queue from '../queue'; +import { createHttp } from '../queue'; import watch from './watch'; export default async (user, mentions, post) => { @@ -21,10 +24,6 @@ export default async (user, mentions, post) => { latestPost: post._id } }), - new Promise((resolve, reject) => queue.create('http', { - type: 'deliverPost', - id: post._id, - }).save(error => error ? reject(error) : resolve())), ] as Array>; function addMention(promisedMentionee, reason) { @@ -50,6 +49,91 @@ export default async (user, mentions, post) => { })); } + // タイムラインへの投稿 + if (!post.channelId) { + promises.push( + // Publish event to myself's stream + promisedPostObj.then(postObj => { + stream(post.userId, 'post', postObj); + }), + + Promise.all([ + User.findOne({ _id: post.userId }), + + // Fetch all followers + Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }) + ]).then(([user, followers]) => Promise.all(followers.map(following => { + if (isLocalUser(following.follower)) { + // Publish event to followers stream + return promisedPostObj.then(postObj => { + stream(following.followerId, 'post', postObj); + }); + } + + return new Promise((resolve, reject) => { + createHttp({ + type: 'deliverPost', + fromId: user._id, + toId: following.followerId, + postId: post._id + }).save(error => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }))) + ); + } + + // チャンネルへの投稿 + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + } + // If has in reply to post if (post.replyId) { promises.push( diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts deleted file mode 100644 index c00ab912c9..0000000000 --- a/src/processor/http/deliver-post.ts +++ /dev/null @@ -1,93 +0,0 @@ -import Channel from '../../models/channel'; -import Following from '../../models/following'; -import ChannelWatching from '../../models/channel-watching'; -import Post, { pack } from '../../models/post'; -import User, { isLocalUser } from '../../models/user'; -import stream, { publishChannelStream } from '../../publishers/stream'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; - -export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { - const promisedPostObj = pack(post); - const promises = []; - - // タイムラインへの投稿 - if (!post.channelId) { - promises.push( - // Publish event to myself's stream - promisedPostObj.then(postObj => { - stream(post.userId, 'post', postObj); - }), - - Promise.all([ - User.findOne({ _id: post.userId }), - - // Fetch all followers - Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'follower' - } - }, { - $match: { - followeeId: post.userId - } - }], { - _id: false - }) - ]).then(([user, followers]) => Promise.all(followers.map(following => { - if (isLocalUser(following.follower)) { - // Publish event to followers stream - return promisedPostObj.then(postObj => { - stream(following.followerId, 'post', postObj); - }); - } - - return renderNote(user, post).then(note => { - const create = renderCreate(note); - create['@context'] = context; - return request(user, following.follower[0].account.inbox, create); - }); - }))) - ); - } - - // チャンネルへの投稿 - if (post.channelId) { - promises.push( - // Increment channel index(posts count) - Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } - }), - - // Publish event to channel - promisedPostObj.then(postObj => { - publishChannelStream(post.channelId, 'post', postObj); - }), - - Promise.all([ - promisedPostObj, - - // Get channel watchers - ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([postObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', postObj); - }); - }) - ); - } - - return Promise.all(promises); -}); diff --git a/src/processor/http/follow.ts b/src/processor/http/follow.ts deleted file mode 100644 index 8bf890efbc..0000000000 --- a/src/processor/http/follow.ts +++ /dev/null @@ -1,69 +0,0 @@ -import User, { isLocalUser, pack as packUser } from '../../models/user'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import FollowedLog from '../../models/followed-log'; -import event from '../../publishers/stream'; -import notify from '../../publishers/notify'; -import context from '../../remote/activitypub/renderer/context'; -import render from '../../remote/activitypub/renderer/follow'; -import request from '../../remote/request'; - -export default ({ data }) => Following.findOne({ _id: data.following }).then(({ followerId, followeeId }) => { - const promisedFollower = User.findOne({ _id: followerId }); - const promisedFollowee = User.findOne({ _id: followeeId }); - - return Promise.all([ - // Increment following count - User.update(followerId, { - $inc: { - followingCount: 1 - } - }), - - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followingCount + 1 - })), - - // Increment followers count - User.update({ _id: followeeId }, { - $inc: { - followersCount: 1 - } - }), - - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: data.following.createdAt, - userId: followerId, - count: followersCount + 1 - })), - - // Notify - promisedFollowee.then(followee => followee.host === null ? - notify(followeeId, followerId, 'follow') : null), - - // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(([follower, followee]) => { - let followerEvent; - let followeeEvent; - - if (isLocalUser(follower)) { - followerEvent = packUser(followee, follower) - .then(packed => event(follower._id, 'follow', packed)); - } - - if (isLocalUser(followee)) { - followeeEvent = packUser(follower, followee) - .then(packed => event(followee._id, 'followed', packed)); - } else if (isLocalUser(follower)) { - const rendered = render(follower, followee); - rendered['@context'] = context; - - followeeEvent = request(follower, followee.account.inbox, rendered); - } - - return Promise.all([followerEvent, followeeEvent]); - }) - ]); -}); diff --git a/src/processor/http/perform-activitypub.ts b/src/processor/http/perform-activitypub.ts deleted file mode 100644 index 963e532fe5..0000000000 --- a/src/processor/http/perform-activitypub.ts +++ /dev/null @@ -1,7 +0,0 @@ -import User from '../../models/user'; -import act from '../../remote/activitypub/act'; -import Resolver from '../../remote/activitypub/resolver'; - -export default ({ data }) => User.findOne({ _id: data.actor }) - .then(actor => act(new Resolver(), actor, data.outbox)) - .then(Promise.all); diff --git a/src/processor/http/unfollow.ts b/src/processor/http/unfollow.ts deleted file mode 100644 index d3d5f2246f..0000000000 --- a/src/processor/http/unfollow.ts +++ /dev/null @@ -1,56 +0,0 @@ -import FollowedLog from '../../models/followed-log'; -import Following from '../../models/following'; -import FollowingLog from '../../models/following-log'; -import User, { isRemoteUser, pack as packUser } from '../../models/user'; -import stream from '../../publishers/stream'; -import renderFollow from '../../remote/activitypub/renderer/follow'; -import renderUndo from '../../remote/activitypub/renderer/undo'; -import context from '../../remote/activitypub/renderer/context'; -import request from '../../remote/request'; - -export default async ({ data }) => { - // Delete following - const following = await Following.findOneAndDelete({ _id: data.id }); - if (following === null) { - return; - } - - const promisedFollower = User.findOne({ _id: following.followerId }); - const promisedFollowee = User.findOne({ _id: following.followeeId }); - - await Promise.all([ - // Decrement following count - User.update({ _id: following.followerId }, { $inc: { followingCount: -1 } }), - promisedFollower.then(({ followingCount }) => FollowingLog.insert({ - createdAt: new Date(), - userId: following.followerId, - count: followingCount - 1 - })), - - // Decrement followers count - User.update({ _id: following.followeeId }, { $inc: { followersCount: -1 } }), - promisedFollowee.then(({ followersCount }) => FollowedLog.insert({ - createdAt: new Date(), - userId: following.followeeId, - count: followersCount - 1 - })), - - // Publish follow event - Promise.all([promisedFollower, promisedFollowee]).then(async ([follower, followee]) => { - if (isRemoteUser(follower)) { - return; - } - - const promisedPackedUser = packUser(followee, follower); - - if (isRemoteUser(followee)) { - const undo = renderUndo(renderFollow(follower, followee)); - undo['@context'] = context; - - await request(follower, followee.account.inbox, undo); - } - - stream(follower._id, 'unfollow', promisedPackedUser); - }) - ]); -}; diff --git a/src/processor/index.ts b/src/processor/index.ts deleted file mode 100644 index 172048ddae..0000000000 --- a/src/processor/index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import queue from '../queue'; -import db from './db'; -import http from './http'; - -export default () => { - queue.process('db', db); - - /* - 256 is the default concurrency limit of Mozilla Firefox and Google - Chromium. - - a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google - https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff - Network.http.max-connections - MozillaZine Knowledge Base - http://kb.mozillazine.org/Network.http.max-connections - */ - queue.process('http', 256, http); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a3..0000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..f90754a561 --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,38 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts similarity index 59% rename from src/processor/db/delete-post-dependents.ts rename to src/queue/processors/db/delete-post-dependents.ts index 879c41ec9c..6de21eb053 100644 --- a/src/processor/db/delete-post-dependents.ts +++ b/src/queue/processors/db/delete-post-dependents.ts @@ -1,9 +1,9 @@ -import Favorite from '../../models/favorite'; -import Notification from '../../models/notification'; -import PollVote from '../../models/poll-vote'; -import PostReaction from '../../models/post-reaction'; -import PostWatching from '../../models/post-watching'; -import Post from '../../models/post'; +import Favorite from '../../../models/favorite'; +import Notification from '../../../models/notification'; +import PollVote from '../../../models/poll-vote'; +import PostReaction from '../../../models/post-reaction'; +import PostWatching from '../../../models/post-watching'; +import Post from '../../../models/post'; export default async ({ data }) => Promise.all([ Favorite.remove({ postId: data._id }), diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts similarity index 100% rename from src/processor/db/index.ts rename to src/queue/processors/db/index.ts diff --git a/src/queue/processors/http/deliver-post.ts b/src/queue/processors/http/deliver-post.ts new file mode 100644 index 0000000000..e743fc5f68 --- /dev/null +++ b/src/queue/processors/http/deliver-post.ts @@ -0,0 +1,21 @@ +import Post from '../../../models/post'; +import User, { IRemoteUser } from '../../../models/user'; +import context from '../../../remote/activitypub/renderer/context'; +import renderCreate from '../../../remote/activitypub/renderer/create'; +import renderNote from '../../../remote/activitypub/renderer/note'; +import request from '../../../remote/request'; + +export default async ({ data }) => { + const promisedTo = User.findOne({ _id: data.toId }) as Promise; + const [from, post] = await Promise.all([ + User.findOne({ _id: data.fromId }), + Post.findOne({ _id: data.postId }) + ]); + const note = await renderNote(from, post); + const to = await promisedTo; + const create = renderCreate(note); + + create['@context'] = context; + + return request(from, to.account.inbox, create); +}; diff --git a/src/queue/processors/http/follow.ts b/src/queue/processors/http/follow.ts new file mode 100644 index 0000000000..4cb72828e7 --- /dev/null +++ b/src/queue/processors/http/follow.ts @@ -0,0 +1,69 @@ +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import FollowedLog from '../../../models/followed-log'; +import event from '../../../publishers/stream'; +import notify from '../../../publishers/notify'; +import context from '../../../remote/activitypub/renderer/context'; +import render from '../../../remote/activitypub/renderer/follow'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const { followerId, followeeId } = await Following.findOne({ _id: data.following }); + const [follower, followee] = await Promise.all([ + User.findOne({ _id: followerId }), + User.findOne({ _id: followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const rendered = render(follower, followee); + rendered['@context'] = context; + + await request(follower, followee.account.inbox, rendered); + } + + try { + await Promise.all([ + // Increment following count + User.update(followerId, { + $inc: { + followingCount: 1 + } + }), + + FollowingLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: follower.followingCount + 1 + }), + + // Increment followers count + User.update({ _id: followeeId }, { + $inc: { + followersCount: 1 + } + }), + + FollowedLog.insert({ + createdAt: data.following.createdAt, + userId: followerId, + count: followee.followersCount + 1 + }), + + // Publish follow event + isLocalUser(follower) && packUser(followee, follower) + .then(packed => event(follower._id, 'follow', packed)), + + isLocalUser(followee) && Promise.all([ + packUser(follower, followee) + .then(packed => event(followee._id, 'followed', packed)), + + // Notify + isLocalUser(followee) && notify(followeeId, followerId, 'follow') + ]) + ]); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts similarity index 100% rename from src/processor/http/index.ts rename to src/queue/processors/http/index.ts diff --git a/src/queue/processors/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts new file mode 100644 index 0000000000..7b84400d5c --- /dev/null +++ b/src/queue/processors/http/perform-activitypub.ts @@ -0,0 +1,7 @@ +import User from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import Resolver from '../../../remote/activitypub/resolver'; + +export default ({ data }) => User.findOne({ _id: data.actor }) + .then(actor => act(new Resolver(), actor, data.outbox)) + .then(Promise.all); diff --git a/src/processor/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts similarity index 76% rename from src/processor/http/process-inbox.ts rename to src/queue/processors/http/process-inbox.ts index f102f8d6b4..de1dbd2f98 100644 --- a/src/processor/http/process-inbox.ts +++ b/src/queue/processors/http/process-inbox.ts @@ -1,9 +1,9 @@ import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; +import parseAcct from '../../../acct/parse'; +import User, { IRemoteUser } from '../../../models/user'; +import act from '../../../remote/activitypub/act'; +import resolvePerson from '../../../remote/activitypub/resolve-person'; +import Resolver from '../../../remote/activitypub/resolver'; export default async ({ data }): Promise => { const keyIdLower = data.signature.keyId.toLowerCase(); diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts similarity index 85% rename from src/processor/http/report-github-failure.ts rename to src/queue/processors/http/report-github-failure.ts index 4f6f5ccee5..21683ba3c2 100644 --- a/src/processor/http/report-github-failure.ts +++ b/src/queue/processors/http/report-github-failure.ts @@ -1,6 +1,6 @@ import * as request from 'request-promise-native'; -import User from '../../models/user'; -const createPost = require('../../server/api/endpoints/posts/create'); +import User from '../../../models/user'; +const createPost = require('../../../server/api/endpoints/posts/create'); export default async ({ data }) => { const asyncBot = User.findOne({ _id: data.userId }); diff --git a/src/queue/processors/http/unfollow.ts b/src/queue/processors/http/unfollow.ts new file mode 100644 index 0000000000..801a3612a7 --- /dev/null +++ b/src/queue/processors/http/unfollow.ts @@ -0,0 +1,63 @@ +import FollowedLog from '../../../models/followed-log'; +import Following from '../../../models/following'; +import FollowingLog from '../../../models/following-log'; +import User, { isLocalUser, isRemoteUser, pack as packUser } from '../../../models/user'; +import stream from '../../../publishers/stream'; +import renderFollow from '../../../remote/activitypub/renderer/follow'; +import renderUndo from '../../../remote/activitypub/renderer/undo'; +import context from '../../../remote/activitypub/renderer/context'; +import request from '../../../remote/request'; +import Logger from '../../../utils/logger'; + +export default async ({ data }) => { + const following = await Following.findOne({ _id: data.id }); + if (following === null) { + return; + } + + const [follower, followee] = await Promise.all([ + User.findOne({ _id: following.followerId }), + User.findOne({ _id: following.followeeId }) + ]); + + if (isLocalUser(follower) && isRemoteUser(followee)) { + const undo = renderUndo(renderFollow(follower, followee)); + undo['@context'] = context; + + await request(follower, followee.account.inbox, undo); + } + + try { + await Promise.all([ + // Delete following + Following.findOneAndDelete({ _id: data.id }), + + // Decrement following count + User.update({ _id: follower._id }, { $inc: { followingCount: -1 } }), + FollowingLog.insert({ + createdAt: new Date(), + userId: follower._id, + count: follower.followingCount - 1 + }), + + // Decrement followers count + User.update({ _id: followee._id }, { $inc: { followersCount: -1 } }), + FollowedLog.insert({ + createdAt: new Date(), + userId: followee._id, + count: followee.followersCount - 1 + }) + ]); + + if (isLocalUser(follower)) { + return; + } + + const promisedPackedUser = packUser(followee, follower); + + // Publish follow event + stream(follower._id, 'unfollow', promisedPackedUser); + } catch (error) { + Logger.error(error.toString()); + } +}; diff --git a/src/remote/activitypub/act/follow.ts b/src/remote/activitypub/act/follow.ts index 23fa41df8e..222a257e1a 100644 --- a/src/remote/activitypub/act/follow.ts +++ b/src/remote/activitypub/act/follow.ts @@ -3,7 +3,7 @@ import parseAcct from '../../../acct/parse'; import Following, { IFollowing } from '../../../models/following'; import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; import context from '../renderer/context'; import renderAccept from '../renderer/accept'; import request from '../../request'; @@ -44,7 +44,7 @@ export default async (resolver: Resolver, actor, activity, distribute) => { followerId: actor._id, followeeId: followee._id }).then(following => new Promise((resolve, reject) => { - queue.create('http', { + createHttp({ type: 'follow', following: following._id }).save(error => { diff --git a/src/remote/activitypub/act/undo/unfollow.ts b/src/remote/activitypub/act/undo/unfollow.ts index c17e06e8a9..4f15d9a3e4 100644 --- a/src/remote/activitypub/act/undo/unfollow.ts +++ b/src/remote/activitypub/act/undo/unfollow.ts @@ -1,7 +1,7 @@ -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; export default ({ $id }) => new Promise((resolve, reject) => { - queue.create('http', { type: 'unfollow', id: $id }).save(error => { + createHttp({ type: 'unfollow', id: $id }).save(error => { if (error) { reject(error); } else { diff --git a/src/remote/activitypub/delete/post.ts b/src/remote/activitypub/delete/post.ts index f6c816647d..59ae8c2b94 100644 --- a/src/remote/activitypub/delete/post.ts +++ b/src/remote/activitypub/delete/post.ts @@ -1,10 +1,10 @@ import Post from '../../../models/post'; -import queue from '../../../queue'; +import { createDb } from '../../../queue'; export default async ({ $id }) => { const promisedDeletion = Post.findOneAndDelete({ _id: $id }); - await new Promise((resolve, reject) => queue.create('db', { + await new Promise((resolve, reject) => createDb({ type: 'deletePostDependents', id: $id }).delay(65536).save(error => error ? reject(error) : resolve())); diff --git a/src/remote/activitypub/resolve-person.ts b/src/remote/activitypub/resolve-person.ts index 59be65908e..2cf3ad32d8 100644 --- a/src/remote/activitypub/resolve-person.ts +++ b/src/remote/activitypub/resolve-person.ts @@ -1,7 +1,7 @@ import { JSDOM } from 'jsdom'; import { toUnicode } from 'punycode'; import User, { validateUsername, isValidName, isValidDescription } from '../../models/user'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; import webFinger from '../webfinger'; import create from './create'; import Resolver from './resolver'; @@ -69,7 +69,7 @@ export default async (value, verifier?: string) => { }, }); - queue.create('http', { + createHttp({ type: 'performActivityPub', actor: user._id, outbox diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de8433850..0907823b23 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -1,7 +1,7 @@ import * as bodyParser from 'body-parser'; import * as express from 'express'; import { parseRequest } from 'http-signature'; -import queue from '../../queue'; +import { createHttp } from '../../queue'; const app = express(); @@ -22,7 +22,7 @@ app.post('/@:user/inbox', bodyParser.json({ return res.sendStatus(401); } - queue.create('http', { + createHttp({ type: 'processInbox', inbox: req.body, signature, diff --git a/src/server/api/endpoints/following/create.ts b/src/server/api/endpoints/following/create.ts index e568595215..9ccbe20171 100644 --- a/src/server/api/endpoints/following/create.ts +++ b/src/server/api/endpoints/following/create.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Follow a user @@ -56,7 +56,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { followeeId: followee._id }); - queue.create('http', { type: 'follow', following: _id }).save(); + createHttp({ type: 'follow', following: _id }).save(); // Send response res(); diff --git a/src/server/api/endpoints/following/delete.ts b/src/server/api/endpoints/following/delete.ts index bf21bf0cb7..0684b87504 100644 --- a/src/server/api/endpoints/following/delete.ts +++ b/src/server/api/endpoints/following/delete.ts @@ -4,7 +4,7 @@ import $ from 'cafy'; import User from '../../../../models/user'; import Following from '../../../../models/following'; -import queue from '../../../../queue'; +import { createHttp } from '../../../../queue'; /** * Unfollow a user @@ -49,7 +49,7 @@ module.exports = (params, user) => new Promise(async (res, rej) => { return rej('already not following'); } - queue.create('http', { + createHttp({ type: 'unfollow', id: exist._id }).save(error => { diff --git a/src/server/api/service/github.ts b/src/server/api/service/github.ts index 4fd59c2a94..5fc4a92f57 100644 --- a/src/server/api/service/github.ts +++ b/src/server/api/service/github.ts @@ -3,7 +3,7 @@ import * as express from 'express'; //const crypto = require('crypto'); import User from '../../../models/user'; import config from '../../../config'; -import queue from '../../../queue'; +import { createHttp } from '../../../queue'; module.exports = async (app: express.Application) => { if (config.github_bot == null) return; @@ -42,7 +42,7 @@ module.exports = async (app: express.Application) => { const commit = event.commit; const parent = commit.parents[0]; - queue.create('http', { + createHttp({ type: 'gitHubFailureReport', userId: bot._id, parentUrl: parent.url,