WIP: Make the MMM parser run in linear time #14

Draft
natty wants to merge 6 commits from main into mmm
12 changed files with 176 additions and 155 deletions

4
Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 4
[[package]] [[package]]
name = "Inflector" name = "Inflector"
@ -2028,6 +2028,7 @@ dependencies = [
"miette 7.2.0", "miette 7.2.0",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"rand",
"reqwest", "reqwest",
"rsa", "rsa",
"serde", "serde",
@ -4197,6 +4198,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]

View File

@ -62,6 +62,7 @@ quick-xml = "0.36"
redis = "0.26" redis = "0.26"
regex = "1.9" regex = "1.9"
rmp-serde = "1.3" rmp-serde = "1.3"
rand = "0.8"
rsa = "0.9" rsa = "0.9"
reqwest = "0.12" reqwest = "0.12"
sea-orm = "1" sea-orm = "1"
@ -112,7 +113,7 @@ headers = { workspace = true }
hyper = { workspace = true, features = ["full"] } hyper = { workspace = true, features = ["full"] }
reqwest = { workspace = true, features = ["hickory-dns"] } reqwest = { workspace = true, features = ["hickory-dns"] }
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true } tokio-stream = { workspace = true, features = ["full"] }
tower = { workspace = true } tower = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace", "fs"] } tower-http = { workspace = true, features = ["cors", "trace", "fs"] }
ulid = { workspace = true } ulid = { workspace = true }

View File

