Fixed RPC client retrieval and made delivery retries depend on Magnetar errors
ci/woodpecker/tag/ociImageTag Pipeline was successful Details

This commit is contained in:
Natty 2024-11-17 03:55:26 +01:00
parent 0560a3d876
commit 367257c3ad
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
3 changed files with 95 additions and 75 deletions

View File

@ -12,84 +12,94 @@ let serial: bigint = BigInt(0);
const logger = new Logger("RpcLog");
function getRpcClient(): Socket {
async function getRpcClient(): Promise<Socket> {
if (client != null) {
return client;
}
const [host, portStr] = config.rpcHost.trim().split(/:(?=[0-9]+$)/, 2);
const port = parseInt(portStr);
client = new Socket();
const reconnectWithBackoff = () => {
setTimeout(() => {
client!.connect(port, host);
}, 1000 * (1 + Math.pow(1.5, exponentialBackoff)));
};
return new Promise((resolve) => {
const [host, portStr] = config.rpcHost.trim().split(/:(?=[0-9]+$)/, 2);
const port = parseInt(portStr);
const buf = new SmartBuffer({
encoding: "binary"
const createClient = (client: Socket) => {
client.setKeepAlive(true);
const buf = new SmartBuffer({
encoding: "binary"
});
client.on("connect", () => {
exponentialBackoff = 0;
resolve(client!);
})
client.on("error", e => {
logger.warn(`RPC connection error: ${e}`);
client!.removeAllListeners("data");
buf.clear();
exponentialBackoff = Math.min(12, exponentialBackoff + 1);
reconnectWithBackoff();
})
client.on("close", () => {
client!.removeAllListeners("data");
buf.clear();
exponentialBackoff = Math.min(12, exponentialBackoff + 1);
reconnectWithBackoff();
});
client.on("data", (recv) => {
logger.info(`data: ${recv}`);
buf.writeBuffer(recv);
if (buf.length < 1 + 4 + 8) {
return;
}
const header = buf.readUInt8();
if (header != 77) {
logger.error(`Invalid header: ${header}`);
return;
}
const serial = buf.readBigUInt64BE();
const dataLen = buf.readUInt32BE();
if (buf.remaining() < dataLen) {
buf.readOffset = 0;
return;
}
const data = buf.readBuffer(dataLen);
const dataDecoded: any = decode(data);
// Move the rest of the data to the beginning of the buffer
const rest = buf.readBuffer();
buf.clear();
buf.writeBuffer(rest);
client!.emit(`mag-data:${serial}`, dataDecoded);
});
client.connect(port, host);
};
const reconnectWithBackoff = () => {
client!.destroy();
client = new Socket();
setTimeout(() => createClient(client!), 1000 * (1 + Math.pow(1.5, exponentialBackoff)));
};
client = new Socket();
createClient(client);
});
client.on("connect", () => {
exponentialBackoff = 0;
})
client.on("error", e => {
logger.warn(`RPC connection error: ${e}`);
client!.removeAllListeners("data");
buf.clear();
exponentialBackoff = Math.min(12, exponentialBackoff + 1);
reconnectWithBackoff();
})
client.on("close", () => {
client!.removeAllListeners("data");
buf.clear();
exponentialBackoff = Math.min(12, exponentialBackoff + 1);
reconnectWithBackoff();
});
client.on("data", (recv) => {
buf.writeBuffer(recv);
if (buf.length < 1 + 4 + 8) {
return;
}
const header = buf.readUInt8();
if (header != 77) {
logger.error(`Invalid header: ${header}`);
return;
}
const serial = buf.readBigUInt64BE();
const dataLen = buf.readUInt32BE();
if (buf.remaining() < dataLen) {
buf.readOffset = 0;
return;
}
const data = buf.readBuffer(dataLen);
const dataDecoded: any = decode(data);
// Move the rest of the data to the beginning of the buffer
const rest = buf.readBuffer();
buf.clear();
buf.writeBuffer(rest);
client!.emit(`mag-data:${serial}`, dataDecoded);
});
client.connect(port, host);
return client;
}
async function rpcCall<D, T>(method: string, data: D): Promise<T> {
const currentSerial = BigInt(new Date().getTime()) * BigInt(100000) + serial;
serial = serial + BigInt(1);
return new Promise((resolve, reject) => {
return new Promise(async (resolve, reject) => {
const header = new Uint8Array([77]);
const textEncoder = new TextEncoder();
const methodBuf = textEncoder.encode(method);
@ -108,7 +118,7 @@ async function rpcCall<D, T>(method: string, data: D): Promise<T> {
packetDataView.setUint32(header.length + serialLength, methodBuf.length);
packetDataView.setUint32(header.length + serialLength + sizeLength + methodBuf.length, dataBuf.length);
const client = getRpcClient();
const client = await getRpcClient();
const timeout = setTimeout(() => {
client.off("close", clearAndReject);

View File

@ -66,18 +66,17 @@ export default async (job: Bull.Job<DeliverJobData>) => {
federationChart.deliverd(i.host, false);
});
if (res instanceof StatusError) {
if (typeof res === "object" && res != null && "retry_class" in res) {
const err = res as { kind: any, message: string, retry_class: "RetriableLater" | "Retriable" | "Unrecovarable"}
// 4xx
if (res.isClientError) {
// HTTPステータスコード4xxはクライアントエラーであり、それはつまり
// 何回再送しても成功することはないということなのでエラーにはしないでおく
return `${res.statusCode} ${res.statusMessage}`;
if (err.retry_class == "Unrecovarable") {
return `${err.message} ${JSON.stringify(err.kind)}`;
}
// 5xx etc.
throw new Error(`${res.statusCode} ${res.statusMessage}`);
throw new Error(`${err.retry_class} ${err.kind}: ${err.message}`);
} else {
// DNS error, socket error, timeout ...
throw res;
}
}

View File

@ -32,6 +32,11 @@ export const meta = {
code: "NO_SUCH_OBJECT",
id: "dc94d745-1262-4e63-a17d-fecaa57efc82",
},
failedToFetch: {
message: "Failed to fetch object.",
code: "FAILED_TO_FETCH_OBJECT",
id: "70cfecd4-dc0f-43c1-8d89-2c6db2cdbff5",
},
},
res: {
@ -127,7 +132,13 @@ async function fetchAny(
// fetching Object once from remote
const resolver = new Resolver(undefined, me ?? undefined);
const object = await resolver.resolve(uri);
let object;
try {
object = await resolver.resolve(uri);
} catch (e) {
throw new ApiError(meta.errors.failedToFetch);
}
// /@user If a URI other than the id is specified,
// the URI is determined here