WIP: Make the MMM parser run in linear time #14
|
@ -1,6 +1,6 @@
|
||||||
# This file is automatically @generated by Cargo.
|
# This file is automatically @generated by Cargo.
|
||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 3
|
version = 4
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "Inflector"
|
name = "Inflector"
|
||||||
|
@ -2028,6 +2028,7 @@ dependencies = [
|
||||||
"miette 7.2.0",
|
"miette 7.2.0",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
|
"rand",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rsa",
|
"rsa",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -4197,6 +4198,7 @@ dependencies = [
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -62,6 +62,7 @@ quick-xml = "0.36"
|
||||||
redis = "0.26"
|
redis = "0.26"
|
||||||
regex = "1.9"
|
regex = "1.9"
|
||||||
rmp-serde = "1.3"
|
rmp-serde = "1.3"
|
||||||
|
rand = "0.8"
|
||||||
rsa = "0.9"
|
rsa = "0.9"
|
||||||
reqwest = "0.12"
|
reqwest = "0.12"
|
||||||
sea-orm = "1"
|
sea-orm = "1"
|
||||||
|
@ -112,7 +113,7 @@ headers = { workspace = true }
|
||||||
hyper = { workspace = true, features = ["full"] }
|
hyper = { workspace = true, features = ["full"] }
|
||||||
reqwest = { workspace = true, features = ["hickory-dns"] }
|
reqwest = { workspace = true, features = ["hickory-dns"] }
|
||||||
tokio = { workspace = true, features = ["full"] }
|
tokio = { workspace = true, features = ["full"] }
|
||||||
tokio-stream = { workspace = true }
|
tokio-stream = { workspace = true, features = ["full"] }
|
||||||
tower = { workspace = true }
|
tower = { workspace = true }
|
||||||
tower-http = { workspace = true, features = ["cors", "trace", "fs"] }
|
tower-http = { workspace = true, features = ["cors", "trace", "fs"] }
|
||||||
ulid = { workspace = true }
|
ulid = { workspace = true }
|
||||||
|
|
|
@ -40,6 +40,7 @@ hyper = { workspace = true, features = ["full"] }
|
||||||
percent-encoding = { workspace = true }
|
percent-encoding = { workspace = true }
|
||||||
reqwest = { workspace = true, features = ["stream", "hickory-dns"] }
|
reqwest = { workspace = true, features = ["stream", "hickory-dns"] }
|
||||||
|
|
||||||
|
rand = { workspace = true }
|
||||||
ed25519-dalek = { workspace = true, features = [
|
ed25519-dalek = { workspace = true, features = [
|
||||||
"pem",
|
"pem",
|
||||||
"pkcs8",
|
"pkcs8",
|
||||||
|
|
|
@ -4,6 +4,7 @@ use http::{HeaderMap, HeaderName, HeaderValue, Method};
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use sha2::Digest;
|
use sha2::Digest;
|
||||||
|
use std::fmt::Write;
|
||||||
use std::{fmt::Display, string::FromUtf8Error, sync::Arc};
|
use std::{fmt::Display, string::FromUtf8Error, sync::Arc};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
@ -257,16 +258,17 @@ impl ApClientService for ApClientServiceDefaultProvider {
|
||||||
|
|
||||||
let message = components
|
let message = components
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(k, v)| format!("{}: {}", k.as_ref(), v))
|
.fold(String::new(), |mut acc, (k, v)| {
|
||||||
.collect::<Vec<_>>()
|
writeln!(&mut acc, "{}: {}", k.as_ref(), v).unwrap();
|
||||||
.join("\n");
|
acc
|
||||||
|
});
|
||||||
|
|
||||||
let key_id = signing_key.key_id.clone().into_owned();
|
let key_id = signing_key.key_id.clone().into_owned();
|
||||||
let key = signing_key.into_owned();
|
let key = signing_key.into_owned();
|
||||||
let signature = task::spawn_blocking(move || {
|
let signature = task::spawn_blocking(move || {
|
||||||
key
|
key
|
||||||
.key
|
.key
|
||||||
.sign_base64(signing_algorithm, &message.into_bytes())
|
.sign_base64(signing_algorithm, message.trim_end().as_bytes())
|
||||||
}).await??;
|
}).await??;
|
||||||
|
|
||||||
Ok(ApSignature {
|
Ok(ApSignature {
|
||||||
|
@ -329,7 +331,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
headers.insert(
|
headers.insert(
|
||||||
HeaderName::from_lowercase(b"signature").unwrap(),
|
HeaderName::from_static("signature"),
|
||||||
HeaderValue::try_from(signed.to_string())?,
|
HeaderValue::try_from(signed.to_string())?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -350,10 +352,9 @@ impl ApClientService for ApClientServiceDefaultProvider {
|
||||||
signing_algorithm: SigningAlgorithm,
|
signing_algorithm: SigningAlgorithm,
|
||||||
expires: Option<chrono::DateTime<Utc>>,
|
expires: Option<chrono::DateTime<Utc>>,
|
||||||
url: &str,
|
url: &str,
|
||||||
body: &Value,
|
body_bytes: Vec<u8>,
|
||||||
) -> Result<String, Self::Error> {
|
) -> Result<String, Self::Error> {
|
||||||
let url = url.parse()?;
|
let url = url.parse()?;
|
||||||
let body_bytes = serde_json::to_vec(body)?;
|
|
||||||
// Move in, move out :3
|
// Move in, move out :3
|
||||||
let (digest_raw, body_bytes) = task::spawn_blocking(move || {
|
let (digest_raw, body_bytes) = task::spawn_blocking(move || {
|
||||||
let mut sha = sha2::Sha256::new();
|
let mut sha = sha2::Sha256::new();
|
||||||
|
@ -406,12 +407,12 @@ impl ApClientService for ApClientServiceDefaultProvider {
|
||||||
}
|
}
|
||||||
|
|
||||||
headers.insert(
|
headers.insert(
|
||||||
HeaderName::from_lowercase(b"digest").unwrap(),
|
HeaderName::from_static("digest"),
|
||||||
HeaderValue::try_from(digest_base64)?,
|
HeaderValue::try_from(digest_base64)?,
|
||||||
);
|
);
|
||||||
|
|
||||||
headers.insert(
|
headers.insert(
|
||||||
HeaderName::from_lowercase(b"signature").unwrap(),
|
HeaderName::from_static("signature"),
|
||||||
HeaderValue::try_from(signed.to_string())?,
|
HeaderValue::try_from(signed.to_string())?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
use async_stream::stream;
|
use async_stream::stream;
|
||||||
use futures_util::{select, stream::StreamExt, FutureExt, Stream, TryStreamExt};
|
use futures_util::{stream::StreamExt, Stream, TryStreamExt};
|
||||||
use headers::UserAgent;
|
use headers::UserAgent;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use reqwest::{redirect::Policy, Client, RequestBuilder};
|
use reqwest::{redirect::Policy, Client, RequestBuilder};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use std::time::Duration;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::pin;
|
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use magnetar_core::web_model::ContentType;
|
use magnetar_core::web_model::ContentType;
|
||||||
|
@ -58,6 +58,7 @@ impl FederationClient {
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.https_only(force_https)
|
.https_only(force_https)
|
||||||
.redirect(Policy::limited(5))
|
.redirect(Policy::limited(5))
|
||||||
|
.timeout(Duration::from_secs(timeout_seconds))
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
Ok(FederationClient {
|
Ok(FederationClient {
|
||||||
|
@ -144,13 +145,6 @@ impl FederationRequestBuilder<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send(self) -> Result<Vec<u8>, FederationClientError> {
|
pub async fn send(self) -> Result<Vec<u8>, FederationClientError> {
|
||||||
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(
|
|
||||||
self.client.timeout_seconds,
|
|
||||||
))
|
|
||||||
.fuse();
|
|
||||||
tokio::pin!(sleep);
|
|
||||||
|
|
||||||
let body = async move {
|
|
||||||
self.send_stream()
|
self.send_stream()
|
||||||
.await?
|
.await?
|
||||||
.try_fold(Vec::new(), |mut acc, b| async move {
|
.try_fold(Vec::new(), |mut acc, b| async move {
|
||||||
|
@ -159,15 +153,6 @@ impl FederationRequestBuilder<'_> {
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
.fuse();
|
|
||||||
|
|
||||||
pin!(body);
|
|
||||||
|
|
||||||
select! {
|
|
||||||
b = body => b,
|
|
||||||
_ = sleep => Err(FederationClientError::TimeoutError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn json(self) -> Result<Value, FederationClientError> {
|
pub async fn json(self) -> Result<Value, FederationClientError> {
|
||||||
let data = self.send().await?;
|
let data = self.send().await?;
|
||||||
|
|
|
@ -2,7 +2,7 @@ use rsa::pkcs1::DecodeRsaPrivateKey;
|
||||||
use rsa::pkcs1::DecodeRsaPublicKey;
|
use rsa::pkcs1::DecodeRsaPublicKey;
|
||||||
use rsa::pkcs8::DecodePrivateKey;
|
use rsa::pkcs8::DecodePrivateKey;
|
||||||
use rsa::pkcs8::DecodePublicKey;
|
use rsa::pkcs8::DecodePublicKey;
|
||||||
use rsa::signature::Verifier;
|
use rsa::signature::{RandomizedSigner, Verifier};
|
||||||
use rsa::{
|
use rsa::{
|
||||||
sha2::{Sha256, Sha512},
|
sha2::{Sha256, Sha512},
|
||||||
signature::Signer,
|
signature::Signer,
|
||||||
|
@ -268,10 +268,10 @@ impl ApHttpSigningKey<'_> {
|
||||||
) -> Result<Vec<u8>, ApSigningError> {
|
) -> Result<Vec<u8>, ApSigningError> {
|
||||||
match (self, algorithm) {
|
match (self, algorithm) {
|
||||||
(Self::RsaSha256(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => {
|
(Self::RsaSha256(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => {
|
||||||
Ok(Box::<[u8]>::from(key.sign(message)).into_vec())
|
Ok(Box::<[u8]>::from(key.sign_with_rng(&mut rand::thread_rng(), message)).into_vec())
|
||||||
}
|
}
|
||||||
(Self::RsaSha512(key), SigningAlgorithm::Hs2019) => {
|
(Self::RsaSha512(key), SigningAlgorithm::Hs2019) => {
|
||||||
Ok(Box::<[u8]>::from(key.sign(message)).into_vec())
|
Ok(Box::<[u8]>::from(key.sign_with_rng(&mut rand::thread_rng(), message)).into_vec())
|
||||||
}
|
}
|
||||||
(Self::Ed25519(key), SigningAlgorithm::Hs2019) => {
|
(Self::Ed25519(key), SigningAlgorithm::Hs2019) => {
|
||||||
Ok(key.sign(message).to_bytes().to_vec())
|
Ok(key.sign(message).to_bytes().to_vec())
|
||||||
|
|
|
@ -171,6 +171,6 @@ pub trait ApClientService: Send + Sync {
|
||||||
signing_algorithm: SigningAlgorithm,
|
signing_algorithm: SigningAlgorithm,
|
||||||
expires: Option<chrono::DateTime<Utc>>,
|
expires: Option<chrono::DateTime<Utc>>,
|
||||||
url: &str,
|
url: &str,
|
||||||
body: &Value,
|
body: Vec<u8>,
|
||||||
) -> Result<String, Self::Error>;
|
) -> Result<String, Self::Error>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,10 +74,10 @@ const showTicker =
|
||||||
display: flex;
|
display: flex;
|
||||||
align-items: center;
|
align-items: center;
|
||||||
white-space: nowrap;
|
white-space: nowrap;
|
||||||
justify-self: flex-end;
|
|
||||||
border-radius: 100px;
|
border-radius: 100px;
|
||||||
font-size: 0.8em;
|
font-size: 0.8em;
|
||||||
text-shadow: 0 2px 2px var(--shadow);
|
text-shadow: 0 2px 2px var(--shadow);
|
||||||
|
gap: 0 0.1em;
|
||||||
|
|
||||||
> .avatar {
|
> .avatar {
|
||||||
width: 3.7em;
|
width: 3.7em;
|
||||||
|
@ -99,6 +99,8 @@ const showTicker =
|
||||||
overflow: hidden;
|
overflow: hidden;
|
||||||
text-overflow: ellipsis;
|
text-overflow: ellipsis;
|
||||||
gap: 0.1em 0;
|
gap: 0.1em 0;
|
||||||
|
display: flex;
|
||||||
|
flex-wrap: wrap;
|
||||||
}
|
}
|
||||||
|
|
||||||
&:last-child {
|
&:last-child {
|
||||||
|
@ -151,7 +153,6 @@ const showTicker =
|
||||||
margin: 0 0.5em 0 0;
|
margin: 0 0.5em 0 0;
|
||||||
overflow: hidden;
|
overflow: hidden;
|
||||||
text-overflow: ellipsis;
|
text-overflow: ellipsis;
|
||||||
align-self: flex-start;
|
|
||||||
font-size: 0.9em;
|
font-size: 0.9em;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
v-tooltip="
|
v-tooltip="
|
||||||
capitalize(
|
capitalize(
|
||||||
magTransProperty(instance, 'software_name', 'softwareName') ??
|
magTransProperty(instance, 'software_name', 'softwareName') ??
|
||||||
'?',
|
'?'
|
||||||
)
|
)
|
||||||
"
|
"
|
||||||
ref="ticker"
|
ref="ticker"
|
||||||
|
@ -40,7 +40,7 @@ const instance = props.instance ?? {
|
||||||
name: instanceName,
|
name: instanceName,
|
||||||
themeColor: (
|
themeColor: (
|
||||||
document.querySelector(
|
document.querySelector(
|
||||||
'meta[name="theme-color-orig"]',
|
'meta[name="theme-color-orig"]'
|
||||||
) as HTMLMetaElement
|
) as HTMLMetaElement
|
||||||
)?.content,
|
)?.content,
|
||||||
softwareName: (Instance.softwareName || "Magnetar") as string | null,
|
softwareName: (Instance.softwareName || "Magnetar") as string | null,
|
||||||
|
@ -61,18 +61,18 @@ const bg = {
|
||||||
};
|
};
|
||||||
|
|
||||||
function getInstanceIcon(
|
function getInstanceIcon(
|
||||||
instance?: Misskey.entities.User["instance"] | types.InstanceTicker | null,
|
instance?: Misskey.entities.User["instance"] | types.InstanceTicker | null
|
||||||
): string {
|
): string {
|
||||||
if (!instance) return "/client-assets/dummy.png";
|
if (!instance) return "/client-assets/dummy.png";
|
||||||
|
|
||||||
return (
|
return (
|
||||||
getProxiedImageUrlNullable(
|
getProxiedImageUrlNullable(
|
||||||
magTransProperty(instance, "icon_url", "iconUrl"),
|
magTransProperty(instance, "icon_url", "iconUrl"),
|
||||||
"preview",
|
"preview"
|
||||||
) ??
|
) ??
|
||||||
getProxiedImageUrlNullable(
|
getProxiedImageUrlNullable(
|
||||||
magTransProperty(instance, "favicon_url", "faviconUrl"),
|
magTransProperty(instance, "favicon_url", "faviconUrl"),
|
||||||
"preview",
|
"preview"
|
||||||
) ??
|
) ??
|
||||||
"/client-assets/dummy.png"
|
"/client-assets/dummy.png"
|
||||||
);
|
);
|
||||||
|
@ -90,6 +90,7 @@ function getInstanceIcon(
|
||||||
font-size: 0.8em;
|
font-size: 0.8em;
|
||||||
text-shadow: 0 2px 2px var(--shadow);
|
text-shadow: 0 2px 2px var(--shadow);
|
||||||
overflow: hidden;
|
overflow: hidden;
|
||||||
|
|
||||||
.header > .body & {
|
.header > .body & {
|
||||||
width: max-content;
|
width: max-content;
|
||||||
max-width: 100%;
|
max-width: 100%;
|
||||||
|
@ -108,11 +109,9 @@ function getInstanceIcon(
|
||||||
font-weight: bold;
|
font-weight: bold;
|
||||||
text-overflow: ellipsis;
|
text-overflow: ellipsis;
|
||||||
white-space: nowrap;
|
white-space: nowrap;
|
||||||
text-shadow:
|
text-shadow: -1px -1px 0 var(--bg), 1px -1px 0 var(--bg),
|
||||||
-1px -1px 0 var(--bg),
|
-1px 1px 0 var(--bg), 1px 1px 0 var(--bg);
|
||||||
1px -1px 0 var(--bg),
|
|
||||||
-1px 1px 0 var(--bg),
|
|
||||||
1px 1px 0 var(--bg);
|
|
||||||
.article > .main &,
|
.article > .main &,
|
||||||
.header > .body & {
|
.header > .body & {
|
||||||
display: unset;
|
display: unset;
|
||||||
|
|
|
@ -23,7 +23,7 @@ struct RpcApGet {
|
||||||
struct RpcApPost {
|
struct RpcApPost {
|
||||||
user_id: String,
|
user_id: String,
|
||||||
url: String,
|
url: String,
|
||||||
body: serde_json::Value,
|
body: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_rpc_router() -> MagRpc {
|
pub fn create_rpc_router() -> MagRpc {
|
||||||
|
@ -90,7 +90,7 @@ pub fn create_rpc_router() -> MagRpc {
|
||||||
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
|
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
|
||||||
let result = service
|
let result = service
|
||||||
.ap_client
|
.ap_client
|
||||||
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body)
|
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, body.into_bytes())
|
||||||
.await?;
|
.await?;
|
||||||
Result::<_, DeliveryError>::Ok(result)
|
Result::<_, DeliveryError>::Ok(result)
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,21 +1,26 @@
|
||||||
use crate::service::MagnetarService;
|
use crate::service::MagnetarService;
|
||||||
use either::Either;
|
use either::Either;
|
||||||
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
use futures::{FutureExt, Stream, TryFutureExt, TryStreamExt};
|
||||||
|
use futures_util::future::BoxFuture;
|
||||||
|
use futures_util::stream::FuturesUnordered;
|
||||||
|
use futures_util::{pin_mut, SinkExt, StreamExt};
|
||||||
use miette::{miette, IntoDiagnostic};
|
use miette::{miette, IntoDiagnostic};
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
|
use std::future;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
|
||||||
use tokio::net::{TcpListener, UnixSocket};
|
use tokio::net::{TcpListener, UnixSocket};
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
|
use tokio::time::Instant;
|
||||||
use tracing::{debug, error, info, warn, Instrument};
|
use tracing::{debug, error, info, warn, Instrument};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@ -205,6 +210,8 @@ impl MagRpc {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, remote_addr) = select!(
|
let (stream, remote_addr) = select!(
|
||||||
|
biased;
|
||||||
|
_ = &mut cancel => break,
|
||||||
Some(c) = connections.join_next() => {
|
Some(c) = connections.join_next() => {
|
||||||
debug!("RPC TCP connection closed: {:?}", c);
|
debug!("RPC TCP connection closed: {:?}", c);
|
||||||
continue;
|
continue;
|
||||||
|
@ -217,45 +224,20 @@ impl MagRpc {
|
||||||
|
|
||||||
conn.unwrap()
|
conn.unwrap()
|
||||||
},
|
},
|
||||||
_ = &mut cancel => break
|
|
||||||
);
|
);
|
||||||
|
|
||||||
debug!("RPC TCP connection accepted: {:?}", remote_addr);
|
debug!("RPC TCP connection accepted: {:?}", remote_addr);
|
||||||
|
|
||||||
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
|
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
|
||||||
let (read_half, mut write_half) = stream.into_split();
|
let (read_half, write_half) = stream.into_split();
|
||||||
let buf_read = BufReader::new(read_half);
|
let handler_fut = handle_process(
|
||||||
let context = context.clone();
|
rx_dec.stream_decode(BufReader::new(read_half), cancel_recv),
|
||||||
let rx_dec = rx_dec.clone();
|
BufWriter::new(write_half),
|
||||||
let fut = async move {
|
context.clone(),
|
||||||
let src = rx_dec
|
)
|
||||||
.stream_decode(buf_read, cancel_recv)
|
|
||||||
.map_ok(process(context))
|
|
||||||
.try_buffer_unordered(100)
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
futures::pin_mut!(src);
|
|
||||||
|
|
||||||
while let Some(result) = src.try_next().await? {
|
|
||||||
let Some((serial, RpcResponse(bytes))) = result else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
write_half.write_u8(b'M').await.into_diagnostic()?;
|
|
||||||
write_half.write_u64(serial).await.into_diagnostic()?;
|
|
||||||
write_half
|
|
||||||
.write_u32(bytes.len() as u32)
|
|
||||||
.await
|
|
||||||
.into_diagnostic()?;
|
|
||||||
write_half.write_all(&bytes).await.into_diagnostic()?;
|
|
||||||
write_half.flush().await.into_diagnostic()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(remote_addr)
|
|
||||||
}
|
|
||||||
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
|
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
|
||||||
|
|
||||||
connections.spawn(fut);
|
connections.spawn(handler_fut);
|
||||||
|
|
||||||
cancellation_tokens.push(cancel_send);
|
cancellation_tokens.push(cancel_send);
|
||||||
}
|
}
|
||||||
|
@ -295,6 +277,8 @@ impl MagRpc {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, remote_addr) = select!(
|
let (stream, remote_addr) = select!(
|
||||||
|
biased;
|
||||||
|
_ = &mut cancel => break,
|
||||||
Some(c) = connections.join_next() => {
|
Some(c) = connections.join_next() => {
|
||||||
debug!("RPC Unix connection closed: {:?}", c);
|
debug!("RPC Unix connection closed: {:?}", c);
|
||||||
continue;
|
continue;
|
||||||
|
@ -306,46 +290,21 @@ impl MagRpc {
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.unwrap()
|
conn.unwrap()
|
||||||
},
|
}
|
||||||
_ = &mut cancel => break
|
|
||||||
);
|
);
|
||||||
|
|
||||||
debug!("RPC Unix connection accepted: {:?}", remote_addr);
|
debug!("RPC Unix connection accepted: {:?}", remote_addr);
|
||||||
|
|
||||||
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
|
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
|
||||||
let (read_half, mut write_half) = stream.into_split();
|
let (read_half, write_half) = stream.into_split();
|
||||||
let buf_read = BufReader::new(read_half);
|
let handler_fut = handle_process(
|
||||||
let context = context.clone();
|
rx_dec.stream_decode(BufReader::with_capacity(64 * 1024, read_half), cancel_recv),
|
||||||
let rx_dec = rx_dec.clone();
|
BufWriter::new(write_half),
|
||||||
let fut = async move {
|
context.clone(),
|
||||||
let src = rx_dec
|
)
|
||||||
.stream_decode(buf_read, cancel_recv)
|
|
||||||
.map_ok(process(context))
|
|
||||||
.try_buffer_unordered(100)
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
futures::pin_mut!(src);
|
|
||||||
|
|
||||||
while let Some(result) = src.try_next().await? {
|
|
||||||
let Some((serial, RpcResponse(bytes))) = result else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
write_half.write_u8(b'M').await.into_diagnostic()?;
|
|
||||||
write_half.write_u64(serial).await.into_diagnostic()?;
|
|
||||||
write_half
|
|
||||||
.write_u32(bytes.len() as u32)
|
|
||||||
.await
|
|
||||||
.into_diagnostic()?;
|
|
||||||
write_half.write_all(&bytes).await.into_diagnostic()?;
|
|
||||||
write_half.flush().await.into_diagnostic()?;
|
|
||||||
}
|
|
||||||
|
|
||||||
miette::Result::<()>::Ok(())
|
|
||||||
}
|
|
||||||
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
|
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
|
||||||
|
|
||||||
connections.spawn(fut.boxed());
|
connections.spawn(handler_fut);
|
||||||
|
|
||||||
cancellation_tokens.push(cancel_send);
|
cancellation_tokens.push(cancel_send);
|
||||||
}
|
}
|
||||||
|
@ -363,16 +322,87 @@ impl MagRpc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn write_response(
|
||||||
|
mut buf_write: Pin<&mut BufWriter<impl AsyncWrite>>,
|
||||||
|
serial: u64,
|
||||||
|
result: Option<RpcResponse>,
|
||||||
|
) -> miette::Result<()> {
|
||||||
|
let header = if result.is_some() { b'M' } else { b'F' };
|
||||||
|
buf_write.write_u8(header).await.into_diagnostic()?;
|
||||||
|
buf_write.write_u64(serial).await.into_diagnostic()?;
|
||||||
|
|
||||||
|
let Some(RpcResponse(bytes)) = result else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
buf_write
|
||||||
|
.write_u32(bytes.len() as u32)
|
||||||
|
.await
|
||||||
|
.into_diagnostic()?;
|
||||||
|
buf_write.write_all(&bytes).await.into_diagnostic()?;
|
||||||
|
buf_write.flush().await.into_diagnostic()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_process(
|
||||||
|
task_stream: impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static,
|
||||||
|
mut buf_write: BufWriter<impl AsyncWrite + Unpin>,
|
||||||
|
context: Arc<MagnetarService>,
|
||||||
|
) -> miette::Result<()> {
|
||||||
|
let results = FuturesUnordered::new();
|
||||||
|
pin_mut!(results);
|
||||||
|
|
||||||
|
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||||
|
pin_mut!(rx);
|
||||||
|
let input_stream = tokio::spawn(
|
||||||
|
task_stream
|
||||||
|
.map_ok(process(context))
|
||||||
|
.boxed()
|
||||||
|
.forward(tx.sink_map_err(|e| miette!(e)))
|
||||||
|
);
|
||||||
|
pin_mut!(input_stream);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select!(
|
||||||
|
biased;
|
||||||
|
_ = &mut input_stream => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Some(task) = rx.next() => {
|
||||||
|
results.push(task);
|
||||||
|
}
|
||||||
|
Some(res) = results.next() => {
|
||||||
|
let (serial, result) = res?;
|
||||||
|
write_response(Pin::new(&mut buf_write), serial, result)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
else => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn process(
|
fn process(
|
||||||
context: Arc<MagnetarService>,
|
context: Arc<MagnetarService>,
|
||||||
) -> impl Fn(
|
) -> impl Fn(
|
||||||
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
|
(u64, MessageRaw, Arc<MagRpcHandlerMapped>),
|
||||||
) -> Pin<
|
) -> BoxFuture<'static, miette::Result<(u64, Option<RpcResponse>)>> {
|
||||||
Box<dyn Future<Output = miette::Result<Option<(u64, RpcResponse)>>> + Send + 'static>,
|
|
||||||
> {
|
|
||||||
move |(serial, payload, listener)| {
|
move |(serial, payload, listener)| {
|
||||||
let ctx = context.clone();
|
let ctx = context.clone();
|
||||||
tokio::task::spawn(async move { Some((serial, listener(ctx, payload).await?)) })
|
tokio::task::spawn(async move {
|
||||||
|
let start = Instant::now();
|
||||||
|
let res = listener(ctx, payload).await;
|
||||||
|
let took = start.elapsed();
|
||||||
|
// TODO: Extract this into a config
|
||||||
|
if took.as_secs_f64() > 50.0 {
|
||||||
|
warn!("Handler took long: {} sec", took.as_secs_f64());
|
||||||
|
}
|
||||||
|
|
||||||
|
(serial, res)
|
||||||
|
}.instrument(tracing::info_span!("Request", ?serial)))
|
||||||
.map_err(|e| miette!(e))
|
.map_err(|e| miette!(e))
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
@ -385,9 +415,9 @@ struct RpcCallDecoder {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RpcCallDecoder {
|
impl RpcCallDecoder {
|
||||||
fn stream_decode<R: AsyncRead + AsyncReadExt + Unpin + Send + 'static>(
|
fn stream_decode(
|
||||||
&self,
|
&self,
|
||||||
mut buf_read: BufReader<R>,
|
mut buf_read: impl AsyncBufRead + Send + Unpin + 'static,
|
||||||
mut cancel: tokio::sync::oneshot::Receiver<()>,
|
mut cancel: tokio::sync::oneshot::Receiver<()>,
|
||||||
) -> impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static
|
) -> impl Stream<Item=miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static
|
||||||
{
|
{
|
||||||
|
@ -450,8 +480,9 @@ impl RpcCallDecoder {
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some((serial, name_slice, payload_slice)) = select! {
|
let Some((serial, name_slice, payload_slice)) = select! {
|
||||||
read_result = read_fut => read_result,
|
biased;
|
||||||
_ = &mut cancel => { break; }
|
_ = &mut cancel => { break; }
|
||||||
|
read_result = read_fut => read_result,
|
||||||
}? else {
|
}? else {
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue