From 51375e2ded66b56b0cbe72f8b16c445c9d33eed3 Mon Sep 17 00:00:00 2001 From: Natty Date: Fri, 15 Nov 2024 23:05:19 +0100 Subject: [PATCH] Removed unsafe RPC reads --- src/rpc_v1/proto.rs | 43 ++++++++----------------------------------- 1 file changed, 8 insertions(+), 35 deletions(-) diff --git a/src/rpc_v1/proto.rs b/src/rpc_v1/proto.rs index 949c646..d1fc053 100644 --- a/src/rpc_v1/proto.rs +++ b/src/rpc_v1/proto.rs @@ -373,8 +373,6 @@ impl RpcCallDecoder { let listeners = self.listeners.clone(); async_stream::try_stream! { - let mut name_buf = Vec::new(); - let mut buf = Vec::new(); let mut messages = 0usize; loop { @@ -409,22 +407,8 @@ impl RpcCallDecoder { )).into_diagnostic(); } - if name_len > name_buf.capacity() { - name_buf.reserve(name_len - name_buf.capacity()); - } - - // SAFETY: We use ReadBuf which expects uninit anyway - unsafe { - name_buf.set_len(name_len); - } - - let mut name_buf_write = ReadBuf::uninit(&mut name_buf); - while name_buf_write.has_remaining_mut() { - buf_read - .read_buf(&mut name_buf_write) - .await - .into_diagnostic()?; - } + let mut name_buf = vec![0; name_len]; + buf_read.read_exact(&mut name_buf).await.into_diagnostic()?; let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize; @@ -435,31 +419,20 @@ impl RpcCallDecoder { )).into_diagnostic(); } - if payload_len > buf.capacity() { - buf.reserve(payload_len - buf.capacity()); - } + let mut payload_buf = vec![0; payload_len]; + buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?; - // SAFETY: We use ReadBuf which expects uninit anyway - unsafe { - buf.set_len(payload_len); - } - - let mut buf_write = ReadBuf::uninit(&mut buf); - while buf_write.has_remaining_mut() { - buf_read.read_buf(&mut buf_write).await.into_diagnostic()?; - } - - miette::Result::<_>::Ok(Some((serial, name_buf_write, buf_write))) + miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf))) }; - let Some((serial, name_buf_write, payload)) = select! { + let Some((serial, name_buf, payload_buf)) = select! { read_result = read_fut => read_result, _ = &mut cancel => { break; } }? else { break; }; - let name = std::str::from_utf8(name_buf_write.filled()).into_diagnostic()?; + let name = std::str::from_utf8(&name_buf).into_diagnostic()?; let decoder = decoders .get(name) @@ -470,7 +443,7 @@ impl RpcCallDecoder { .ok_or_else(|| miette!("No such RPC call name: {}", name))? .clone(); - let packet = match decoder(payload.filled()) { + let packet = match decoder(&payload_buf) { Ok(p) => p, Err(e) => { error!("Failed to parse packet: {e}");