Compare commits

...

3 Commits

Author SHA1 Message Date
Natty 2acc41587a
Drop RPC connections receiving bogus data
ci/woodpecker/push/ociImagePush Pipeline was successful Details
2024-11-16 19:29:46 +01:00
Natty 56f80f290a
Dependency management 2024-11-16 17:50:51 +01:00
Natty d598387795
Recycle the read buffers for RPC data 2024-11-16 17:43:57 +01:00
4 changed files with 71 additions and 61 deletions

70
Cargo.lock generated
View File

@ -455,16 +455,16 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
[[package]]
name = "cached"
version = "0.53.1"
version = "0.54.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4d73155ae6b28cf5de4cfc29aeb02b8a1c6dab883cb015d15cd514e42766846"
checksum = "9718806c4a2fe9e8a56fd736f97b340dd10ed1be8ed733ed50449f351dc33cae"
dependencies = [
"ahash 0.8.11",
"cached_proc_macro",
"cached_proc_macro_types",
"hashbrown 0.14.5",
"once_cell",
"thiserror",
"thiserror 1.0.69",
"web-time",
]
@ -1316,7 +1316,7 @@ dependencies = [
"ipnet",
"once_cell",
"rand",
"thiserror",
"thiserror 1.0.69",
"tinyvec",
"tokio",
"tracing",
@ -1339,7 +1339,7 @@ dependencies = [
"rand",
"resolv-conf",
"smallvec",
"thiserror",
"thiserror 1.0.69",
"tokio",
"tracing",
]
@ -1803,7 +1803,7 @@ checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
dependencies = [
"miette 5.10.0",
"nom",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@ -1928,7 +1928,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"strum",
"thiserror",
"thiserror 2.0.3",
"tokio",
"tokio-stream",
"toml",
@ -1950,7 +1950,7 @@ dependencies = [
"either",
"serde",
"serde_json",
"thiserror",
"thiserror 2.0.3",
"tokio",
"url",
]
@ -1971,7 +1971,7 @@ dependencies = [
"serde",
"serde_json",
"tera",
"thiserror",
"thiserror 2.0.3",
"tokio",
"toml",
"tower",
@ -1990,7 +1990,7 @@ dependencies = [
"magnetar_sdk",
"percent-encoding",
"serde",
"thiserror",
"thiserror 2.0.3",
"toml",
"url",
]
@ -2033,7 +2033,7 @@ dependencies = [
"serde_json",
"sha2",
"strum",
"thiserror",
"thiserror 2.0.3",
"tokio",
"tracing",
"tracing-subscriber",
@ -2082,7 +2082,7 @@ dependencies = [
"serde",
"serde_json",
"strum",
"thiserror",
"thiserror 2.0.3",
"tokio",
"tokio-util",
"tracing",
@ -2109,7 +2109,7 @@ dependencies = [
"magnetar_core",
"magnetar_sdk",
"miette 7.2.0",
"thiserror",
"thiserror 2.0.3",
"tracing",
]
@ -2191,7 +2191,7 @@ checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
dependencies = [
"miette-derive 5.10.0",
"once_cell",
"thiserror",
"thiserror 1.0.69",
"unicode-width",
]
@ -2211,7 +2211,7 @@ dependencies = [
"supports-unicode",
"terminal_size",
"textwrap",
"thiserror",
"thiserror 1.0.69",
"unicode-width",
]
@ -2579,7 +2579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442"
dependencies = [
"memchr",
"thiserror",
"thiserror 1.0.69",
"ucd-trie",
]
@ -3227,7 +3227,7 @@ dependencies = [
"serde_json",
"sqlx",
"strum",
"thiserror",
"thiserror 1.0.69",
"time",
"tracing",
"url",
@ -3326,7 +3326,7 @@ dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
"thiserror",
"thiserror 1.0.69",
]
[[package]]
@ -3647,7 +3647,7 @@ dependencies = [
"sha2",
"smallvec",
"sqlformat",
"thiserror",
"thiserror 1.0.69",
"time",
"tokio",
"tokio-stream",
@ -3736,7 +3736,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror",
"thiserror 1.0.69",
"time",
"tracing",
"uuid",
@ -3780,7 +3780,7 @@ dependencies = [
"smallvec",
"sqlx-core",
"stringprep",
"thiserror",
"thiserror 1.0.69",
"time",
"tracing",
"uuid",
@ -4037,7 +4037,16 @@ version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl",
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
dependencies = [
"thiserror-impl 2.0.3",
]
[[package]]
@ -4051,6 +4060,17 @@ dependencies = [
"syn 2.0.87",
]
[[package]]
name = "thiserror-impl"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]]
name = "thread_local"
version = "1.1.8"
@ -4243,9 +4263,9 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.5.2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
dependencies = [
"bitflags",
"bytes",
@ -4353,7 +4373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6"
dependencies = [
"chrono",
"thiserror",
"thiserror 1.0.69",
"ts-rs-macros",
]

View File

@ -32,7 +32,7 @@ axum = "0.7"
axum-extra = "0.9"
base64 = "0.22"
bytes = "1.7"
cached = "0.53"
cached = "0.54"
cfg-if = "1"
chrono = "0.4"
compact_str = "0.8"
@ -73,13 +73,13 @@ sha2 = "0.10"
smallvec = "1.13"
strum = "0.26"
tera = { version = "1", default-features = false }
thiserror = "1"
thiserror = "2"
tokio = "1.24"
tokio-util = "0.7"
tokio-stream = "0.1"
toml = "0.8"
tower = "0.5"
tower-http = "0.5"
tower-http = "0.6"
tracing = "0.1"
tracing-subscriber = "0.3"
ts-rs = "7"

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use magnetar_federation::crypto::SigningAlgorithm;
use proto::{MagRpc, RpcMessage};
use serde::Deserialize;
use tracing::{debug, info};
use tracing::debug;
use crate::{
model::{processing::note::NoteModel, PackingContext},

View File

@ -1,6 +1,6 @@
use crate::service::MagnetarService;
use futures::{FutureExt, Stream, StreamExt};
use miette::{miette, IntoDiagnostic};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use miette::{miette, Error, IntoDiagnostic};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::any::Any;
@ -218,21 +218,14 @@ impl MagRpc {
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
@ -315,21 +308,14 @@ impl MagRpc {
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.map_ok(process(context))
.try_buffer_unordered(100)
.boxed();
futures::pin_mut!(src);
while let Some(result) = src.next().await {
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
while let Some(result) = src.try_next().await? {
let Some((serial, RpcResponse(bytes))) = result else {
continue;
};
@ -370,11 +356,13 @@ fn process(
) -> impl Fn(
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
) -> Pin<
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>,
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
> {
move |(serial, payload, listener)| {
let ctx = context.clone();
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed()
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
.map_err(|e| miette!(e))
.boxed()
}
}
@ -396,6 +384,8 @@ impl RpcCallDecoder {
async_stream::try_stream! {
let mut messages = 0usize;
let mut name_buf = vec![0; 128];
let mut payload_buf = vec![0; 1024];
loop {
let read_fut = async {
@ -429,7 +419,7 @@ impl RpcCallDecoder {
)).into_diagnostic();
}
let mut name_buf = vec![0; name_len];
name_buf.resize(name_len, 0);
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
@ -441,20 +431,20 @@ impl RpcCallDecoder {
)).into_diagnostic();
}
let mut payload_buf = vec![0; payload_len];
payload_buf.resize(payload_len, 0);
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf)))
miette::Result::<_>::Ok(Some((serial, &name_buf[..], &payload_buf[..])))
};
let Some((serial, name_buf, payload_buf)) = select! {
let Some((serial, name_slice, payload_slice)) = select! {
read_result = read_fut => read_result,
_ = &mut cancel => { break; }
}? else {
break;
};
let name = std::str::from_utf8(&name_buf).into_diagnostic()?;
let name = std::str::from_utf8(name_slice).into_diagnostic()?;
let decoder = decoders
.get(name)
@ -465,7 +455,7 @@ impl RpcCallDecoder {
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
.clone();
let packet = match decoder(&payload_buf) {
let packet = match decoder(payload_slice) {
Ok(p) => p,
Err(e) => {
error!("Failed to parse packet: {e}");