@ -40,6 +40,7 @@ hyper = { workspace = true, features = ["full"] }
percent-encoding = { workspace = true } percent-encoding = { workspace = true }
reqwest = { workspace = true, features = ["stream", "hickory-dns"] } reqwest = { workspace = true, features = ["stream", "hickory-dns"] }
rand = { workspace = true }
ed25519-dalek = { workspace = true, features = [ ed25519-dalek = { workspace = true, features = [
"pem", "pem",
"pkcs8", "pkcs8",

View File

@ -4,6 +4,7 @@ use http::{HeaderMap, HeaderName, HeaderValue, Method};
use indexmap::IndexSet; use indexmap::IndexSet;
use serde_json::Value; use serde_json::Value;
use sha2::Digest; use sha2::Digest;
use std::fmt::Write;
use std::{fmt::Display, string::FromUtf8Error, sync::Arc}; use std::{fmt::Display, string::FromUtf8Error, sync::Arc};
use thiserror::Error; use thiserror::Error;
use tokio::task; use tokio::task;
@ -257,16 +258,17 @@ impl ApClientService for ApClientServiceDefaultProvider {
let message = components let message = components
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k.as_ref(), v)) .fold(String::new(), |mut acc, (k, v)| {
.collect::<Vec<_>>() writeln!(&mut acc, "{}: {}", k.as_ref(), v).unwrap();
.join("\n"); acc
});
let key_id = signing_key.key_id.clone().into_owned(); let key_id = signing_key.key_id.clone().into_owned();
let key = signing_key.into_owned(); let key = signing_key.into_owned();
let signature = task::spawn_blocking(move || { let signature = task::spawn_blocking(move || {
key key
.key .key
.sign_base64(signing_algorithm, &message.into_bytes()) .sign_base64(signing_algorithm, message.trim_end().as_bytes())
}).await??; }).await??;
Ok(ApSignature { Ok(ApSignature {
@ -329,7 +331,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
} }
headers.insert( headers.insert(
HeaderName::from_lowercase(b"signature").unwrap(), HeaderName::from_static("signature"),
HeaderValue::try_from(signed.to_string())?, HeaderValue::try_from(signed.to_string())?,
); );
@ -350,10 +352,9 @@ impl ApClientService for ApClientServiceDefaultProvider {
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
expires: Option<chrono::DateTime<Utc>>, expires: Option<chrono::DateTime<Utc>>,
url: &str, url: &str,
body: &Value, body_bytes: Vec<u8>,
) -> Result<String, Self::Error> { ) -> Result<String, Self::Error> {
let url = url.parse()?; let url = url.parse()?;
let body_bytes = serde_json::to_vec(body)?;
// Move in, move out :3 // Move in, move out :3
let (digest_raw, body_bytes) = task::spawn_blocking(move || { let (digest_raw, body_bytes) = task::spawn_blocking(move || {
let mut sha = sha2::Sha256::new(); let mut sha = sha2::Sha256::new();
@ -406,12 +407,12 @@ impl ApClientService for ApClientServiceDefaultProvider {
} }
headers.insert( headers.insert(
HeaderName::from_lowercase(b"digest").unwrap(), HeaderName::from_static("digest"),
HeaderValue::try_from(digest_base64)?, HeaderValue::try_from(digest_base64)?,
); );
headers.insert( headers.insert(
HeaderName::from_lowercase(b"signature").unwrap(), HeaderName::from_static("signature"),
HeaderValue::try_from(signed.to_string())?, HeaderValue::try_from(signed.to_string())?,
); );

View File

@ -1,11 +1,11 @@
use async_stream::stream; use async_stream::stream;
use futures_util::{select, stream::StreamExt, FutureExt, Stream, TryStreamExt}; use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use headers::UserAgent; use headers::UserAgent;
use hyper::body::Bytes; use hyper::body::Bytes;
use reqwest::{redirect::Policy, Client, RequestBuilder}; use reqwest::{redirect::Policy, Client, RequestBuilder};
use serde_json::Value; use serde_json::Value;
use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use tokio::pin;
use url::Url; use url::Url;
use magnetar_core::web_model::ContentType; use magnetar_core::web_model::ContentType;
@ -58,6 +58,7 @@ impl FederationClient {
let client = Client::builder() let client = Client::builder()
.https_only(force_https) .https_only(force_https)
.redirect(Policy::limited(5)) .redirect(Policy::limited(5))
.timeout(Duration::from_secs(timeout_seconds))
.build()?; .build()?;
Ok(FederationClient { Ok(FederationClient {
@ -144,13 +145,6 @@ impl FederationRequestBuilder<'_> {
} }
pub async fn send(self) -> Result<Vec<u8>, FederationClientError> { pub async fn send(self) -> Result<Vec<u8>, FederationClientError> {
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(
self.client.timeout_seconds,
))
.fuse();
tokio::pin!(sleep);
let body = async move {
self.send_stream() self.send_stream()
.await? .await?
.try_fold(Vec::new(), |mut acc, b| async move { .try_fold(Vec::new(), |mut acc, b| async move {
@ -159,15 +153,6 @@ impl FederationRequestBuilder<'_> {
}) })
.await .await
} }
.fuse();
pin!(body);
select! {
b = body => b,
_ = sleep => Err(FederationClientError::TimeoutError)
}
}
pub async fn json(self) -> Result<Value, FederationClientError> { pub async fn json(self) -> Result<Value, FederationClientError> {
let data = self.send().await?; let data = self.send().await?;

View File

@ -2,7 +2,7 @@ use rsa::pkcs1::DecodeRsaPrivateKey;
use rsa::pkcs1::DecodeRsaPublicKey; use rsa::pkcs1::DecodeRsaPublicKey;
use rsa::pkcs8::DecodePrivateKey; use rsa::pkcs8::DecodePrivateKey;
use rsa::pkcs8::DecodePublicKey; use rsa::pkcs8::DecodePublicKey;
use rsa::signature::Verifier; use rsa::signature::{RandomizedSigner, Verifier};
use rsa::{ use rsa::{
sha2::{Sha256, Sha512}, sha2::{Sha256, Sha512},
signature::Signer, signature::Signer,
@ -268,10 +268,10 @@ impl ApHttpSigningKey<'_> {
) -> Result<Vec<u8>, ApSigningError> { ) -> Result<Vec<u8>, ApSigningError> {
match (self, algorithm) { match (self, algorithm) {
(Self::RsaSha256(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => { (Self::RsaSha256(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => {
Ok(Box::<[u8]>::from(key.sign(message)).into_vec()) Ok(Box::<[u8]>::from(key.sign_with_rng(&mut rand::thread_rng(), message)).into_vec())
} }
(Self::RsaSha512(key), SigningAlgorithm::Hs2019) => { (Self::RsaSha512(key), SigningAlgorithm::Hs2019) => {
Ok(Box::<[u8]>::from(key.sign(message)).into_vec()) Ok(Box::<[u8]>::from(key.sign_with_rng(&mut rand::thread_rng(), message)).into_vec())
} }
(Self::Ed25519(key), SigningAlgorithm::Hs2019) => { (Self::Ed25519(key), SigningAlgorithm::Hs2019) => {
Ok(key.sign(message).to_bytes().to_vec()) Ok(key.sign(message).to_bytes().to_vec())

View File

@ -171,6 +171,6 @@ pub trait ApClientService: Send + Sync {
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
expires: Option<chrono::DateTime<Utc>>, expires: Option<chrono::DateTime<Utc>>,
url: &str, url: &str,
body: &Value, body: Vec<u8>,
) -> Result<String, Self::Error>; ) -> Result<String, Self::Error>;
} }

View File

@ -74,10 +74,10 @@ const showTicker =
display: flex; display: flex;
align-items: center; align-items: center;
white-space: nowrap; white-space: nowrap;
justify-self: flex-end;
border-radius: 100px; border-radius: 100px;
font-size: 0.8em; font-size: 0.8em;
text-shadow: 0 2px 2px var(--shadow); text-shadow: 0 2px 2px var(--shadow);
gap: 0 0.1em;
> .avatar { > .avatar {
width: 3.7em; width: 3.7em;
@ -99,6 +99,8 @@ const showTicker =
overflow: hidden; overflow: hidden;
text-overflow: ellipsis; text-overflow: ellipsis;
gap: 0.1em 0; gap: 0.1em 0;
display: flex;
flex-wrap: wrap;
} }
&:last-child { &:last-child {
@ -151,7 +153,6 @@ const showTicker =
margin: 0 0.5em 0 0; margin: 0 0.5em 0 0;
overflow: hidden; overflow: hidden;
text-overflow: ellipsis; text-overflow: ellipsis;
align-self: flex-start;
font-size: 0.9em; font-size: 0.9em;
} }

View File

@ -4,7 +4,7 @@
v-tooltip=" v-tooltip="
capitalize( capitalize(
magTransProperty(instance, 'software_name', 'softwareName') ?? magTransProperty(instance, 'software_name', 'softwareName') ??
'?', '?'
) )
" "
ref="ticker" ref="ticker"
@ -40,7 +40,7 @@ const instance = props.instance ?? {
name: instanceName, name: instanceName,
themeColor: ( themeColor: (
document.querySelector( document.querySelector(
'meta[name="theme-color-orig"]', 'meta[name="theme-color-orig"]'
) as HTMLMetaElement ) as HTMLMetaElement
)?.content, )?.content,
softwareName: (Instance.softwareName || "Magnetar") as string | null, softwareName: (Instance.softwareName || "Magnetar") as string | null,
@ -61,18 +61,18 @@ const bg = {
}; };
function getInstanceIcon( function getInstanceIcon(
instance?: Misskey.entities.User["instance"] | types.InstanceTicker | null, instance?: Misskey.entities.User["instance"] | types.InstanceTicker | null
): string { ): string {
if (!instance) return "/client-assets/dummy.png"; if (!instance) return "/client-assets/dummy.png";
return ( return (
getProxiedImageUrlNullable( getProxiedImageUrlNullable(
magTransProperty(instance, "icon_url", "iconUrl"), magTransProperty(instance, "icon_url", "iconUrl"),
"preview", "preview"
) ?? ) ??
getProxiedImageUrlNullable( getProxiedImageUrlNullable(
magTransProperty(instance, "favicon_url", "faviconUrl"), magTransProperty(instance, "favicon_url", "faviconUrl"),
"preview", "preview"
) ?? ) ??
"/client-assets/dummy.png" "/client-assets/dummy.png"
); );
@ -90,6 +90,7 @@ function getInstanceIcon(
font-size: 0.8em; font-size: 0.8em;
text-shadow: 0 2px 2px var(--shadow); text-shadow: 0 2px 2px var(--shadow);
overflow: hidden; overflow: hidden;
.header > .body & { .header > .body & {
width: max-content; width: max-content;
max-width: 100%; max-width: 100%;
@ -108,11 +109,9 @@ function getInstanceIcon(
font-weight: bold; font-weight: bold;
text-overflow: ellipsis; text-overflow: ellipsis;
white-space: nowrap; white-space: nowrap;
text-shadow: text-shadow: -1px -1px 0 var(--bg), 1px -1px 0 var(--bg),
-1px -1px 0 var(--bg), -1px 1px 0 var(--bg), 1px 1px 0 var(--bg);
1px -1px 0 var(--bg),
-1px 1px 0 var(--bg),
1px 1px 0 var(--bg);
.article > .main &, .article > .main &,
.header > .body & { .header > .body & {
display: unset; display: unset;

View File

@ -23,7 +23,7 @@ struct RpcApGet {
struct RpcApPost { struct RpcApPost {
user_id: String, user_id: String,
url: String, url: String,
body: serde_json::Value, body: String,
} }
pub fn create_rpc_router() -> MagRpc { pub fn create_rpc_router() -> MagRpc {
@ -90,7 +90,7 @@ pub fn create_rpc_router() -> MagRpc {
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?; .create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
let result = service let result = service
.ap_client .ap_client
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body) .signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, body.into_bytes())
.await?; .await?;
Result::<_, DeliveryError>::Ok(result) Result::<_, DeliveryError>::Ok(result)
}, },

View File

@ -1,21 +1,26 @@
use crate::service::MagnetarService; use crate::service::MagnetarService;
use either::Either; use either::Either;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt};
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{pin_mut, SinkExt, StreamExt};
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::any::Any; use std::any::Any;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Debug; use std::fmt::Debug;
use std::future;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::{TcpListener, UnixSocket}; use tokio::net::{TcpListener, UnixSocket};
use tokio::select; use tokio::select;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::{debug, error, info, warn, Instrument}; use tracing::{debug, error, info, warn, Instrument};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -205,6 +210,8 @@ impl MagRpc {
loop { loop {
let (stream, remote_addr) = select!( let (stream, remote_addr) = select!(
biased;
_ = &mut cancel => break,
Some(c) = connections.join_next() => { Some(c) = connections.join_next() => {
debug!("RPC TCP connection closed: {:?}", c); debug!("RPC TCP connection closed: {:?}", c);
continue; continue;
@ -217,45 +224,20 @@ impl MagRpc {
conn.unwrap() conn.unwrap()
}, },
_ = &mut cancel => break
); );
debug!("RPC TCP connection accepted: {:?}", remote_addr); debug!("RPC TCP connection accepted: {:?}", remote_addr);
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>(); let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
let (read_half, mut write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let buf_read = BufReader::new(read_half); let handler_fut = handle_process(
let context = context.clone(); rx_dec.stream_decode(BufReader::new(read_half), cancel_recv),
let rx_dec = rx_dec.clone(); BufWriter::new(write_half),
let fut = async move { context.clone(),
let src = rx_dec )
.stream_decode(buf_read, cancel_recv)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half
.write_u32(bytes.len() as u32)
.await
.into_diagnostic()?;
write_half.write_all(&bytes).await.into_diagnostic()?;
write_half.flush().await.into_diagnostic()?;
}
Ok(remote_addr)
}
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr)); .instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
connections.spawn(fut); connections.spawn(handler_fut);
cancellation_tokens.push(cancel_send); cancellation_tokens.push(cancel_send);
} }
@ -295,6 +277,8 @@ impl MagRpc {
loop { loop {
let (stream, remote_addr) = select!( let (stream, remote_addr) = select!(
biased;
_ = &mut cancel => break,
Some(c) = connections.join_next() => { Some(c) = connections.join_next() => {
debug!("RPC Unix connection closed: {:?}", c); debug!("RPC Unix connection closed: {:?}", c);
continue; continue;
@ -306,46 +290,21 @@ impl MagRpc {
} }
conn.unwrap() conn.unwrap()
}, }
_ = &mut cancel => break
); );
debug!("RPC Unix connection accepted: {:?}", remote_addr); debug!("RPC Unix connection accepted: {:?}", remote_addr);
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>(); let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
let (read_half, mut write_half) = stream.into_split(); let (read_half, write_half) = stream.into_split();
let buf_read = BufReader::new(read_half); let handler_fut = handle_process(
let context = context.clone(); rx_dec.stream_decode(BufReader::with_capacity(64 * 1024, read_half), cancel_recv),
let rx_dec = rx_dec.clone(); BufWriter::new(write_half),
let fut = async move { context.clone(),
let src = rx_dec )
.stream_decode(buf_read, cancel_recv)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half
.write_u32(bytes.len() as u32)
.await
.into_diagnostic()?;
write_half.write_all(&bytes).await.into_diagnostic()?;
write_half.flush().await.into_diagnostic()?;
}
miette::Result::<()>::Ok(())
}
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr)); .instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
connections.spawn(fut.boxed()); connections.spawn(handler_fut);
cancellation_tokens.push(cancel_send); cancellation_tokens.push(cancel_send);
} }
@ -363,16 +322,87 @@ impl MagRpc {
} }
} }
async fn write_response(
mut buf_write: Pin<&mut BufWriter<impl AsyncWrite>>,
serial: u64,
result: Option<RpcResponse>,
) -> miette::Result<()> {
let header = if result.is_some() { b'M' } else { b'F' };
buf_write.write_u8(header).await.into_diagnostic()?;
buf_write.write_u64(serial).await.into_diagnostic()?;
let Some(RpcResponse(bytes)) = result else {
return Ok(());
};
buf_write
.write_u32(bytes.len() as u32)
.await
.into_diagnostic()?;
buf_write.write_all(&bytes).await.into_diagnostic()?;
buf_write.flush().await.into_diagnostic()?;
Ok(())
}
async fn handle_process(
task_stream: impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static,
mut buf_write: BufWriter<impl AsyncWrite + Unpin>,
context: Arc<MagnetarService>,
) -> miette::Result<()> {
let results = FuturesUnordered::new();
pin_mut!(results);
let (tx, rx) = futures::channel::mpsc::unbounded();
pin_mut!(rx);
let input_stream = tokio::spawn(
task_stream
.map_ok(process(context))
.boxed()
.forward(tx.sink_map_err(|e| miette!(e)))
);
pin_mut!(input_stream);
loop {
select!(
biased;
_ = &mut input_stream => {
break;
}
Some(task) = rx.next() => {
results.push(task);
}
Some(res) = results.next() => {
let (serial, result) = res?;
write_response(Pin::new(&mut buf_write), serial, result)
.await?;
}
else => {
break;
}
);
}
Ok(())
}
fn process( fn process(
context: Arc<MagnetarService>, context: Arc<MagnetarService>,
) -> impl Fn( ) -> impl Fn(
(u64, MessageRaw, Arc<MagRpcHandlerMapped>), (u64, MessageRaw, Arc<MagRpcHandlerMapped>),
) -> Pin< ) -> BoxFuture<'static, miette::Result<(u64, Option<RpcResponse>)>> {
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
> {
move |(serial, payload, listener)| { move |(serial, payload, listener)| {
let ctx = context.clone(); let ctx = context.clone();
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }) tokio::task::spawn(async move {
let start = Instant::now();
let res = listener(ctx, payload).await;
let took = start.elapsed();
// TODO: Extract this into a config
if took.as_secs_f64() > 50.0 {
warn!("Handler took long: {} sec", took.as_secs_f64());
}
(serial, res)
}.instrument(tracing::info_span!("Request", ?serial)))
.map_err(|e| miette!(e)) .map_err(|e| miette!(e))
.boxed() .boxed()
} }
@ -385,9 +415,9 @@ struct RpcCallDecoder {
} }
impl RpcCallDecoder { impl RpcCallDecoder {
fn stream_decode<R: AsyncRead + AsyncReadExt + Unpin + Send + 'static>( fn stream_decode(
&self, &self,
mut buf_read: BufReader<R>, mut buf_read: impl AsyncBufRead + Send + Unpin + 'static,
mut cancel: tokio::sync::oneshot::Receiver<()>, mut cancel: tokio::sync::oneshot::Receiver<()>,
) -> impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static ) -> impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static
{ {
@ -450,8 +480,9 @@ impl RpcCallDecoder {
}; };
let Some((serial, name_slice, payload_slice)) = select! { let Some((serial, name_slice, payload_slice)) = select! {
read_result = read_fut => read_result, biased;
_ = &mut cancel => { break; } _ = &mut cancel => { break; }
read_result = read_fut => read_result,
}? else { }? else {
break; break;
}; };