Drop RPC connections receiving bogus data
ci/woodpecker/push/ociImagePush Pipeline was successful Details

This commit is contained in:
Natty 2024-11-16 19:29:46 +01:00
parent 56f80f290a
commit 2acc41587a
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
1 changed files with 14 additions and 26 deletions

View File

@ -1,6 +1,6 @@
use crate::service::MagnetarService;
use futures::{FutureExt, Stream, StreamExt};
use miette::{miette, IntoDiagnostic};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use miette::{miette, Error, IntoDiagnostic};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::any::Any;
@ -218,21 +218,14 @@ impl MagRpc {
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
@ -315,21 +308,14 @@ impl MagRpc {
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
@ -370,11 +356,13 @@ fn process(
) -> impl Fn(
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
) -> Pin<
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>,
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
> {
move |(serial, payload, listener)| {
let ctx = context.clone();
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed()
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
.map_err(|e| miette!(e))
.boxed()
}
}