Allow concurrent RPC calls on the server
ci/woodpecker/push/ociImagePush Pipeline was successful Details

This commit is contained in:
Natty 2024-11-16 03:34:43 +01:00
parent c10594bf7e
commit 52cced9537
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
1 changed files with 30 additions and 11 deletions

View File

@ -14,7 +14,7 @@ use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, UnixSocket}; use tokio::net::{TcpListener, UnixSocket};
use tokio::select; use tokio::select;
use tokio::task::JoinSet; use tokio::task::{JoinError, JoinSet};
use tracing::{debug, error, info, warn, Instrument}; use tracing::{debug, error, info, warn, Instrument};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -225,14 +225,17 @@ impl MagRpc {
r.ok() r.ok()
}) })
.filter_map(|(serial, payload, listener)| { .map(process(context))
let ctx = context.clone(); .buffer_unordered(100)
async move { Some((serial, listener(ctx, payload).await?)) } .boxed();
});
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some((serial, RpcResponse(bytes))) = src.next().await { while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
continue;
};
write_half.write_u8(b'M').await.into_diagnostic()?; write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?; write_half.write_u64(serial).await.into_diagnostic()?;
write_half write_half
@ -319,14 +322,17 @@ impl MagRpc {
r.ok() r.ok()
}) })
.filter_map(|(serial, payload, listener)| { .map(process(context))
let ctx = context.clone(); .buffer_unordered(100)
async move { Some((serial, listener(ctx, payload).await?)) } .boxed();
});
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some((serial, RpcResponse(bytes))) = src.next().await { while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
continue;
};
write_half.write_u8(b'M').await.into_diagnostic()?; write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?; write_half.write_u64(serial).await.into_diagnostic()?;
write_half write_half
@ -359,6 +365,19 @@ impl MagRpc {
} }
} }
fn process(
context: Arc<MagnetarService>,
) -> impl Fn(
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
) -> Pin<
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>,
> {
move |(serial, payload, listener)| {
let ctx = context.clone();
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed()
}
}
#[derive(Clone)] #[derive(Clone)]
struct RpcCallDecoder { struct RpcCallDecoder {
listeners: Arc<HashMap<String, Arc<MagRpcHandlerMapped>>>, listeners: Arc<HashMap<String, Arc<MagRpcHandlerMapped>>>,