Compare commits

..

3 Commits

Author SHA1 Message Date
Natty 2acc41587a
Drop RPC connections receiving bogus data
ci/woodpecker/push/ociImagePush Pipeline was successful Details
2024-11-16 19:29:46 +01:00
Natty 56f80f290a
Dependency management 2024-11-16 17:50:51 +01:00
Natty d598387795
Recycle the read buffers for RPC data 2024-11-16 17:43:57 +01:00
4 changed files with 71 additions and 61 deletions

70
Cargo.lock generated
View File

@ -455,16 +455,16 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
[[package]] [[package]]
name = "cached" name = "cached"
version = "0.53.1" version = "0.54.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4d73155ae6b28cf5de4cfc29aeb02b8a1c6dab883cb015d15cd514e42766846" checksum = "9718806c4a2fe9e8a56fd736f97b340dd10ed1be8ed733ed50449f351dc33cae"
dependencies = [ dependencies = [
"ahash 0.8.11", "ahash 0.8.11",
"cached_proc_macro", "cached_proc_macro",
"cached_proc_macro_types", "cached_proc_macro_types",
"hashbrown 0.14.5", "hashbrown 0.14.5",
"once_cell", "once_cell",
"thiserror", "thiserror 1.0.69",
"web-time", "web-time",
] ]
@ -1316,7 +1316,7 @@ dependencies = [
"ipnet", "ipnet",
"once_cell", "once_cell",
"rand", "rand",
"thiserror", "thiserror 1.0.69",
"tinyvec", "tinyvec",
"tokio", "tokio",
"tracing", "tracing",
@ -1339,7 +1339,7 @@ dependencies = [
"rand", "rand",
"resolv-conf", "resolv-conf",
"smallvec", "smallvec",
"thiserror", "thiserror 1.0.69",
"tokio", "tokio",
"tracing", "tracing",
] ]
@ -1803,7 +1803,7 @@ checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
dependencies = [ dependencies = [
"miette 5.10.0", "miette 5.10.0",
"nom", "nom",
"thiserror", "thiserror 1.0.69",
] ]
[[package]] [[package]]
@ -1928,7 +1928,7 @@ dependencies = [
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"strum", "strum",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"toml", "toml",
@ -1950,7 +1950,7 @@ dependencies = [
"either", "either",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"url", "url",
] ]
@ -1971,7 +1971,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tera", "tera",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"toml", "toml",
"tower", "tower",
@ -1990,7 +1990,7 @@ dependencies = [
"magnetar_sdk", "magnetar_sdk",
"percent-encoding", "percent-encoding",
"serde", "serde",
"thiserror", "thiserror 2.0.3",
"toml", "toml",
"url", "url",
] ]
@ -2033,7 +2033,7 @@ dependencies = [
"serde_json", "serde_json",
"sha2", "sha2",
"strum", "strum",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@ -2082,7 +2082,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"strum", "strum",
"thiserror", "thiserror 2.0.3",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tracing", "tracing",
@ -2109,7 +2109,7 @@ dependencies = [
"magnetar_core", "magnetar_core",
"magnetar_sdk", "magnetar_sdk",
"miette 7.2.0", "miette 7.2.0",
"thiserror", "thiserror 2.0.3",
"tracing", "tracing",
] ]
@ -2191,7 +2191,7 @@ checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
dependencies = [ dependencies = [
"miette-derive 5.10.0", "miette-derive 5.10.0",
"once_cell", "once_cell",
"thiserror", "thiserror 1.0.69",
"unicode-width", "unicode-width",
] ]
@ -2211,7 +2211,7 @@ dependencies = [
"supports-unicode", "supports-unicode",
"terminal_size", "terminal_size",
"textwrap", "textwrap",
"thiserror", "thiserror 1.0.69",
"unicode-width", "unicode-width",
] ]
@ -2579,7 +2579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442" checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442"
dependencies = [ dependencies = [
"memchr", "memchr",
"thiserror", "thiserror 1.0.69",
"ucd-trie", "ucd-trie",
] ]
@ -3227,7 +3227,7 @@ dependencies = [
"serde_json", "serde_json",
"sqlx", "sqlx",
"strum", "strum",
"thiserror", "thiserror 1.0.69",
"time", "time",
"tracing", "tracing",
"url", "url",
@ -3326,7 +3326,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.87", "syn 2.0.87",
"thiserror", "thiserror 1.0.69",
] ]
[[package]] [[package]]
@ -3647,7 +3647,7 @@ dependencies = [
"sha2", "sha2",
"smallvec", "smallvec",
"sqlformat", "sqlformat",
"thiserror", "thiserror 1.0.69",
"time", "time",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -3736,7 +3736,7 @@ dependencies = [
"smallvec", "smallvec",
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror 1.0.69",
"time", "time",
"tracing", "tracing",
"uuid", "uuid",
@ -3780,7 +3780,7 @@ dependencies = [
"smallvec", "smallvec",
"sqlx-core", "sqlx-core",
"stringprep", "stringprep",
"thiserror", "thiserror 1.0.69",
"time", "time",
"tracing", "tracing",
"uuid", "uuid",
@ -4037,7 +4037,16 @@ version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [ dependencies = [
"thiserror-impl", "thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
dependencies = [
"thiserror-impl 2.0.3",
] ]
[[package]] [[package]]
@ -4051,6 +4060,17 @@ dependencies = [
"syn 2.0.87", "syn 2.0.87",
] ]
[[package]]
name = "thiserror-impl"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]] [[package]]
name = "thread_local" name = "thread_local"
version = "1.1.8" version = "1.1.8"
@ -4243,9 +4263,9 @@ dependencies = [
[[package]] [[package]]
name = "tower-http" name = "tower-http"
version = "0.5.2" version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
dependencies = [ dependencies = [
"bitflags", "bitflags",
"bytes", "bytes",
@ -4353,7 +4373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6" checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6"
dependencies = [ dependencies = [
"chrono", "chrono",
"thiserror", "thiserror 1.0.69",
"ts-rs-macros", "ts-rs-macros",
] ]

View File

@ -32,7 +32,7 @@ axum = "0.7"
axum-extra = "0.9" axum-extra = "0.9"
base64 = "0.22" base64 = "0.22"
bytes = "1.7" bytes = "1.7"
cached = "0.53" cached = "0.54"
cfg-if = "1" cfg-if = "1"
chrono = "0.4" chrono = "0.4"
compact_str = "0.8" compact_str = "0.8"
@ -73,13 +73,13 @@ sha2 = "0.10"
smallvec = "1.13" smallvec = "1.13"
strum = "0.26" strum = "0.26"
tera = { version = "1", default-features = false } tera = { version = "1", default-features = false }
thiserror = "1" thiserror = "2"
tokio = "1.24" tokio = "1.24"
tokio-util = "0.7" tokio-util = "0.7"
tokio-stream = "0.1" tokio-stream = "0.1"
toml = "0.8" toml = "0.8"
tower = "0.5" tower = "0.5"
tower-http = "0.5" tower-http = "0.6"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
ts-rs = "7" ts-rs = "7"

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use magnetar_federation::crypto::SigningAlgorithm; use magnetar_federation::crypto::SigningAlgorithm;
use proto::{MagRpc, RpcMessage}; use proto::{MagRpc, RpcMessage};
use serde::Deserialize; use serde::Deserialize;
use tracing::{debug, info}; use tracing::debug;
use crate::{ use crate::{
model::{processing::note::NoteModel, PackingContext}, model::{processing::note::NoteModel, PackingContext},

View File

@ -1,6 +1,6 @@
use crate::service::MagnetarService; use crate::service::MagnetarService;
use futures::{FutureExt, Stream, StreamExt}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use miette::{miette, IntoDiagnostic}; use miette::{miette, Error, IntoDiagnostic};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::any::Any; use std::any::Any;
@ -218,21 +218,14 @@ impl MagRpc {
let fut = async move { let fut = async move {
let src = rx_dec let src = rx_dec
.stream_decode(buf_read, cancel_recv) .stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move { .map_ok(process(context))
if let Err(e) = &r { .try_buffer_unordered(100)
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.boxed(); .boxed();
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some(result) = src.next().await { while let Some(result) = src.try_next().await? {
let Ok(Some((serial, RpcResponse(bytes)))) = result else { let Some((serial, RpcResponse(bytes))) = result else {
continue; continue;
}; };
@ -315,21 +308,14 @@ impl MagRpc {
let fut = async move { let fut = async move {
let src = rx_dec let src = rx_dec
.stream_decode(buf_read, cancel_recv) .stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move { .map_ok(process(context))
if let Err(e) = &r { .try_buffer_unordered(100)
error!("Stream decoding error: {e}");
}
r.ok()
})
.map(process(context))
.buffer_unordered(100)
.boxed(); .boxed();
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some(result) = src.next().await { while let Some(result) = src.try_next().await? {
let Ok(Some((serial, RpcResponse(bytes)))) = result else { let Some((serial, RpcResponse(bytes))) = result else {
continue; continue;
}; };
@ -370,11 +356,13 @@ fn process(
) -> impl Fn( ) -> impl Fn(
(u64, MessageRaw, Arc<MagRpcHandlerMapped>), (u64, MessageRaw, Arc<MagRpcHandlerMapped>),
) -> Pin< ) -> Pin<
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>, Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
> { > {
move |(serial, payload, listener)| { move |(serial, payload, listener)| {
let ctx = context.clone(); let ctx = context.clone();
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed() tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
.map_err(|e| miette!(e))
.boxed()
} }
} }
@ -396,6 +384,8 @@ impl RpcCallDecoder {
async_stream::try_stream! { async_stream::try_stream! {
let mut messages = 0usize; let mut messages = 0usize;
let mut name_buf = vec![0; 128];
let mut payload_buf = vec![0; 1024];
loop { loop {
let read_fut = async { let read_fut = async {
@ -429,7 +419,7 @@ impl RpcCallDecoder {
)).into_diagnostic(); )).into_diagnostic();
} }
let mut name_buf = vec![0; name_len]; name_buf.resize(name_len, 0);
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?; buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize; let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
@ -441,20 +431,20 @@ impl RpcCallDecoder {
)).into_diagnostic(); )).into_diagnostic();
} }
let mut payload_buf = vec![0; payload_len]; payload_buf.resize(payload_len, 0);
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?; buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf))) miette::Result::<_>::Ok(Some((serial, &name_buf[..], &payload_buf[..])))
}; };
let Some((serial, name_buf, payload_buf)) = select! { let Some((serial, name_slice, payload_slice)) = select! {
read_result = read_fut => read_result, read_result = read_fut => read_result,
_ = &mut cancel => { break; } _ = &mut cancel => { break; }
}? else { }? else {
break; break;
}; };
let name = std::str::from_utf8(&name_buf).into_diagnostic()?; let name = std::str::from_utf8(name_slice).into_diagnostic()?;
let decoder = decoders let decoder = decoders
.get(name) .get(name)
@ -465,7 +455,7 @@ impl RpcCallDecoder {
.ok_or_else(|| miette!("No such RPC call name: {}", name))? .ok_or_else(|| miette!("No such RPC call name: {}", name))?
.clone(); .clone();
let packet = match decoder(&payload_buf) { let packet = match decoder(payload_slice) {
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
error!("Failed to parse packet: {e}"); error!("Failed to parse packet: {e}");