From 482081c41b45ab3798e73c4d11e8a7c1c1f5e8c9 Mon Sep 17 00:00:00 2001 From: MeiMei <30769358+mei23@users.noreply.github.com> Date: Sat, 16 Oct 2021 17:16:24 +0900 Subject: [PATCH] Refactor request (#7814) * status code * Test ap-request.ts https://github.com/mei23/crytest/blob/4397fc5e70536e4175fe56e974ca83b8047bef3a/test/ap-request.ts * tune --- src/misc/download-url.ts | 21 +-- src/misc/fetch.ts | 67 ++++--- src/queue/processors/deliver.ts | 7 +- src/queue/processors/inbox.ts | 3 +- src/remote/activitypub/ap-request.ts | 104 +++++++++++ .../activitypub/kernel/announce/note.ts | 3 +- src/remote/activitypub/kernel/create/note.ts | 3 +- src/remote/activitypub/models/note.ts | 3 +- src/remote/activitypub/request.ts | 166 ++++-------------- src/server/file/send-drive-file.ts | 5 +- src/server/proxy/proxy-media.ts | 5 +- test/ap-request.ts | 55 ++++++ 12 files changed, 268 insertions(+), 174 deletions(-) create mode 100644 src/remote/activitypub/ap-request.ts create mode 100644 test/ap-request.ts diff --git a/src/misc/download-url.ts b/src/misc/download-url.ts index 8a8640a8c..c96b4fd1d 100644 --- a/src/misc/download-url.ts +++ b/src/misc/download-url.ts @@ -2,7 +2,7 @@ import * as fs from 'fs'; import * as stream from 'stream'; import * as util from 'util'; import got, * as Got from 'got'; -import { httpAgent, httpsAgent } from './fetch'; +import { httpAgent, httpsAgent, StatusError } from './fetch'; import config from '@/config/index'; import * as chalk from 'chalk'; import Logger from '@/services/logger'; @@ -37,6 +37,7 @@ export async function downloadUrl(url: string, path: string) { http: httpAgent, https: httpsAgent, }, + http2: false, // default retry: 0, }).on('response', (res: Got.Response) => { if ((process.env.NODE_ENV === 'production' || process.env.NODE_ENV === 'test') && !config.proxy && res.ip) { @@ -59,17 +60,17 @@ export async function downloadUrl(url: string, path: string) { logger.warn(`maxSize exceeded (${progress.transferred} > ${maxSize}) on downloadProgress`); req.destroy(); } - }).on('error', (e: any) => { - if (e.name === 'HTTPError') { - const statusCode = e.response?.statusCode; - const statusMessage = e.response?.statusMessage; - e.name = `StatusError`; - e.statusCode = statusCode; - e.message = `${statusCode} ${statusMessage}`; - } }); - await pipeline(req, fs.createWriteStream(path)); + try { + await pipeline(req, fs.createWriteStream(path)); + } catch (e) { + if (e instanceof Got.HTTPError) { + throw new StatusError(`${e.response.statusCode} ${e.response.statusMessage}`, e.response.statusCode, e.response.statusMessage); + } else { + throw e; + } + } logger.succ(`Download finished: ${chalk.cyan(url)}`); } diff --git a/src/misc/fetch.ts b/src/misc/fetch.ts index 82db2f2f8..f4f16a27e 100644 --- a/src/misc/fetch.ts +++ b/src/misc/fetch.ts @@ -1,51 +1,62 @@ import * as http from 'http'; import * as https from 'https'; import CacheableLookup from 'cacheable-lookup'; -import fetch, { HeadersInit } from 'node-fetch'; +import fetch from 'node-fetch'; import { HttpProxyAgent, HttpsProxyAgent } from 'hpagent'; import config from '@/config/index'; import { URL } from 'url'; -export async function getJson(url: string, accept = 'application/json, */*', timeout = 10000, headers?: HeadersInit) { - const res = await fetch(url, { +export async function getJson(url: string, accept = 'application/json, */*', timeout = 10000, headers?: Record) { + const res = await getResponse({ + url, + method: 'GET', headers: Object.assign({ 'User-Agent': config.userAgent, Accept: accept }, headers || {}), - timeout, - agent: getAgentByUrl, + timeout }); - if (!res.ok) { - throw { - name: `StatusError`, - statusCode: res.status, - message: `${res.status} ${res.statusText}`, - }; - } - return await res.json(); } -export async function getHtml(url: string, accept = 'text/html, */*', timeout = 10000, headers?: HeadersInit) { - const res = await fetch(url, { +export async function getHtml(url: string, accept = 'text/html, */*', timeout = 10000, headers?: Record) { + const res = await getResponse({ + url, + method: 'GET', headers: Object.assign({ 'User-Agent': config.userAgent, Accept: accept }, headers || {}), + timeout + }); + + return await res.text(); +} + +export async function getResponse(args: { url: string, method: string, body?: string, headers: Record, timeout?: number, size?: number }) { + const timeout = args?.timeout || 10 * 1000; + + const controller = new AbortController(); + setTimeout(() => { + controller.abort(); + }, timeout * 6); + + const res = await fetch(args.url, { + method: args.method, + headers: args.headers, + body: args.body, timeout, + size: args?.size || 10 * 1024 * 1024, agent: getAgentByUrl, + signal: controller.signal, }); if (!res.ok) { - throw { - name: `StatusError`, - statusCode: res.status, - message: `${res.status} ${res.statusText}`, - }; + throw new StatusError(`${res.status} ${res.statusText}`, res.status, res.statusText); } - return await res.text(); + return res; } const cache = new CacheableLookup({ @@ -114,3 +125,17 @@ export function getAgentByUrl(url: URL, bypassProxy = false) { return url.protocol == 'http:' ? httpAgent : httpsAgent; } } + +export class StatusError extends Error { + public statusCode: number; + public statusMessage?: string; + public isClientError: boolean; + + constructor(message: string, statusCode: number, statusMessage?: string) { + super(message); + this.name = 'StatusError'; + this.statusCode = statusCode; + this.statusMessage = statusMessage; + this.isClientError = typeof this.statusCode === 'number' && this.statusCode >= 400 && this.statusCode < 500; + } +} diff --git a/src/queue/processors/deliver.ts b/src/queue/processors/deliver.ts index 373e57cbd..3c61896a2 100644 --- a/src/queue/processors/deliver.ts +++ b/src/queue/processors/deliver.ts @@ -11,6 +11,7 @@ import { toPuny } from '@/misc/convert-host'; import { Cache } from '@/misc/cache'; import { Instance } from '@/models/entities/instance'; import { DeliverJobData } from '../types'; +import { StatusError } from '@/misc/fetch'; const logger = new Logger('deliver'); @@ -68,16 +69,16 @@ export default async (job: Bull.Job) => { registerOrFetchInstanceDoc(host).then(i => { Instances.update(i.id, { latestRequestSentAt: new Date(), - latestStatus: res != null && res.hasOwnProperty('statusCode') ? res.statusCode : null, + latestStatus: res instanceof StatusError ? res.statusCode : null, isNotResponding: true }); instanceChart.requestSent(i.host, false); }); - if (res != null && res.hasOwnProperty('statusCode')) { + if (res instanceof StatusError) { // 4xx - if (res.statusCode >= 400 && res.statusCode < 500) { + if (res.isClientError) { // HTTPステータスコード4xxはクライアントエラーであり、それはつまり // 何回再送しても成功することはないということなのでエラーにはしないでおく return `${res.statusCode} ${res.statusMessage}`; diff --git a/src/queue/processors/inbox.ts b/src/queue/processors/inbox.ts index e2c271cdf..4032ce865 100644 --- a/src/queue/processors/inbox.ts +++ b/src/queue/processors/inbox.ts @@ -14,6 +14,7 @@ import { InboxJobData } from '../types'; import DbResolver from '@/remote/activitypub/db-resolver'; import { resolvePerson } from '@/remote/activitypub/models/person'; import { LdSignature } from '@/remote/activitypub/misc/ld-signature'; +import { StatusError } from '@/misc/fetch'; const logger = new Logger('inbox'); @@ -53,7 +54,7 @@ export default async (job: Bull.Job): Promise => { authUser = await dbResolver.getAuthUserFromApId(getApId(activity.actor)); } catch (e) { // 対象が4xxならスキップ - if (e.statusCode >= 400 && e.statusCode < 500) { + if (e instanceof StatusError && e.isClientError) { return `skip: Ignored deleted actors on both ends ${activity.actor} - ${e.statusCode}`; } throw `Error in actor ${activity.actor} - ${e.statusCode || e}`; diff --git a/src/remote/activitypub/ap-request.ts b/src/remote/activitypub/ap-request.ts new file mode 100644 index 000000000..76a385714 --- /dev/null +++ b/src/remote/activitypub/ap-request.ts @@ -0,0 +1,104 @@ +import * as crypto from 'crypto'; +import { URL } from 'url'; + +type Request = { + url: string; + method: string; + headers: Record; +}; + +type PrivateKey = { + privateKeyPem: string; + keyId: string; +}; + +export function createSignedPost(args: { key: PrivateKey, url: string, body: string, additionalHeaders: Record }) { + const u = new URL(args.url); + const digestHeader = `SHA-256=${crypto.createHash('sha256').update(args.body).digest('base64')}`; + + const request: Request = { + url: u.href, + method: 'POST', + headers: objectAssignWithLcKey({ + 'Date': new Date().toUTCString(), + 'Host': u.hostname, + 'Content-Type': 'application/activity+json', + 'Digest': digestHeader, + }, args.additionalHeaders), + }; + + const result = signToRequest(request, args.key, ['(request-target)', 'date', 'host', 'digest']); + + return { + request, + signingString: result.signingString, + signature: result.signature, + signatureHeader: result.signatureHeader, + }; +} + +export function createSignedGet(args: { key: PrivateKey, url: string, additionalHeaders: Record }) { + const u = new URL(args.url); + + const request: Request = { + url: u.href, + method: 'GET', + headers: objectAssignWithLcKey({ + 'Accept': 'application/activity+json, application/ld+json', + 'Date': new Date().toUTCString(), + 'Host': new URL(args.url).hostname, + }, args.additionalHeaders), + }; + + const result = signToRequest(request, args.key, ['(request-target)', 'date', 'host', 'accept']); + + return { + request, + signingString: result.signingString, + signature: result.signature, + signatureHeader: result.signatureHeader, + }; +} + +function signToRequest(request: Request, key: PrivateKey, includeHeaders: string[]) { + const signingString = genSigningString(request, includeHeaders); + const signature = crypto.sign('sha256', Buffer.from(signingString), key.privateKeyPem).toString('base64'); + const signatureHeader = `keyId="${key.keyId}",algorithm="rsa-sha256",headers="${includeHeaders.join(' ')}",signature="${signature}"`; + + request.headers = objectAssignWithLcKey(request.headers, { + Signature: signatureHeader + }); + + return { + request, + signingString, + signature, + signatureHeader, + }; +} + +function genSigningString(request: Request, includeHeaders: string[]) { + request.headers = lcObjectKey(request.headers); + + const results: string[] = []; + + for (const key of includeHeaders.map(x => x.toLowerCase())) { + if (key === '(request-target)') { + results.push(`(request-target): ${request.method.toLowerCase()} ${new URL(request.url).pathname}`); + } else { + results.push(`${key}: ${request.headers[key]}`); + } + } + + return results.join('\n'); +} + +function lcObjectKey(src: Record) { + const dst: Record = {}; + for (const key of Object.keys(src).filter(x => x != '__proto__' && typeof src[x] === 'string')) dst[key.toLowerCase()] = src[key]; + return dst; +} + +function objectAssignWithLcKey(a: Record, b: Record) { + return Object.assign(lcObjectKey(a), lcObjectKey(b)); +} diff --git a/src/remote/activitypub/kernel/announce/note.ts b/src/remote/activitypub/kernel/announce/note.ts index b6ec090b9..5230867f2 100644 --- a/src/remote/activitypub/kernel/announce/note.ts +++ b/src/remote/activitypub/kernel/announce/note.ts @@ -8,6 +8,7 @@ import { extractDbHost } from '@/misc/convert-host'; import { fetchMeta } from '@/misc/fetch-meta'; import { getApLock } from '@/misc/app-lock'; import { parseAudience } from '../../audience'; +import { StatusError } from '@/misc/fetch'; const logger = apLogger; @@ -41,7 +42,7 @@ export default async function(resolver: Resolver, actor: IRemoteUser, activity: renote = await resolveNote(targetUri); } catch (e) { // 対象が4xxならスキップ - if (e.statusCode >= 400 && e.statusCode < 500) { + if (e instanceof StatusError && e.isClientError) { logger.warn(`Ignored announce target ${targetUri} - ${e.statusCode}`); return; } diff --git a/src/remote/activitypub/kernel/create/note.ts b/src/remote/activitypub/kernel/create/note.ts index 5dda85d0f..14e311e4c 100644 --- a/src/remote/activitypub/kernel/create/note.ts +++ b/src/remote/activitypub/kernel/create/note.ts @@ -4,6 +4,7 @@ import { createNote, fetchNote } from '../../models/note'; import { getApId, IObject, ICreate } from '../../type'; import { getApLock } from '@/misc/app-lock'; import { extractDbHost } from '@/misc/convert-host'; +import { StatusError } from '@/misc/fetch'; /** * 投稿作成アクティビティを捌きます @@ -32,7 +33,7 @@ export default async function(resolver: Resolver, actor: IRemoteUser, note: IObj await createNote(note, resolver, silent); return 'ok'; } catch (e) { - if (e.statusCode >= 400 && e.statusCode < 500) { + if (e instanceof StatusError && e.isClientError) { return `skip ${e.statusCode}`; } else { throw e; diff --git a/src/remote/activitypub/models/note.ts b/src/remote/activitypub/models/note.ts index 25004cb4d..cf68f3005 100644 --- a/src/remote/activitypub/models/note.ts +++ b/src/remote/activitypub/models/note.ts @@ -26,6 +26,7 @@ import { createMessage } from '@/services/messages/create'; import { parseAudience } from '../audience'; import { extractApMentions } from './mention'; import DbResolver from '../db-resolver'; +import { StatusError } from '@/misc/fetch'; const logger = apLogger; @@ -177,7 +178,7 @@ export async function createNote(value: string | IObject, resolver?: Resolver, s } } catch (e) { return { - status: e.statusCode >= 400 && e.statusCode < 500 ? 'permerror' : 'temperror' + status: (e instanceof StatusError && e.isClientError) ? 'permerror' : 'temperror' }; } }; diff --git a/src/remote/activitypub/request.ts b/src/remote/activitypub/request.ts index fe1009243..d6ced630c 100644 --- a/src/remote/activitypub/request.ts +++ b/src/remote/activitypub/request.ts @@ -1,66 +1,31 @@ -import * as http from 'http'; -import * as https from 'https'; -import { sign } from 'http-signature'; -import * as crypto from 'crypto'; - import config from '@/config/index'; -import { User } from '@/models/entities/user'; -import { getAgentByUrl } from '@/misc/fetch'; -import { URL } from 'url'; -import got from 'got'; -import * as Got from 'got'; import { getUserKeypair } from '@/misc/keypair-store'; +import { User } from '@/models/entities/user'; +import { getResponse } from '../../misc/fetch'; +import { createSignedPost, createSignedGet } from './ap-request'; export default async (user: { id: User['id'] }, url: string, object: any) => { - const timeout = 10 * 1000; - - const { protocol, hostname, port, pathname, search } = new URL(url); - - const data = JSON.stringify(object); - - const sha256 = crypto.createHash('sha256'); - sha256.update(data); - const hash = sha256.digest('base64'); + const body = JSON.stringify(object); const keypair = await getUserKeypair(user.id); - await new Promise((resolve, reject) => { - const req = https.request({ - agent: getAgentByUrl(new URL(`https://example.net`)), - protocol, - hostname, - port, - method: 'POST', - path: pathname + search, - timeout, - headers: { - 'User-Agent': config.userAgent, - 'Content-Type': 'application/activity+json', - 'Digest': `SHA-256=${hash}` - } - }, res => { - if (res.statusCode! >= 400) { - reject(res); - } else { - resolve(); - } - }); + const req = createSignedPost({ + key: { + privateKeyPem: keypair.privateKey, + keyId: `${config.url}/users/${user.id}#main-key` + }, + url, + body, + additionalHeaders: { + 'User-Agent': config.userAgent, + } + }); - sign(req, { - authorizationHeaderName: 'Signature', - key: keypair.privateKey, - keyId: `${config.url}/users/${user.id}#main-key`, - headers: ['(request-target)', 'date', 'host', 'digest'] - }); - - req.on('timeout', () => req.abort()); - - req.on('error', e => { - if (req.aborted) reject('timeout'); - reject(e); - }); - - req.end(data); + await getResponse({ + url, + method: req.request.method, + headers: req.request.headers, + body, }); }; @@ -70,87 +35,24 @@ export default async (user: { id: User['id'] }, url: string, object: any) => { * @param url URL to fetch */ export async function signedGet(url: string, user: { id: User['id'] }) { - const timeout = 10 * 1000; - const keypair = await getUserKeypair(user.id); - const req = got.get(url, { - headers: { - 'Accept': 'application/activity+json, application/ld+json', + const req = createSignedGet({ + key: { + privateKeyPem: keypair.privateKey, + keyId: `${config.url}/users/${user.id}#main-key` + }, + url, + additionalHeaders: { 'User-Agent': config.userAgent, - }, - responseType: 'json', - timeout, - hooks: { - beforeRequest: [ - options => { - options.request = (url: URL, opt: http.RequestOptions, callback?: (response: any) => void) => { - // Select custom agent by URL - opt.agent = getAgentByUrl(url, false); - - // Wrap original https?.request - const requestFunc = url.protocol === 'http:' ? http.request : https.request; - const clientRequest = requestFunc(url, opt, callback) as http.ClientRequest; - - // HTTP-Signature - sign(clientRequest, { - authorizationHeaderName: 'Signature', - key: keypair.privateKey, - keyId: `${config.url}/users/${user.id}#main-key`, - headers: ['(request-target)', 'host', 'date', 'accept'] - }); - - return clientRequest; - }; - }, - ], - }, - retry: 0, + } }); - const res = await receiveResponce(req, 10 * 1024 * 1024); + const res = await getResponse({ + url, + method: req.request.method, + headers: req.request.headers + }); - return res.body; -} - -/** - * Receive response (with size limit) - * @param req Request - * @param maxSize size limit - */ -export async function receiveResponce(req: Got.CancelableRequest>, maxSize: number) { - // 応答ヘッダでサイズチェック - req.on('response', (res: Got.Response) => { - const contentLength = res.headers['content-length']; - if (contentLength != null) { - const size = Number(contentLength); - if (size > maxSize) { - req.cancel(); - } - } - }); - - // 受信中のデータでサイズチェック - req.on('downloadProgress', (progress: Got.Progress) => { - if (progress.transferred > maxSize) { - req.cancel(); - } - }); - - // 応答取得 with ステータスコードエラーの整形 - const res = await req.catch(e => { - if (e.name === 'HTTPError') { - const statusCode = (e as Got.HTTPError).response.statusCode; - const statusMessage = (e as Got.HTTPError).response.statusMessage; - throw { - name: `StatusError`, - statusCode, - message: `${statusCode} ${statusMessage}`, - }; - } else { - throw e; - } - }); - - return res; + return await res.json(); } diff --git a/src/server/file/send-drive-file.ts b/src/server/file/send-drive-file.ts index a73164ed2..1908c969a 100644 --- a/src/server/file/send-drive-file.ts +++ b/src/server/file/send-drive-file.ts @@ -13,6 +13,7 @@ import { downloadUrl } from '@/misc/download-url'; import { detectType } from '@/misc/get-file-info'; import { convertToJpeg, convertToPngOrJpeg } from '@/services/drive/image-processor'; import { GenerateVideoThumbnail } from '@/services/drive/generate-video-thumbnail'; +import { StatusError } from '@/misc/fetch'; //const _filename = fileURLToPath(import.meta.url); const _filename = __filename; @@ -83,9 +84,9 @@ export default async function(ctx: Koa.Context) { ctx.set('Content-Type', image.type); ctx.set('Cache-Control', 'max-age=31536000, immutable'); } catch (e) { - serverLogger.error(e.statusCode); + serverLogger.error(`${e}`); - if (typeof e.statusCode === 'number' && e.statusCode >= 400 && e.statusCode < 500) { + if (e instanceof StatusError && e.isClientError) { ctx.status = e.statusCode; ctx.set('Cache-Control', 'max-age=86400'); } else { diff --git a/src/server/proxy/proxy-media.ts b/src/server/proxy/proxy-media.ts index 3bd65dfe6..9e13c0877 100644 --- a/src/server/proxy/proxy-media.ts +++ b/src/server/proxy/proxy-media.ts @@ -5,6 +5,7 @@ import { IImage, convertToPng, convertToJpeg } from '@/services/drive/image-proc import { createTemp } from '@/misc/create-temp'; import { downloadUrl } from '@/misc/download-url'; import { detectType } from '@/misc/get-file-info'; +import { StatusError } from '@/misc/fetch'; export async function proxyMedia(ctx: Koa.Context) { const url = 'url' in ctx.query ? ctx.query.url : 'https://' + ctx.params.url; @@ -37,9 +38,9 @@ export async function proxyMedia(ctx: Koa.Context) { ctx.set('Cache-Control', 'max-age=31536000, immutable'); ctx.body = image.data; } catch (e) { - serverLogger.error(e); + serverLogger.error(`${e}`); - if (typeof e.statusCode === 'number' && e.statusCode >= 400 && e.statusCode < 500) { + if (e instanceof StatusError && e.isClientError) { ctx.status = e.statusCode; } else { ctx.status = 500; diff --git a/test/ap-request.ts b/test/ap-request.ts new file mode 100644 index 000000000..4a9799eb9 --- /dev/null +++ b/test/ap-request.ts @@ -0,0 +1,55 @@ +import * as assert from 'assert'; +import { genRsaKeyPair } from '../src/misc/gen-key-pair'; +import { createSignedPost, createSignedGet } from '../src/remote/activitypub/ap-request'; +const httpSignature = require('http-signature'); + +export const buildParsedSignature = (signingString: string, signature: string, algorithm: string) => { + return { + scheme: 'Signature', + params: { + keyId: 'KeyID', // dummy, not used for verify + algorithm: algorithm, + headers: [ '(request-target)', 'date', 'host', 'digest' ], // dummy, not used for verify + signature: signature, + }, + signingString: signingString, + algorithm: algorithm?.toUpperCase(), + keyId: 'KeyID', // dummy, not used for verify + }; +}; + +describe('ap-request', () => { + it('createSignedPost with verify', async () => { + const keypair = await genRsaKeyPair(); + const key = { keyId: 'x', 'privateKeyPem': keypair.privateKey }; + const url = 'https://example.com/inbox'; + const activity = { a: 1 }; + const body = JSON.stringify(activity); + const headers = { + 'User-Agent': 'UA' + }; + + const req = createSignedPost({ key, url, body, additionalHeaders: headers }); + + const parsed = buildParsedSignature(req.signingString, req.signature, 'rsa-sha256'); + + const result = httpSignature.verifySignature(parsed, keypair.publicKey); + assert.deepStrictEqual(result, true); + }); + + it('createSignedGet with verify', async () => { + const keypair = await genRsaKeyPair(); + const key = { keyId: 'x', 'privateKeyPem': keypair.privateKey }; + const url = 'https://example.com/outbox'; + const headers = { + 'User-Agent': 'UA' + }; + + const req = createSignedGet({ key, url, additionalHeaders: headers }); + + const parsed = buildParsedSignature(req.signingString, req.signature, 'rsa-sha256'); + + const result = httpSignature.verifySignature(parsed, keypair.publicKey); + assert.deepStrictEqual(result, true); + }); +});