diff --git a/src/post/create.ts b/src/post/create.ts index ecea37382d..f78bbe7521 100644 --- a/src/post/create.ts +++ b/src/post/create.ts @@ -1,8 +1,14 @@ import parseAcct from '../acct/parse'; -import Post from '../models/post'; -import User from '../models/user'; +import Post, { pack } from '../models/post'; +import User, { isLocalUser, isRemoteUser, IUser } from '../models/user'; +import stream from '../publishers/stream'; +import Following from '../models/following'; +import { createHttp } from '../queue'; +import renderNote from '../remote/activitypub/renderer/note'; +import renderCreate from '../remote/activitypub/renderer/create'; +import context from '../remote/activitypub/renderer/context'; -export default async (post, reply, repost, atMentions) => { +export default async (user: IUser, post, reply, repost, atMentions) => { post.mentions = []; function addMention(mentionee) { @@ -46,5 +52,98 @@ export default async (post, reply, repost, atMentions) => { addMention(_id); })); - return Post.insert(post); + const inserted = await Post.insert(post); + + User.update({ _id: user._id }, { + // Increment my posts count + $inc: { + postsCount: 1 + }, + + $set: { + latestPost: post._id + } + }); + + const postObj = await pack(inserted); + + // タイムラインへの投稿 + if (!post.channelId) { + // Publish event to myself's stream + stream(post.userId, 'post', postObj); + + // Fetch all followers + const followers = await Following.aggregate([{ + $lookup: { + from: 'users', + localField: 'followerId', + foreignField: '_id', + as: 'follower' + } + }, { + $match: { + followeeId: post.userId + } + }], { + _id: false + }); + + const note = await renderNote(user, post); + const content = renderCreate(note); + content['@context'] = context; + + Promise.all(followers.map(({ follower }) => { + if (isLocalUser(follower)) { + // Publish event to followers stream + stream(follower._id, 'post', postObj); + } else { + // フォロワーがリモートユーザーかつ投稿者がローカルユーザーなら投稿を配信 + if (isLocalUser(user)) { + createHttp({ + type: 'deliver', + user, + content, + to: follower.account.inbox + }).save(); + } + } + })); + } + + // チャンネルへの投稿 + /* TODO + if (post.channelId) { + promises.push( + // Increment channel index(posts count) + Channel.update({ _id: post.channelId }, { + $inc: { + index: 1 + } + }), + + // Publish event to channel + promisedPostObj.then(postObj => { + publishChannelStream(post.channelId, 'post', postObj); + }), + + Promise.all([ + promisedPostObj, + + // Get channel watchers + ChannelWatching.find({ + channelId: post.channelId, + // 削除されたドキュメントは除く + deletedAt: { $exists: false } + }) + ]).then(([postObj, watches]) => { + // チャンネルの視聴者(のタイムライン)に配信 + watches.forEach(w => { + stream(w.userId, 'post', postObj); + }); + }) + ); + }*/ + + return Promise.all(promises); + }; diff --git a/src/processor/http/deliver-post.ts b/src/processor/http/deliver-post.ts deleted file mode 100644 index c00ab912c9..0000000000 --- a/src/processor/http/deliver-post.ts +++ /dev/null @@ -1,93 +0,0 @@ -import Channel from '../../models/channel'; -import Following from '../../models/following'; -import ChannelWatching from '../../models/channel-watching'; -import Post, { pack } from '../../models/post'; -import User, { isLocalUser } from '../../models/user'; -import stream, { publishChannelStream } from '../../publishers/stream'; -import context from '../../remote/activitypub/renderer/context'; -import renderCreate from '../../remote/activitypub/renderer/create'; -import renderNote from '../../remote/activitypub/renderer/note'; -import request from '../../remote/request'; - -export default ({ data }) => Post.findOne({ _id: data.id }).then(post => { - const promisedPostObj = pack(post); - const promises = []; - - // タイムラインへの投稿 - if (!post.channelId) { - promises.push( - // Publish event to myself's stream - promisedPostObj.then(postObj => { - stream(post.userId, 'post', postObj); - }), - - Promise.all([ - User.findOne({ _id: post.userId }), - - // Fetch all followers - Following.aggregate([{ - $lookup: { - from: 'users', - localField: 'followerId', - foreignField: '_id', - as: 'follower' - } - }, { - $match: { - followeeId: post.userId - } - }], { - _id: false - }) - ]).then(([user, followers]) => Promise.all(followers.map(following => { - if (isLocalUser(following.follower)) { - // Publish event to followers stream - return promisedPostObj.then(postObj => { - stream(following.followerId, 'post', postObj); - }); - } - - return renderNote(user, post).then(note => { - const create = renderCreate(note); - create['@context'] = context; - return request(user, following.follower[0].account.inbox, create); - }); - }))) - ); - } - - // チャンネルへの投稿 - if (post.channelId) { - promises.push( - // Increment channel index(posts count) - Channel.update({ _id: post.channelId }, { - $inc: { - index: 1 - } - }), - - // Publish event to channel - promisedPostObj.then(postObj => { - publishChannelStream(post.channelId, 'post', postObj); - }), - - Promise.all([ - promisedPostObj, - - // Get channel watchers - ChannelWatching.find({ - channelId: post.channelId, - // 削除されたドキュメントは除く - deletedAt: { $exists: false } - }) - ]).then(([postObj, watches]) => { - // チャンネルの視聴者(のタイムライン)に配信 - watches.forEach(w => { - stream(w.userId, 'post', postObj); - }); - }) - ); - } - - return Promise.all(promises); -}); diff --git a/src/processor/http/process-inbox.ts b/src/processor/http/process-inbox.ts deleted file mode 100644 index f102f8d6b4..0000000000 --- a/src/processor/http/process-inbox.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { verifySignature } from 'http-signature'; -import parseAcct from '../../acct/parse'; -import User, { IRemoteUser } from '../../models/user'; -import act from '../../remote/activitypub/act'; -import resolvePerson from '../../remote/activitypub/resolve-person'; -import Resolver from '../../remote/activitypub/resolver'; - -export default async ({ data }): Promise => { - const keyIdLower = data.signature.keyId.toLowerCase(); - let user; - - if (keyIdLower.startsWith('acct:')) { - const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); - if (host === null) { - throw 'request was made by local user'; - } - - user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; - } else { - user = await User.findOne({ - host: { $ne: null }, - 'account.publicKey.id': data.signature.keyId - }) as IRemoteUser; - - if (user === null) { - user = await resolvePerson(data.signature.keyId); - } - } - - if (user === null) { - throw 'failed to resolve user'; - } - - if (!verifySignature(data.signature, user.account.publicKey.publicKeyPem)) { - throw 'signature verification failed'; - } - - await Promise.all(await act(new Resolver(), user, data.inbox, true)); -}; diff --git a/src/queue.ts b/src/queue.ts deleted file mode 100644 index 08ea13c2a3..0000000000 --- a/src/queue.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { createQueue } from 'kue'; -import config from './config'; - -export default createQueue({ - redis: { - port: config.redis.port, - host: config.redis.host, - auth: config.redis.pass - } -}); diff --git a/src/queue/index.ts b/src/queue/index.ts new file mode 100644 index 0000000000..c8c436b18c --- /dev/null +++ b/src/queue/index.ts @@ -0,0 +1,37 @@ +import { createQueue } from 'kue'; +import config from '../config'; +import db from './processors/db'; +import http from './processors/http'; + +const queue = createQueue({ + redis: { + port: config.redis.port, + host: config.redis.host, + auth: config.redis.pass + } +}); + +export function createHttp(data) { + return queue + .create('http', data) + .attempts(16) + .backoff({ delay: 16384, type: 'exponential' }); +} + +export function createDb(data) { + return queue.create('db', data); +} + +export function process() { + queue.process('db', db); + + /* + 256 is the default concurrency limit of Mozilla Firefox and Google + Chromium. + a8af215e691f3a2205a3758d2d96e9d328e100ff - chromium/src.git - Git at Google + https://chromium.googlesource.com/chromium/src.git/+/a8af215e691f3a2205a3758d2d96e9d328e100ff + Network.http.max-connections - MozillaZine Knowledge Base + http://kb.mozillazine.org/Network.http.max-connections + */ + queue.process('http', 256, http); +} diff --git a/src/processor/db/delete-post-dependents.ts b/src/queue/processors/db/delete-post-dependents.ts similarity index 100% rename from src/processor/db/delete-post-dependents.ts rename to src/queue/processors/db/delete-post-dependents.ts diff --git a/src/processor/db/index.ts b/src/queue/processors/db/index.ts similarity index 100% rename from src/processor/db/index.ts rename to src/queue/processors/db/index.ts diff --git a/src/queue/processors/http/deliver.ts b/src/queue/processors/http/deliver.ts new file mode 100644 index 0000000000..8cd9eb624e --- /dev/null +++ b/src/queue/processors/http/deliver.ts @@ -0,0 +1,17 @@ +import * as kue from 'kue'; + +import Channel from '../../models/channel'; +import Following from '../../models/following'; +import ChannelWatching from '../../models/channel-watching'; +import Post, { pack } from '../../models/post'; +import User, { isLocalUser } from '../../models/user'; +import stream, { publishChannelStream } from '../../publishers/stream'; +import context from '../../remote/activitypub/renderer/context'; +import renderCreate from '../../remote/activitypub/renderer/create'; +import renderNote from '../../remote/activitypub/renderer/note'; +import request from '../../remote/request'; + +export default async (job: kue.Job, done): Promise => { + + request(user, following.follower[0].account.inbox, create); +} diff --git a/src/processor/http/follow.ts b/src/queue/processors/http/follow.ts similarity index 100% rename from src/processor/http/follow.ts rename to src/queue/processors/http/follow.ts diff --git a/src/processor/http/index.ts b/src/queue/processors/http/index.ts similarity index 100% rename from src/processor/http/index.ts rename to src/queue/processors/http/index.ts diff --git a/src/processor/http/perform-activitypub.ts b/src/queue/processors/http/perform-activitypub.ts similarity index 100% rename from src/processor/http/perform-activitypub.ts rename to src/queue/processors/http/perform-activitypub.ts diff --git a/src/queue/processors/http/process-inbox.ts b/src/queue/processors/http/process-inbox.ts new file mode 100644 index 0000000000..fff1fbf663 --- /dev/null +++ b/src/queue/processors/http/process-inbox.ts @@ -0,0 +1,55 @@ +import * as kue from 'kue'; + +import { verifySignature } from 'http-signature'; +import parseAcct from '../../acct/parse'; +import User, { IRemoteUser } from '../../models/user'; +import act from '../../remote/activitypub/act'; +import resolvePerson from '../../remote/activitypub/resolve-person'; +import Resolver from '../../remote/activitypub/resolver'; + +// ユーザーのinboxにアクティビティが届いた時の処理 +export default async (job: kue.Job, done): Promise => { + const signature = job.data.signature; + const activity = job.data.activity; + + const keyIdLower = signature.keyId.toLowerCase(); + let user; + + if (keyIdLower.startsWith('acct:')) { + const { username, host } = parseAcct(keyIdLower.slice('acct:'.length)); + if (host === null) { + console.warn(`request was made by local user: @${username}`); + done(); + } + + user = await User.findOne({ usernameLower: username, hostLower: host }) as IRemoteUser; + } else { + user = await User.findOne({ + host: { $ne: null }, + 'account.publicKey.id': signature.keyId + }) as IRemoteUser; + + // アクティビティを送信してきたユーザーがまだMisskeyサーバーに登録されていなかったら登録する + if (user === null) { + user = await resolvePerson(signature.keyId); + } + } + + if (user === null) { + done(new Error('failed to resolve user')); + return; + } + + if (!verifySignature(signature, user.account.publicKey.publicKeyPem)) { + done(new Error('signature verification failed')); + return; + } + + // アクティビティを処理 + try { + await act(new Resolver(), user, activity); + done(); + } catch (e) { + done(e); + } +}; diff --git a/src/processor/http/report-github-failure.ts b/src/queue/processors/http/report-github-failure.ts similarity index 100% rename from src/processor/http/report-github-failure.ts rename to src/queue/processors/http/report-github-failure.ts diff --git a/src/processor/http/unfollow.ts b/src/queue/processors/http/unfollow.ts similarity index 100% rename from src/processor/http/unfollow.ts rename to src/queue/processors/http/unfollow.ts diff --git a/src/processor/index.ts b/src/queue/processors/index.ts similarity index 100% rename from src/processor/index.ts rename to src/queue/processors/index.ts diff --git a/src/remote/activitypub/act/create.ts b/src/remote/activitypub/act/create.ts index fa681982cf..c1a30ce7d0 100644 --- a/src/remote/activitypub/act/create.ts +++ b/src/remote/activitypub/act/create.ts @@ -1,10 +1,92 @@ -import create from '../create'; -import Resolver from '../resolver'; +import { JSDOM } from 'jsdom'; +const createDOMPurify = require('dompurify'); -export default (resolver: Resolver, actor, activity, distribute) => { +import Resolver from '../resolver'; +import DriveFile from '../../../models/drive-file'; +import Post from '../../../models/post'; +import uploadFromUrl from '../../../drive/upload-from-url'; +import createPost from '../../../post/create'; + +export default async (resolver: Resolver, actor, activity): Promise => { if ('actor' in activity && actor.account.uri !== activity.actor) { - throw new Error(); + throw new Error('invalid actor'); } - return create(resolver, actor, activity.object, distribute); + const uri = activity.id || activity; + + try { + await Promise.all([ + DriveFile.findOne({ 'metadata.uri': uri }).then(file => { + if (file !== null) { + throw new Error(); + } + }, () => {}), + Post.findOne({ uri }).then(post => { + if (post !== null) { + throw new Error(); + } + }, () => {}) + ]); + } catch (object) { + throw new Error(`already registered: ${uri}`); + } + + const object = await resolver.resolve(activity); + + switch (object.type) { + case 'Image': + createImage(resolver, object); + break; + + case 'Note': + createNote(resolver, object); + break; + } + + /// + + async function createImage(resolver: Resolver, image) { + if ('attributedTo' in image && actor.account.uri !== image.attributedTo) { + throw new Error('invalid image'); + } + + return await uploadFromUrl(image.url, actor); + } + + async function createNote(resolver: Resolver, note) { + if ( + ('attributedTo' in note && actor.account.uri !== note.attributedTo) || + typeof note.id !== 'string' + ) { + throw new Error('invalid note'); + } + + const mediaIds = []; + + if ('attachment' in note) { + note.attachment.forEach(async media => { + const created = await createImage(resolver, media); + mediaIds.push(created._id); + }); + } + + const { window } = new JSDOM(note.content); + + await createPost(actor, { + channelId: undefined, + index: undefined, + createdAt: new Date(note.published), + mediaIds, + replyId: undefined, + repostId: undefined, + poll: undefined, + text: window.document.body.textContent, + textHtml: note.content && createDOMPurify(window).sanitize(note.content), + userId: actor._id, + appId: null, + viaMobile: false, + geo: undefined, + uri: note.id + }, null, null, []); + } }; diff --git a/src/remote/activitypub/act/index.ts b/src/remote/activitypub/act/index.ts index d282e12885..d78335f16e 100644 --- a/src/remote/activitypub/act/index.ts +++ b/src/remote/activitypub/act/index.ts @@ -2,35 +2,29 @@ import create from './create'; import performDeleteActivity from './delete'; import follow from './follow'; import undo from './undo'; -import createObject from '../create'; import Resolver from '../resolver'; +import { IObject } from '../type'; -export default async (parentResolver: Resolver, actor, value, distribute?: boolean) => { - const collection = await parentResolver.resolveCollection(value); +export default async (parentResolver: Resolver, actor, activity: IObject): Promise => { + switch (activity.type) { + case 'Create': + await create(parentResolver, actor, activity); + break; - return collection.object.map(async element => { - const { resolver, object } = await collection.resolver.resolveOne(element); - const created = await (await createObject(resolver, actor, [object], distribute))[0]; + case 'Delete': + await performDeleteActivity(parentResolver, actor, activity); + break; - if (created !== null) { - return created; - } + case 'Follow': + await follow(parentResolver, actor, activity); + break; - switch (object.type) { - case 'Create': - return create(resolver, actor, object, distribute); + case 'Undo': + await undo(parentResolver, actor, activity); + break; - case 'Delete': - return performDeleteActivity(resolver, actor, object); - - case 'Follow': - return follow(resolver, actor, object, distribute); - - case 'Undo': - return undo(resolver, actor, object); - - default: - return null; - } - }); + default: + console.warn(`unknown activity type: ${activity.type}`); + return null; + } }; diff --git a/src/remote/activitypub/create.ts b/src/remote/activitypub/create.ts deleted file mode 100644 index 97c72860fd..0000000000 --- a/src/remote/activitypub/create.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { JSDOM } from 'jsdom'; -import { ObjectID } from 'mongodb'; -import config from '../../config'; -import DriveFile from '../../models/drive-file'; -import Post from '../../models/post'; -import { IRemoteUser } from '../../models/user'; -import uploadFromUrl from '../../drive/upload-from-url'; -import createPost from '../../post/create'; -import distributePost from '../../post/distribute'; -import Resolver from './resolver'; -const createDOMPurify = require('dompurify'); - -type IResult = { - resolver: Resolver; - object: { - $ref: string; - $id: ObjectID; - }; -}; - -class Creator { - private actor: IRemoteUser; - private distribute: boolean; - - constructor(actor, distribute) { - this.actor = actor; - this.distribute = distribute; - } - - private async createImage(resolver: Resolver, image) { - if ('attributedTo' in image && this.actor.account.uri !== image.attributedTo) { - throw new Error(); - } - - const { _id } = await uploadFromUrl(image.url, this.actor, image.id || null); - return { - resolver, - object: { $ref: 'driveFiles.files', $id: _id } - }; - } - - private async createNote(resolver: Resolver, note) { - if ( - ('attributedTo' in note && this.actor.account.uri !== note.attributedTo) || - typeof note.id !== 'string' - ) { - throw new Error(); - } - - const mediaIds = 'attachment' in note && - (await Promise.all(await this.create(resolver, note.attachment))) - .filter(media => media !== null && media.object.$ref === 'driveFiles.files') - .map(({ object }) => object.$id); - - const { window } = new JSDOM(note.content); - - const inserted = await createPost({ - channelId: undefined, - index: undefined, - createdAt: new Date(note.published), - mediaIds, - replyId: undefined, - repostId: undefined, - poll: undefined, - text: window.document.body.textContent, - textHtml: note.content && createDOMPurify(window).sanitize(note.content), - userId: this.actor._id, - appId: null, - viaMobile: false, - geo: undefined, - uri: note.id - }, null, null, []); - - const promises = []; - - if (this.distribute) { - promises.push(distributePost(this.actor, inserted.mentions, inserted)); - } - - // Register to search database - if (note.content && config.elasticsearch.enable) { - const es = require('../../db/elasticsearch'); - - promises.push(new Promise((resolve, reject) => { - es.index({ - index: 'misskey', - type: 'post', - id: inserted._id.toString(), - body: { - text: window.document.body.textContent - } - }, resolve); - })); - } - - await Promise.all(promises); - - return { - resolver, - object: { $ref: 'posts', id: inserted._id } - }; - } - - public async create(parentResolver: Resolver, value): Promise>> { - const collection = await parentResolver.resolveCollection(value); - - return collection.object.map(async element => { - const uri = element.id || element; - - try { - await Promise.all([ - DriveFile.findOne({ 'metadata.uri': uri }).then(file => { - if (file === null) { - return; - } - - throw { - $ref: 'driveFile.files', - $id: file._id - }; - }, () => {}), - Post.findOne({ uri }).then(post => { - if (post === null) { - return; - } - - throw { - $ref: 'posts', - $id: post._id - }; - }, () => {}) - ]); - } catch (object) { - return { - resolver: collection.resolver, - object - }; - } - - const { resolver, object } = await collection.resolver.resolveOne(element); - - switch (object.type) { - case 'Image': - return this.createImage(resolver, object); - - case 'Note': - return this.createNote(resolver, object); - } - - return null; - }); - } -} - -export default (resolver: Resolver, actor, value, distribute?: boolean) => { - const creator = new Creator(actor, distribute); - return creator.create(resolver, value); -}; diff --git a/src/remote/activitypub/resolver.ts b/src/remote/activitypub/resolver.ts index 371ccdcc30..de0bba2687 100644 --- a/src/remote/activitypub/resolver.ts +++ b/src/remote/activitypub/resolver.ts @@ -1,20 +1,45 @@ +import { IObject } from "./type"; + const request = require('request-promise-native'); export default class Resolver { - private requesting: Set; + private history: Set; - constructor(iterable?: Iterable) { - this.requesting = new Set(iterable); + constructor() { + this.history = new Set(); } - private async resolveUnrequestedOne(value) { - if (typeof value !== 'string') { - return { resolver: this, object: value }; + public async resolveCollection(value) { + const collection = typeof value === 'string' + ? await this.resolve(value) + : value; + + switch (collection.type) { + case 'Collection': + collection.objects = collection.object.items; + break; + + case 'OrderedCollection': + collection.objects = collection.object.orderedItems; + break; + + default: + throw new Error(`unknown collection type: ${collection.type}`); } - const resolver = new Resolver(this.requesting); + return collection; + } - resolver.requesting.add(value); + public async resolve(value): Promise { + if (typeof value !== 'string') { + return value; + } + + if (this.history.has(value)) { + throw new Error('cannot resolve already resolved one'); + } + + this.history.add(value); const object = await request({ url: value, @@ -29,41 +54,9 @@ export default class Resolver { !object['@context'].includes('https://www.w3.org/ns/activitystreams') : object['@context'] !== 'https://www.w3.org/ns/activitystreams' )) { - throw new Error(); + throw new Error('invalid response'); } - return { resolver, object }; - } - - public async resolveCollection(value) { - const resolved = typeof value === 'string' ? - await this.resolveUnrequestedOne(value) : - { resolver: this, object: value }; - - switch (resolved.object.type) { - case 'Collection': - resolved.object = resolved.object.items; - break; - - case 'OrderedCollection': - resolved.object = resolved.object.orderedItems; - break; - - default: - if (!Array.isArray(value)) { - resolved.object = [resolved.object]; - } - break; - } - - return resolved; - } - - public resolveOne(value) { - if (this.requesting.has(value)) { - throw new Error(); - } - - return this.resolveUnrequestedOne(value); + return object; } } diff --git a/src/server/activitypub/inbox.ts b/src/server/activitypub/inbox.ts index 5de8433850..847dc19af6 100644 --- a/src/server/activitypub/inbox.ts +++ b/src/server/activitypub/inbox.ts @@ -24,7 +24,7 @@ app.post('/@:user/inbox', bodyParser.json({ queue.create('http', { type: 'processInbox', - inbox: req.body, + activity: req.body, signature, }).save();