Recycle the read buffers for RPC data

This commit is contained in:
Natty 2024-11-16 17:43:57 +01:00
parent 52cced9537
commit d598387795
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
1 changed files with 8 additions and 6 deletions

View File

@ -396,6 +396,8 @@ impl RpcCallDecoder {
async_stream::try_stream! { async_stream::try_stream! {
let mut messages = 0usize; let mut messages = 0usize;
let mut name_buf = vec![0; 128];
let mut payload_buf = vec![0; 1024];
loop { loop {
let read_fut = async { let read_fut = async {
@ -429,7 +431,7 @@ impl RpcCallDecoder {
)).into_diagnostic(); )).into_diagnostic();
} }
let mut name_buf = vec![0; name_len]; name_buf.resize(name_len, 0);
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?; buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize; let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
@ -441,20 +443,20 @@ impl RpcCallDecoder {
)).into_diagnostic(); )).into_diagnostic();
} }
let mut payload_buf = vec![0; payload_len]; payload_buf.resize(payload_len, 0);
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?; buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf))) miette::Result::<_>::Ok(Some((serial, &name_buf[..], &payload_buf[..])))
}; };
let Some((serial, name_buf, payload_buf)) = select! { let Some((serial, name_slice, payload_slice)) = select! {
read_result = read_fut => read_result, read_result = read_fut => read_result,
_ = &mut cancel => { break; } _ = &mut cancel => { break; }
}? else { }? else {
break; break;
}; };
let name = std::str::from_utf8(&name_buf).into_diagnostic()?; let name = std::str::from_utf8(name_slice).into_diagnostic()?;
let decoder = decoders let decoder = decoders
.get(name) .get(name)
@ -465,7 +467,7 @@ impl RpcCallDecoder {
.ok_or_else(|| miette!("No such RPC call name: {}", name))? .ok_or_else(|| miette!("No such RPC call name: {}", name))?
.clone(); .clone();
let packet = match decoder(&payload_buf) { let packet = match decoder(payload_slice) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
error!("Failed to parse packet: {e}"); error!("Failed to parse packet: {e}");