From 367257c3ad5d3a919dfe71b2936a48637f93e9fb Mon Sep 17 00:00:00 2001 From: Natty Date: Sun, 17 Nov 2024 03:55:26 +0100 Subject: [PATCH] Fixed RPC client retrieval and made delivery retries depend on Magnetar errors --- packages/backend/src/mag/rpc-client.ts | 144 ++++++++++-------- .../backend/src/queue/processors/deliver.ts | 13 +- .../src/server/api/endpoints/ap/show.ts | 13 +- 3 files changed, 95 insertions(+), 75 deletions(-) diff --git a/packages/backend/src/mag/rpc-client.ts b/packages/backend/src/mag/rpc-client.ts index 876587b060..6e6ab0c762 100644 --- a/packages/backend/src/mag/rpc-client.ts +++ b/packages/backend/src/mag/rpc-client.ts @@ -12,84 +12,94 @@ let serial: bigint = BigInt(0); const logger = new Logger("RpcLog"); -function getRpcClient(): Socket { +async function getRpcClient(): Promise { if (client != null) { return client; } - const [host, portStr] = config.rpcHost.trim().split(/:(?=[0-9]+$)/, 2); - const port = parseInt(portStr); - client = new Socket(); - const reconnectWithBackoff = () => { - setTimeout(() => { - client!.connect(port, host); - }, 1000 * (1 + Math.pow(1.5, exponentialBackoff))); - }; + return new Promise((resolve) => { + const [host, portStr] = config.rpcHost.trim().split(/:(?=[0-9]+$)/, 2); + const port = parseInt(portStr); - const buf = new SmartBuffer({ - encoding: "binary" + const createClient = (client: Socket) => { + client.setKeepAlive(true); + + const buf = new SmartBuffer({ + encoding: "binary" + }); + + client.on("connect", () => { + exponentialBackoff = 0; + resolve(client!); + }) + + client.on("error", e => { + logger.warn(`RPC connection error: ${e}`); + client!.removeAllListeners("data"); + buf.clear(); + exponentialBackoff = Math.min(12, exponentialBackoff + 1); + reconnectWithBackoff(); + }) + + client.on("close", () => { + client!.removeAllListeners("data"); + buf.clear(); + exponentialBackoff = Math.min(12, exponentialBackoff + 1); + reconnectWithBackoff(); + }); + + client.on("data", (recv) => { + logger.info(`data: ${recv}`); + buf.writeBuffer(recv); + + if (buf.length < 1 + 4 + 8) { + return; + } + + const header = buf.readUInt8(); + if (header != 77) { + logger.error(`Invalid header: ${header}`); + return; + } + + const serial = buf.readBigUInt64BE(); + + const dataLen = buf.readUInt32BE(); + if (buf.remaining() < dataLen) { + buf.readOffset = 0; + return; + } + const data = buf.readBuffer(dataLen); + const dataDecoded: any = decode(data); + + // Move the rest of the data to the beginning of the buffer + const rest = buf.readBuffer(); + buf.clear(); + buf.writeBuffer(rest); + + client!.emit(`mag-data:${serial}`, dataDecoded); + }); + + client.connect(port, host); + }; + + const reconnectWithBackoff = () => { + client!.destroy(); + client = new Socket(); + + setTimeout(() => createClient(client!), 1000 * (1 + Math.pow(1.5, exponentialBackoff))); + }; + + client = new Socket(); + createClient(client); }); - - client.on("connect", () => { - exponentialBackoff = 0; - }) - - client.on("error", e => { - logger.warn(`RPC connection error: ${e}`); - client!.removeAllListeners("data"); - buf.clear(); - exponentialBackoff = Math.min(12, exponentialBackoff + 1); - reconnectWithBackoff(); - }) - - client.on("close", () => { - client!.removeAllListeners("data"); - buf.clear(); - exponentialBackoff = Math.min(12, exponentialBackoff + 1); - reconnectWithBackoff(); - }); - - client.on("data", (recv) => { - buf.writeBuffer(recv); - - if (buf.length < 1 + 4 + 8) { - return; - } - - const header = buf.readUInt8(); - if (header != 77) { - logger.error(`Invalid header: ${header}`); - return; - } - - const serial = buf.readBigUInt64BE(); - - const dataLen = buf.readUInt32BE(); - if (buf.remaining() < dataLen) { - buf.readOffset = 0; - return; - } - const data = buf.readBuffer(dataLen); - const dataDecoded: any = decode(data); - - // Move the rest of the data to the beginning of the buffer - const rest = buf.readBuffer(); - buf.clear(); - buf.writeBuffer(rest); - - client!.emit(`mag-data:${serial}`, dataDecoded); - }); - - client.connect(port, host); - - return client; } async function rpcCall(method: string, data: D): Promise { const currentSerial = BigInt(new Date().getTime()) * BigInt(100000) + serial; serial = serial + BigInt(1); - return new Promise((resolve, reject) => { + return new Promise(async (resolve, reject) => { const header = new Uint8Array([77]); const textEncoder = new TextEncoder(); const methodBuf = textEncoder.encode(method); @@ -108,7 +118,7 @@ async function rpcCall(method: string, data: D): Promise { packetDataView.setUint32(header.length + serialLength, methodBuf.length); packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length); - const client = getRpcClient(); + const client = await getRpcClient(); const timeout = setTimeout(() => { client.off("close", clearAndReject); diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index ac2e52b03c..5f71075aab 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -66,18 +66,17 @@ export default async (job: Bull.Job) => { federationChart.deliverd(i.host, false); }); - if (res instanceof StatusError) { + if (typeof res === "object" && res != null && "retry_class" in res) { + const err = res as { kind: any, message: string, retry_class: "RetriableLater" | "Retriable" | "Unrecovarable"} + // 4xx - if (res.isClientError) { - // HTTPステータスコード4xxはクライアントエラーであり、それはつまり - // 何回再送しても成功することはないということなのでエラーにはしないでおく - return `${res.statusCode} ${res.statusMessage}`; + if (err.retry_class == "Unrecovarable") { + return `${err.message} ${JSON.stringify(err.kind)}`; } // 5xx etc. - throw new Error(`${res.statusCode} ${res.statusMessage}`); + throw new Error(`${err.retry_class} ${err.kind}: ${err.message}`); } else { - // DNS error, socket error, timeout ... throw res; } } diff --git a/packages/backend/src/server/api/endpoints/ap/show.ts b/packages/backend/src/server/api/endpoints/ap/show.ts index f09a717fc2..16408ea377 100644 --- a/packages/backend/src/server/api/endpoints/ap/show.ts +++ b/packages/backend/src/server/api/endpoints/ap/show.ts @@ -32,6 +32,11 @@ export const meta = { code: "NO_SUCH_OBJECT", id: "dc94d745-1262-4e63-a17d-fecaa57efc82", }, + failedToFetch: { + message: "Failed to fetch object.", + code: "FAILED_TO_FETCH_OBJECT", + id: "70cfecd4-dc0f-43c1-8d89-2c6db2cdbff5", + }, }, res: { @@ -127,7 +132,13 @@ async function fetchAny( // fetching Object once from remote const resolver = new Resolver(undefined, me ?? undefined); - const object = await resolver.resolve(uri); + + let object; + try { + object = await resolver.resolve(uri); + } catch (e) { + throw new ApiError(meta.errors.failedToFetch); + } // /@user If a URI other than the id is specified, // the URI is determined here