diff --git a/packages/backend/src/mag/rpc-client.ts b/packages/backend/src/mag/rpc-client.ts index 3276b1aef7..876587b060 100644 --- a/packages/backend/src/mag/rpc-client.ts +++ b/packages/backend/src/mag/rpc-client.ts @@ -76,6 +76,7 @@ function getRpcClient(): Socket { const rest = buf.readBuffer(); buf.clear(); buf.writeBuffer(rest); + client!.emit(`mag-data:${serial}`, dataDecoded); }); @@ -85,56 +86,72 @@ function getRpcClient(): Socket { } async function rpcCall(method: string, data: D): Promise { - const currentSerial = serial; + const currentSerial = BigInt(new Date().getTime()) * BigInt(100000) + serial; serial = serial + BigInt(1); - const header = new Uint8Array([77]); - const textEncoder = new TextEncoder(); - const methodBuf = textEncoder.encode(method); - const dataBuf = encode(data); + return new Promise((resolve, reject) => { + const header = new Uint8Array([77]); + const textEncoder = new TextEncoder(); + const methodBuf = textEncoder.encode(method); + const dataBuf = encode(data); - const serialLength = 8; - const sizeLength = 4; + const serialLength = 8; + const sizeLength = 4; - const packetBuf = new Uint8Array(header.length + serialLength + sizeLength + methodBuf.length + sizeLength + dataBuf.length); - packetBuf.set(header, 0); - packetBuf.set(methodBuf, header.length + serialLength + sizeLength) - packetBuf.set(dataBuf, header.length + serialLength + sizeLength + methodBuf.length + sizeLength); + const packetBuf = new Uint8Array(header.length + serialLength + sizeLength + methodBuf.length + sizeLength + dataBuf.length); + packetBuf.set(header, 0); + packetBuf.set(methodBuf, header.length + serialLength + sizeLength) + packetBuf.set(dataBuf, header.length + serialLength + sizeLength + methodBuf.length + sizeLength); - const packetDataView = new DataView(packetBuf.buffer); - packetDataView.setBigUint64(header.length, currentSerial); - packetDataView.setUint32(header.length + serialLength, methodBuf.length); - packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length); + const packetDataView = new DataView(packetBuf.buffer); + packetDataView.setBigUint64(header.length, currentSerial); + packetDataView.setUint32(header.length + serialLength, methodBuf.length); + packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length); - const client = getRpcClient(); - client.write(packetBuf); + const client = getRpcClient(); - const fut = new Promise((resolve, reject) => { - client.once(`mag-data:${currentSerial}`, resolve); - client.once("close", reject); - client.once("error", reject); + const timeout = setTimeout(() => { + client.off("close", clearAndReject); + client.off("error", clearAndReject); + reject(new Error("Promise timeout")) + }, 60000); + + const clearAndReject = (e: any) => { + clearTimeout(timeout); + client.removeAllListeners(`mag-data:${currentSerial}`); + reject(e); + }; + + client.once(`mag-data:${currentSerial}`, (data: { success: false, data: any } | { success: true, data: T }) => { + clearTimeout(timeout); + client.off("close", clearAndReject); + client.off("error", clearAndReject); + + if (!data.success) { + reject(data.data); + } + + resolve(data.data); + }); + + client.prependOnceListener("close", clearAndReject); + client.prependOnceListener("error", clearAndReject); + + client.write(packetBuf); }); - - const result = await fut as { success: false, data: any } | { success: true, data: T }; - - if (!result.success) { - throw result.data; - } - - return result.data; } export async function magApGet(userId: string, url: string): Promise { - logger.info(`AP GET to: ${url}`); + logger.debug(`AP GET to: ${url}`); return await rpcCall("/ap/get", { user_id: userId, url }); } -export async function magApPost(userId: string, url: string, body: string): Promise { - logger.info(`AP POST to: ${url}`); +export async function magApPost(userId: string, url: string, body: any): Promise { + logger.debug(`AP POST to: ${url}`); return await rpcCall("/ap/post", { user_id: userId, url, diff --git a/packages/backend/src/remote/activitypub/request.ts b/packages/backend/src/remote/activitypub/request.ts index 20b5651849..a33e5b2c02 100644 --- a/packages/backend/src/remote/activitypub/request.ts +++ b/packages/backend/src/remote/activitypub/request.ts @@ -1,9 +1,7 @@ import type {User} from "@/models/entities/user.js"; import {magApGet, magApPost} from "@/mag/rpc-client.js"; -export default async (user: { id: User["id"] }, url: string, object: any) => { - const body = JSON.stringify(object); - +export default async (user: { id: User["id"] }, url: string, body: any) => { return await magApPost(user.id, url, body); };