Compare commits
3 Commits
52cced9537
...
2acc41587a
Author | SHA1 | Date |
---|---|---|
Natty | 2acc41587a | |
Natty | 56f80f290a | |
Natty | d598387795 |
|
@ -455,16 +455,16 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
|
|||
|
||||
[[package]]
|
||||
name = "cached"
|
||||
version = "0.53.1"
|
||||
version = "0.54.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4d73155ae6b28cf5de4cfc29aeb02b8a1c6dab883cb015d15cd514e42766846"
|
||||
checksum = "9718806c4a2fe9e8a56fd736f97b340dd10ed1be8ed733ed50449f351dc33cae"
|
||||
dependencies = [
|
||||
"ahash 0.8.11",
|
||||
"cached_proc_macro",
|
||||
"cached_proc_macro_types",
|
||||
"hashbrown 0.14.5",
|
||||
"once_cell",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"web-time",
|
||||
]
|
||||
|
||||
|
@ -1316,7 +1316,7 @@ dependencies = [
|
|||
"ipnet",
|
||||
"once_cell",
|
||||
"rand",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"tinyvec",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
@ -1339,7 +1339,7 @@ dependencies = [
|
|||
"rand",
|
||||
"resolv-conf",
|
||||
"smallvec",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
@ -1803,7 +1803,7 @@ checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
|
|||
dependencies = [
|
||||
"miette 5.10.0",
|
||||
"nom",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1928,7 +1928,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
"strum",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"toml",
|
||||
|
@ -1950,7 +1950,7 @@ dependencies = [
|
|||
"either",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
@ -1971,7 +1971,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"tera",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tokio",
|
||||
"toml",
|
||||
"tower",
|
||||
|
@ -1990,7 +1990,7 @@ dependencies = [
|
|||
"magnetar_sdk",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"toml",
|
||||
"url",
|
||||
]
|
||||
|
@ -2033,7 +2033,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"sha2",
|
||||
"strum",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
|
@ -2082,7 +2082,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_json",
|
||||
"strum",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
|
@ -2109,7 +2109,7 @@ dependencies = [
|
|||
"magnetar_core",
|
||||
"magnetar_sdk",
|
||||
"miette 7.2.0",
|
||||
"thiserror",
|
||||
"thiserror 2.0.3",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
@ -2191,7 +2191,7 @@ checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
|
|||
dependencies = [
|
||||
"miette-derive 5.10.0",
|
||||
"once_cell",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
|
@ -2211,7 +2211,7 @@ dependencies = [
|
|||
"supports-unicode",
|
||||
"terminal_size",
|
||||
"textwrap",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"unicode-width",
|
||||
]
|
||||
|
||||
|
@ -2579,7 +2579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "879952a81a83930934cbf1786752d6dedc3b1f29e8f8fb2ad1d0a36f377cf442"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"ucd-trie",
|
||||
]
|
||||
|
||||
|
@ -3227,7 +3227,7 @@ dependencies = [
|
|||
"serde_json",
|
||||
"sqlx",
|
||||
"strum",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"time",
|
||||
"tracing",
|
||||
"url",
|
||||
|
@ -3326,7 +3326,7 @@ dependencies = [
|
|||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -3647,7 +3647,7 @@ dependencies = [
|
|||
"sha2",
|
||||
"smallvec",
|
||||
"sqlformat",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"time",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
@ -3736,7 +3736,7 @@ dependencies = [
|
|||
"smallvec",
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"time",
|
||||
"tracing",
|
||||
"uuid",
|
||||
|
@ -3780,7 +3780,7 @@ dependencies = [
|
|||
"smallvec",
|
||||
"sqlx-core",
|
||||
"stringprep",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"time",
|
||||
"tracing",
|
||||
"uuid",
|
||||
|
@ -4037,7 +4037,16 @@ version = "1.0.69"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
|
||||
dependencies = [
|
||||
"thiserror-impl",
|
||||
"thiserror-impl 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "2.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c006c85c7651b3cf2ada4584faa36773bd07bac24acfb39f3c431b36d7e667aa"
|
||||
dependencies = [
|
||||
"thiserror-impl 2.0.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -4051,6 +4060,17 @@ dependencies = [
|
|||
"syn 2.0.87",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror-impl"
|
||||
version = "2.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f077553d607adc1caf65430528a576c757a71ed73944b66ebb58ef2bbd243568"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.87",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.8"
|
||||
|
@ -4243,9 +4263,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "tower-http"
|
||||
version = "0.5.2"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
|
||||
checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"bytes",
|
||||
|
@ -4353,7 +4373,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "fc2cae1fc5d05d47aa24b64f9a4f7cba24cdc9187a2084dd97ac57bef5eccae6"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"thiserror",
|
||||
"thiserror 1.0.69",
|
||||
"ts-rs-macros",
|
||||
]
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ axum = "0.7"
|
|||
axum-extra = "0.9"
|
||||
base64 = "0.22"
|
||||
bytes = "1.7"
|
||||
cached = "0.53"
|
||||
cached = "0.54"
|
||||
cfg-if = "1"
|
||||
chrono = "0.4"
|
||||
compact_str = "0.8"
|
||||
|
@ -73,13 +73,13 @@ sha2 = "0.10"
|
|||
smallvec = "1.13"
|
||||
strum = "0.26"
|
||||
tera = { version = "1", default-features = false }
|
||||
thiserror = "1"
|
||||
thiserror = "2"
|
||||
tokio = "1.24"
|
||||
tokio-util = "0.7"
|
||||
tokio-stream = "0.1"
|
||||
toml = "0.8"
|
||||
tower = "0.5"
|
||||
tower-http = "0.5"
|
||||
tower-http = "0.6"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
ts-rs = "7"
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use magnetar_federation::crypto::SigningAlgorithm;
|
||||
use proto::{MagRpc, RpcMessage};
|
||||
use serde::Deserialize;
|
||||
use tracing::{debug, info};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
model::{processing::note::NoteModel, PackingContext},
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::service::MagnetarService;
|
||||
use futures::{FutureExt, Stream, StreamExt};
|
||||
use miette::{miette, IntoDiagnostic};
|
||||
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use miette::{miette, Error, IntoDiagnostic};
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::any::Any;
|
||||
|
@ -218,21 +218,14 @@ impl MagRpc {
|
|||
let fut = async move {
|
||||
let src = rx_dec
|
||||
.stream_decode(buf_read, cancel_recv)
|
||||
.filter_map(|r| async move {
|
||||
if let Err(e) = &r {
|
||||
error!("Stream decoding error: {e}");
|
||||
}
|
||||
|
||||
r.ok()
|
||||
})
|
||||
.map(process(context))
|
||||
.buffer_unordered(100)
|
||||
.map_ok(process(context))
|
||||
.try_buffer_unordered(100)
|
||||
.boxed();
|
||||
|
||||
futures::pin_mut!(src);
|
||||
|
||||
while let Some(result) = src.next().await {
|
||||
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
|
||||
while let Some(result) = src.try_next().await? {
|
||||
let Some((serial, RpcResponse(bytes))) = result else {
|
||||
continue;
|
||||
};
|
||||
|
||||
|
@ -315,21 +308,14 @@ impl MagRpc {
|
|||
let fut = async move {
|
||||
let src = rx_dec
|
||||
.stream_decode(buf_read, cancel_recv)
|
||||
.filter_map(|r| async move {
|
||||
if let Err(e) = &r {
|
||||
error!("Stream decoding error: {e}");
|
||||
}
|
||||
|
||||
r.ok()
|
||||
})
|
||||
.map(process(context))
|
||||
.buffer_unordered(100)
|
||||
.map_ok(process(context))
|
||||
.try_buffer_unordered(100)
|
||||
.boxed();
|
||||
|
||||
futures::pin_mut!(src);
|
||||
|
||||
while let Some(result) = src.next().await {
|
||||
let Ok(Some((serial, RpcResponse(bytes)))) = result else {
|
||||
while let Some(result) = src.try_next().await? {
|
||||
let Some((serial, RpcResponse(bytes))) = result else {
|
||||
continue;
|
||||
};
|
||||
|
||||
|
@ -370,11 +356,13 @@ fn process(
|
|||
) -> impl Fn(
|
||||
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
|
||||
) -> Pin<
|
||||
Box<dyn Future<Output = Result<Option<(u64, RpcResponse)>, JoinError>> + Send + 'static>,
|
||||
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
|
||||
> {
|
||||
move |(serial, payload, listener)| {
|
||||
let ctx = context.clone();
|
||||
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) }).boxed()
|
||||
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
|
||||
.map_err(|e| miette!(e))
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -396,6 +384,8 @@ impl RpcCallDecoder {
|
|||
|
||||
async_stream::try_stream! {
|
||||
let mut messages = 0usize;
|
||||
let mut name_buf = vec![0; 128];
|
||||
let mut payload_buf = vec![0; 1024];
|
||||
|
||||
loop {
|
||||
let read_fut = async {
|
||||
|
@ -429,7 +419,7 @@ impl RpcCallDecoder {
|
|||
)).into_diagnostic();
|
||||
}
|
||||
|
||||
let mut name_buf = vec![0; name_len];
|
||||
name_buf.resize(name_len, 0);
|
||||
buf_read.read_exact(&mut name_buf).await.into_diagnostic()?;
|
||||
|
||||
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
|
||||
|
@ -441,20 +431,20 @@ impl RpcCallDecoder {
|
|||
)).into_diagnostic();
|
||||
}
|
||||
|
||||
let mut payload_buf = vec![0; payload_len];
|
||||
payload_buf.resize(payload_len, 0);
|
||||
buf_read.read_exact(&mut payload_buf).await.into_diagnostic()?;
|
||||
|
||||
miette::Result::<_>::Ok(Some((serial, name_buf, payload_buf)))
|
||||
miette::Result::<_>::Ok(Some((serial, &name_buf[..], &payload_buf[..])))
|
||||
};
|
||||
|
||||
let Some((serial, name_buf, payload_buf)) = select! {
|
||||
let Some((serial, name_slice, payload_slice)) = select! {
|
||||
read_result = read_fut => read_result,
|
||||
_ = &mut cancel => { break; }
|
||||
}? else {
|
||||
break;
|
||||
};
|
||||
|
||||
let name = std::str::from_utf8(&name_buf).into_diagnostic()?;
|
||||
let name = std::str::from_utf8(name_slice).into_diagnostic()?;
|
||||
|
||||
let decoder = decoders
|
||||
.get(name)
|
||||
|
@ -465,7 +455,7 @@ impl RpcCallDecoder {
|
|||
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
|
||||
.clone();
|
||||
|
||||
let packet = match decoder(&payload_buf) {
|
||||
let packet = match decoder(payload_slice) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
error!("Failed to parse packet: {e}");
|
||||
|
|
Loading…
Reference in New Issue