Removed unsafe RPC reads
ci/woodpecker/push/ociImagePush Pipeline was successful
Details
ci/woodpecker/push/ociImagePush Pipeline was successful
Details
This commit is contained in:
parent
531b10ef85
commit
51375e2ded
|
@ -373,8 +373,6 @@ impl RpcCallDecoder {
|
||||||
let listeners = self.listeners.clone();
|
let listeners = self.listeners.clone();
|
||||||
|
|
||||||
async_stream::try_stream! {
|
async_stream::try_stream! {
|
||||||
let mut name_buf = Vec::new();
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
let mut messages = 0usize;
|
let mut messages = 0usize;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -409,22 +407,8 @@ impl RpcCallDecoder {
|
||||||
)).into_diagnostic();
|
)).into_diagnostic();
|
||||||
}
|
}
|
||||||
|
|
||||||
if name_len > name_buf.capacity() {
|
let mut name_buf = vec![0; name_len];
|
||||||
name_buf.reserve(name_len - name_buf.capacity());
|
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
|
||||||
}
|
|
||||||
|
|
||||||
// SAFETY: We use ReadBuf which expects uninit anyway
|
|
||||||
unsafe {
|
|
||||||
name_buf.set_len(name_len);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut name_buf_write = ReadBuf::uninit(&mut name_buf);
|
|
||||||
while name_buf_write.has_remaining_mut() {
|
|
||||||
buf_read
|
|
||||||
.read_buf(&mut name_buf_write)
|
|
||||||
.await
|
|
||||||
.into_diagnostic()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
|
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
|
||||||
|
|
||||||
|
@ -435,31 +419,20 @@ impl RpcCallDecoder {
|
||||||
)).into_diagnostic();
|
)).into_diagnostic();
|
||||||
}
|
}
|
||||||
|
|
||||||
if payload_len > buf.capacity() {
|
let mut payload_buf = vec![0; payload_len];
|
||||||
buf.reserve(payload_len - buf.capacity());
|
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
|
||||||
}
|
|
||||||
|
|
||||||
// SAFETY: We use ReadBuf which expects uninit anyway
|
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf)))
|
||||||
unsafe {
|
|
||||||
buf.set_len(payload_len);
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut buf_write = ReadBuf::uninit(&mut buf);
|
|
||||||
while buf_write.has_remaining_mut() {
|
|
||||||
buf_read.read_buf(&mut buf_write).await.into_diagnostic()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
miette::Result::<_>::Ok(Some((serial, name_buf_write, buf_write)))
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((serial, name_buf_write, payload)) = select! {
|
let Some((serial, name_buf, payload_buf)) = select! {
|
||||||
read_result = read_fut => read_result,
|
read_result = read_fut => read_result,
|
||||||
_ = &mut cancel => { break; }
|
_ = &mut cancel => { break; }
|
||||||
}? else {
|
}? else {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
let name = std::str::from_utf8(name_buf_write.filled()).into_diagnostic()?;
|
let name = std::str::from_utf8(&name_buf).into_diagnostic()?;
|
||||||
|
|
||||||
let decoder = decoders
|
let decoder = decoders
|
||||||
.get(name)
|
.get(name)
|
||||||
|
@ -470,7 +443,7 @@ impl RpcCallDecoder {
|
||||||
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
|
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let packet = match decoder(payload.filled()) {
|
let packet = match decoder(&payload_buf) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to parse packet: {e}");
|
error!("Failed to parse packet: {e}");
|
||||||
|
|
Loading…
Reference in New Issue