Compare commits
3 Commits
52cced9537
...
2acc41587a
Author | SHA1 | Date |
---|---|---|
Natty | 2acc41587a | |
Natty | 56f80f290a | |
Natty | d598387795 |
|
@ -455,16 +455,16 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cached"
|
name = "cached"
|
||||||
version = "0.53.1"
|
version = "0.54.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b4d73155ae6b28cf5de4cfc29aeb02b8a1c6dab883cb015d15cd514e42766846"
|
checksum = "9718806c4a2fe9e8a56fd736f97b340dd10ed1be8ed733ed50449f351dc33cae"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"ahash 0.8.11",
|
"ahash 0.8.11",
|
||||||
"cached_proc_macro",
|
"cached_proc_macro",
|
||||||
"cached_proc_macro_types",
|
"cached_proc_macro_types",
|
||||||
"hashbrown 0.14.5",
|
"hashbrown 0.14.5",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"web-time",
|
"web-time",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1316,7 +1316,7 @@ dependencies = [
|
||||||
"ipnet",
|
"ipnet",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"rand",
|
"rand",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"tinyvec",
|
"tinyvec",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -1339,7 +1339,7 @@ dependencies = [
|
||||||
"rand",
|
"rand",
|
||||||
"resolv-conf",
|
"resolv-conf",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
@ -1803,7 +1803,7 @@ checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"miette 5.10.0",
|
"miette 5.10.0",
|
||||||
"nom",
|
"nom",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1928,7 +1928,7 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_urlencoded",
|
"serde_urlencoded",
|
||||||
"strum",
|
"strum",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"toml",
|
"toml",
|
||||||
|
@ -1950,7 +1950,7 @@ dependencies = [
|
||||||
"either",
|
"either",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tokio",
|
"tokio",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
@ -1971,7 +1971,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tera",
|
"tera",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
"tower",
|
"tower",
|
||||||
|
@ -1990,7 +1990,7 @@ dependencies = [
|
||||||
"magnetar_sdk",
|
"magnetar_sdk",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"serde",
|
"serde",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"toml",
|
"toml",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
@ -2033,7 +2033,7 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2",
|
||||||
"strum",
|
"strum",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
@ -2082,7 +2082,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"strum",
|
"strum",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -2109,7 +2109,7 @@ dependencies = [
|
||||||
"magnetar_core",
|
"magnetar_core",
|
||||||
"magnetar_sdk",
|
"magnetar_sdk",
|
||||||
"miette 7.2.0",
|
"miette 7.2.0",
|
||||||
"thiserror",
|
"thiserror 2.0.3",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2191,7 +2191,7 @@ checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"miette-derive 5.10.0",
|
"miette-derive 5.10.0",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"unicode-width",
|
"unicode-width",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2211,7 +2211,7 @@ dependencies = [
|
||||||
"supports-unicode",
|
"supports-unicode",
|
||||||
"terminal_size",
|
"terminal_size",
|
||||||
"textwrap",
|
"textwrap",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"unicode-width",
|
"unicode-width",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2579,7 +2579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442"
|
checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"ucd-trie",
|
"ucd-trie",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -3227,7 +3227,7 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"strum",
|
"strum",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"time",
|
"time",
|
||||||
"tracing",
|
"tracing",
|
||||||
"url",
|
"url",
|
||||||
|
@ -3326,7 +3326,7 @@ dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"syn 2.0.87",
|
"syn 2.0.87",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -3647,7 +3647,7 @@ dependencies = [
|
||||||
"sha2",
|
"sha2",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"sqlformat",
|
"sqlformat",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"time",
|
"time",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
|
@ -3736,7 +3736,7 @@ dependencies = [
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"sqlx-core",
|
"sqlx-core",
|
||||||
"stringprep",
|
"stringprep",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"time",
|
"time",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
@ -3780,7 +3780,7 @@ dependencies = [
|
||||||
"smallvec",
|
"smallvec",
|
||||||
"sqlx-core",
|
"sqlx-core",
|
||||||
"stringprep",
|
"stringprep",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"time",
|
"time",
|
||||||
"tracing",
|
"tracing",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
@ -4037,7 +4037,16 @@ version = "1.0.69"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
|
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"thiserror-impl",
|
"thiserror-impl 1.0.69",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thiserror"
|
||||||
|
version = "2.0.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
|
||||||
|
dependencies = [
|
||||||
|
"thiserror-impl 2.0.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -4051,6 +4060,17 @@ dependencies = [
|
||||||
"syn 2.0.87",
|
"syn 2.0.87",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thiserror-impl"
|
||||||
|
version = "2.0.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.87",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "thread_local"
|
name = "thread_local"
|
||||||
version = "1.1.8"
|
version = "1.1.8"
|
||||||
|
@ -4243,9 +4263,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-http"
|
name = "tower-http"
|
||||||
version = "0.5.2"
|
version = "0.6.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
|
checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -4353,7 +4373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6"
|
checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"thiserror",
|
"thiserror 1.0.69",
|
||||||
"ts-rs-macros",
|
"ts-rs-macros",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ axum = "0.7"
|
||||||
axum-extra = "0.9"
|
axum-extra = "0.9"
|
||||||
base64 = "0.22"
|
base64 = "0.22"
|
||||||
bytes = "1.7"
|
bytes = "1.7"
|
||||||
cached = "0.53"
|
cached = "0.54"
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
compact_str = "0.8"
|
compact_str = "0.8"
|
||||||
|
@ -73,13 +73,13 @@ sha2 = "0.10"
|
||||||
smallvec = "1.13"
|
smallvec = "1.13"
|
||||||
strum = "0.26"
|
strum = "0.26"
|
||||||
tera = { version = "1", default-features = false }
|
tera = { version = "1", default-features = false }
|
||||||
thiserror = "1"
|
thiserror = "2"
|
||||||
tokio = "1.24"
|
tokio = "1.24"
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
tower = "0.5"
|
tower = "0.5"
|
||||||
tower-http = "0.5"
|
tower-http = "0.6"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
ts-rs = "7"
|
ts-rs = "7"
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||||
use magnetar_federation::crypto::SigningAlgorithm;
|
use magnetar_federation::crypto::SigningAlgorithm;
|
||||||
use proto::{MagRpc, RpcMessage};
|
use proto::{MagRpc, RpcMessage};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use tracing::{debug, info};
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
model::{processing::note::NoteModel, PackingContext},
|
model::{processing::note::NoteModel, PackingContext},
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use crate::service::MagnetarService;
|
use crate::service::MagnetarService;
|
||||||
use futures::{FutureExt, Stream, StreamExt};
|
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||||
use miette::{miette, IntoDiagnostic};
|
use miette::{miette, Error, IntoDiagnostic};
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
|
@ -218,21 +218,14 @@ impl MagRpc {
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
let src = rx_dec
|
let src = rx_dec
|
||||||
.stream_decode(buf_read, cancel_recv)
|
.stream_decode(buf_read, cancel_recv)
|
||||||
.filter_map(|r| async move {
|
.map_ok(process(context))
|
||||||
if let Err(e) = &r {
|
.try_buffer_unordered(100)
|
||||||
error!("Stream decoding error: {e}");
|
|
||||||
}
|
|
||||||
|
|
||||||
r.ok()
|
|
||||||
})
|
|
||||||
.map(process(context))
|
|
||||||
.buffer_unordered(100)
|
|
||||||
.boxed();
|
.boxed();
|
||||||
|
|
||||||
futures::pin_mut!(src);
|
futures::pin_mut!(src);
|
||||||
|
|
||||||
while let Some(result) = src.next().await {
|
while let Some(result) = src.try_next().await? {
|
||||||
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
|
let Some((serial, RpcResponse(bytes))) = result else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -315,21 +308,14 @@ impl MagRpc {
|
||||||
let fut = async move {
|
let fut = async move {
|
||||||
let src = rx_dec
|
let src = rx_dec
|
||||||
.stream_decode(buf_read, cancel_recv)
|
.stream_decode(buf_read, cancel_recv)
|
||||||
.filter_map(|r| async move {
|
.map_ok(process(context))
|
||||||
if let Err(e) = &r {
|
.try_buffer_unordered(100)
|
||||||
error!("Stream decoding error: {e}");
|
|
||||||
}
|
|
||||||
|
|
||||||
r.ok()
|
|
||||||
})
|
|
||||||
.map(process(context))
|
|
||||||
.buffer_unordered(100)
|
|
||||||
.boxed();
|
.boxed();
|
||||||
|
|
||||||
futures::pin_mut!(src);
|
futures::pin_mut!(src);
|
||||||
|
|
||||||
while let Some(result) = src.next().await {
|
while let Some(result) = src.try_next().await? {
|
||||||
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
|
let Some((serial, RpcResponse(bytes))) = result else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -370,11 +356,13 @@ fn process(
|
||||||
) -> impl Fn(
|
) -> impl Fn(
|
||||||
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
|
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
|
||||||
) -> Pin<
|
) -> Pin<
|
||||||
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>,
|
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
|
||||||
> {
|
> {
|
||||||
move |(serial, payload, listener)| {
|
move |(serial, payload, listener)| {
|
||||||
let ctx = context.clone();
|
let ctx = context.clone();
|
||||||
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed()
|
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
|
||||||
|
.map_err(|e| miette!(e))
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -396,6 +384,8 @@ impl RpcCallDecoder {
|
||||||
|
|
||||||
async_stream::try_stream! {
|
async_stream::try_stream! {
|
||||||
let mut messages = 0usize;
|
let mut messages = 0usize;
|
||||||
|
let mut name_buf = vec![0; 128];
|
||||||
|
let mut payload_buf = vec![0; 1024];
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let read_fut = async {
|
let read_fut = async {
|
||||||
|
@ -429,7 +419,7 @@ impl RpcCallDecoder {
|
||||||
)).into_diagnostic();
|
)).into_diagnostic();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut name_buf = vec![0; name_len];
|
name_buf.resize(name_len, 0);
|
||||||
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
|
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
|
||||||
|
|
||||||
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
|
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
|
||||||
|
@ -441,20 +431,20 @@ impl RpcCallDecoder {
|
||||||
)).into_diagnostic();
|
)).into_diagnostic();
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut payload_buf = vec![0; payload_len];
|
payload_buf.resize(payload_len, 0);
|
||||||
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
|
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
|
||||||
|
|
||||||
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf)))
|
miette::Result::<_>::Ok(Some((serial, &name_buf[..], &payload_buf[..])))
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((serial, name_buf, payload_buf)) = select! {
|
let Some((serial, name_slice, payload_slice)) = select! {
|
||||||
read_result = read_fut => read_result,
|
read_result = read_fut => read_result,
|
||||||
_ = &mut cancel => { break; }
|
_ = &mut cancel => { break; }
|
||||||
}? else {
|
}? else {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
||||||
let name = std::str::from_utf8(&name_buf).into_diagnostic()?;
|
let name = std::str::from_utf8(name_slice).into_diagnostic()?;
|
||||||
|
|
||||||
let decoder = decoders
|
let decoder = decoders
|
||||||
.get(name)
|
.get(name)
|
||||||
|
@ -465,7 +455,7 @@ impl RpcCallDecoder {
|
||||||
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
|
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let packet = match decoder(&payload_buf) {
|
let packet = match decoder(payload_slice) {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Failed to parse packet: {e}");
|
error!("Failed to parse packet: {e}");
|
||||||
|
|
Loading…
Reference in New Issue