Refactored RPC client and fixed POST body
ci/woodpecker/tag/ociImageTag Pipeline was successful Details

This commit is contained in:
Natty 2024-11-16 19:42:58 +01:00
parent 58266bcb7f
commit 0560a3d876
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
2 changed files with 50 additions and 35 deletions

View File

@ -76,6 +76,7 @@ function getRpcClient(): Socket {
const rest = buf.readBuffer(); const rest = buf.readBuffer();
buf.clear(); buf.clear();
buf.writeBuffer(rest); buf.writeBuffer(rest);
client!.emit(`mag-data:${serial}`, dataDecoded); client!.emit(`mag-data:${serial}`, dataDecoded);
}); });
@ -85,56 +86,72 @@ function getRpcClient(): Socket {
} }
async function rpcCall<D, T>(method: string, data: D): Promise<T> { async function rpcCall<D, T>(method: string, data: D): Promise<T> {
const currentSerial = serial; const currentSerial = BigInt(new Date().getTime()) * BigInt(100000) + serial;
serial = serial + BigInt(1); serial = serial + BigInt(1);
const header = new Uint8Array([77]); return new Promise((resolve, reject) => {
const textEncoder = new TextEncoder(); const header = new Uint8Array([77]);
const methodBuf = textEncoder.encode(method); const textEncoder = new TextEncoder();
const dataBuf = encode(data); const methodBuf = textEncoder.encode(method);
const dataBuf = encode(data);
const serialLength = 8; const serialLength = 8;
const sizeLength = 4; const sizeLength = 4;
const packetBuf = new Uint8Array(header.length + serialLength + sizeLength + methodBuf.length + sizeLength + dataBuf.length); const packetBuf = new Uint8Array(header.length + serialLength + sizeLength + methodBuf.length + sizeLength + dataBuf.length);
packetBuf.set(header, 0); packetBuf.set(header, 0);
packetBuf.set(methodBuf, header.length + serialLength + sizeLength) packetBuf.set(methodBuf, header.length + serialLength + sizeLength)
packetBuf.set(dataBuf, header.length + serialLength + sizeLength + methodBuf.length + sizeLength); packetBuf.set(dataBuf, header.length + serialLength + sizeLength + methodBuf.length + sizeLength);
const packetDataView = new DataView(packetBuf.buffer); const packetDataView = new DataView(packetBuf.buffer);
packetDataView.setBigUint64(header.length, currentSerial); packetDataView.setBigUint64(header.length, currentSerial);
packetDataView.setUint32(header.length + serialLength, methodBuf.length); packetDataView.setUint32(header.length + serialLength, methodBuf.length);
packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length); packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length);
const client = getRpcClient(); const client = getRpcClient();
client.write(packetBuf);
const fut = new Promise((resolve, reject) => { const timeout = setTimeout(() => {
client.once(`mag-data:${currentSerial}`, resolve); client.off("close", clearAndReject);
client.once("close", reject); client.off("error", clearAndReject);
client.once("error", reject); reject(new Error("Promise timeout"))
}, 60000);
const clearAndReject = (e: any) => {
clearTimeout(timeout);
client.removeAllListeners(`mag-data:${currentSerial}`);
reject(e);
};
client.once(`mag-data:${currentSerial}`, (data: { success: false, data: any } | { success: true, data: T }) => {
clearTimeout(timeout);
client.off("close", clearAndReject);
client.off("error", clearAndReject);
if (!data.success) {
reject(data.data);
}
resolve(data.data);
});
client.prependOnceListener("close", clearAndReject);
client.prependOnceListener("error", clearAndReject);
client.write(packetBuf);
}); });
const result = await fut as { success: false, data: any } | { success: true, data: T };
if (!result.success) {
throw result.data;
}
return result.data;
} }
export async function magApGet(userId: string, url: string): Promise<IObject> { export async function magApGet(userId: string, url: string): Promise<IObject> {
logger.info(`AP GET to: ${url}`); logger.debug(`AP GET to: ${url}`);
return await rpcCall("/ap/get", { return await rpcCall("/ap/get", {
user_id: userId, user_id: userId,
url url
}); });
} }
export async function magApPost(userId: string, url: string, body: string): Promise<IObject> { export async function magApPost(userId: string, url: string, body: any): Promise<IObject> {
logger.info(`AP POST to: ${url}`); logger.debug(`AP POST to: ${url}`);
return await rpcCall("/ap/post", { return await rpcCall("/ap/post", {
user_id: userId, user_id: userId,
url, url,

View File

@ -1,9 +1,7 @@
import type {User} from "@/models/entities/user.js"; import type {User} from "@/models/entities/user.js";
import {magApGet, magApPost} from "@/mag/rpc-client.js"; import {magApGet, magApPost} from "@/mag/rpc-client.js";
export default async (user: { id: User["id"] }, url: string, object: any) => { export default async (user: { id: User["id"] }, url: string, body: any) => {
const body = JSON.stringify(object);
return await magApPost(user.id, url, body); return await magApPost(user.id, url, body);
}; };