From a060a1ad290be6daf3d4bc9f9f39a1a7ac786b7e Mon Sep 17 00:00:00 2001 From: Natty Date: Tue, 24 Dec 2024 17:05:41 +0100 Subject: [PATCH] Repeat buffer reads as long as there is data --- packages/backend/src/mag/rpc-client.ts | 59 ++++++++++++++------------ 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/packages/backend/src/mag/rpc-client.ts b/packages/backend/src/mag/rpc-client.ts index 002eb73c7e..eb7e4a66d0 100644 --- a/packages/backend/src/mag/rpc-client.ts +++ b/packages/backend/src/mag/rpc-client.ts @@ -60,37 +60,39 @@ async function getRpcClient(): Promise { client.on("data", (recv) => { buf.writeBuffer(recv); - if (buf.length < 1 + 4 + 8) { - return; - } - - const header = String.fromCharCode(buf.readUInt8()); - if (!["M", "F"].includes(header)) { - logger.error(`Invalid header: ${header}`); - return; - } - - const serial = buf.readBigUInt64BE(); - - let dataDecoded; - if (header === "M") { - const dataLen = buf.readUInt32BE(); - if (buf.remaining() < dataLen) { - buf.readOffset = 0; + while (true) { + if (buf.length < 1 + 4 + 8) { return; } - const data = buf.readBuffer(dataLen); - dataDecoded = decode(data); - } else { - dataDecoded = {success: false, data: "RPC returned failure"}; + + const header = String.fromCharCode(buf.readUInt8()); + if (!["M", "F"].includes(header)) { + logger.error(`Invalid header: ${header}`); + return; + } + + const serial = buf.readBigUInt64BE(); + + let dataDecoded; + if (header === "M") { + const dataLen = buf.readUInt32BE(); + if (buf.remaining() < dataLen) { + buf.readOffset = 0; + return; + } + const data = buf.readBuffer(dataLen); + dataDecoded = decode(data); + } else { + dataDecoded = {success: false, data: "RPC returned failure"}; + } + + // Move the rest of the data to the beginning of the buffer + const rest = buf.readBuffer(); + buf.clear(); + buf.writeBuffer(rest); + + client!.emit(`mag-data:${serial}`, dataDecoded); } - - // Move the rest of the data to the beginning of the buffer - const rest = buf.readBuffer(); - buf.clear(); - buf.writeBuffer(rest); - - client!.emit(`mag-data:${serial}`, dataDecoded); }); client.connect(port, host); @@ -154,6 +156,7 @@ async function rpcCall(method: string, data: D): Promise { if (!data.success) { reject(data.data); + return; } resolve(data.data);