Compare commits

..

No commits in common. "5fb85e0db6e7f0c5cb7079673ec080d277932d49" and "611afc591c855ed4e971b4d2662bd5345c03f484" have entirely different histories.

32 changed files with 385 additions and 1414 deletions

103
Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 3
[[package]] [[package]]
name = "Inflector" name = "Inflector"
@ -1144,10 +1144,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"js-sys",
"libc", "libc",
"wasi", "wasi",
"wasm-bindgen",
] ]
[[package]] [[package]]
@ -1695,17 +1693,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "kdl"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
dependencies = [
"miette 5.10.0",
"nom",
"thiserror",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.5.0" version = "1.5.0"
@ -1782,7 +1769,6 @@ dependencies = [
"async-stream", "async-stream",
"axum", "axum",
"axum-extra", "axum-extra",
"bytes",
"cached", "cached",
"cfg-if", "cfg-if",
"chrono", "chrono",
@ -1793,8 +1779,8 @@ dependencies = [
"futures-util", "futures-util",
"headers", "headers",
"hyper", "hyper",
"idna 1.0.2",
"itertools", "itertools",
"kdl",
"lru", "lru",
"magnetar_common", "magnetar_common",
"magnetar_core", "magnetar_core",
@ -1802,13 +1788,12 @@ dependencies = [
"magnetar_host_meta", "magnetar_host_meta",
"magnetar_model", "magnetar_model",
"magnetar_nodeinfo", "magnetar_nodeinfo",
"magnetar_runtime",
"magnetar_sdk", "magnetar_sdk",
"magnetar_webfinger", "magnetar_webfinger",
"miette 7.2.0", "miette",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"rmp-serde", "regex",
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
@ -1821,7 +1806,6 @@ dependencies = [
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"ulid",
"unicode-segmentation", "unicode-segmentation",
"url", "url",
] ]
@ -1851,7 +1835,7 @@ dependencies = [
"headers", "headers",
"hyper", "hyper",
"magnetar_common", "magnetar_common",
"miette 7.2.0", "miette",
"percent-encoding", "percent-encoding",
"serde", "serde",
"serde_json", "serde_json",
@ -1909,7 +1893,7 @@ dependencies = [
"magnetar_core", "magnetar_core",
"magnetar_host_meta", "magnetar_host_meta",
"magnetar_webfinger", "magnetar_webfinger",
"miette 7.2.0", "miette",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"reqwest", "reqwest",
@ -1945,7 +1929,6 @@ dependencies = [
"nom_locate", "nom_locate",
"quick-xml", "quick-xml",
"serde", "serde",
"smallvec",
"strum", "strum",
"tracing", "tracing",
"unicode-segmentation", "unicode-segmentation",
@ -1983,22 +1966,6 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "magnetar_runtime"
version = "0.3.0-alpha"
dependencies = [
"either",
"futures-channel",
"futures-core",
"futures-util",
"itertools",
"magnetar_core",
"magnetar_sdk",
"miette 7.2.0",
"thiserror",
"tracing",
]
[[package]] [[package]]
name = "magnetar_sdk" name = "magnetar_sdk"
version = "0.3.0-alpha" version = "0.3.0-alpha"
@ -2063,18 +2030,6 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miette"
version = "5.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
dependencies = [
"miette-derive 5.10.0",
"once_cell",
"thiserror",
"unicode-width",
]
[[package]] [[package]]
name = "miette" name = "miette"
version = "7.2.0" version = "7.2.0"
@ -2084,7 +2039,7 @@ dependencies = [
"backtrace", "backtrace",
"backtrace-ext", "backtrace-ext",
"cfg-if", "cfg-if",
"miette-derive 7.2.0", "miette-derive",
"owo-colors", "owo-colors",
"supports-color", "supports-color",
"supports-hyperlinks", "supports-hyperlinks",
@ -2095,17 +2050,6 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "miette-derive"
version = "5.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
]
[[package]] [[package]]
name = "miette-derive" name = "miette-derive"
version = "7.2.0" version = "7.2.0"
@ -2878,28 +2822,6 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "rmp"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]] [[package]]
name = "rsa" name = "rsa"
version = "0.9.6" version = "0.9.6"
@ -4293,17 +4215,6 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "ulid"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289"
dependencies = [
"getrandom",
"rand",
"web-time",
]
[[package]] [[package]]
name = "unic-char-property" name = "unic-char-property"
version = "0.9.0" version = "0.9.0"

View File

@ -15,7 +15,6 @@ members = [
"ext_model", "ext_model",
"fe_calckey", "fe_calckey",
"magnetar_common", "magnetar_common",
"magnetar_runtime",
"magnetar_sdk", "magnetar_sdk",
"magnetar_mmm_parser", "magnetar_mmm_parser",
"core", "core",
@ -31,7 +30,6 @@ async-stream = "0.3"
axum = "0.7" axum = "0.7"
axum-extra = "0.9" axum-extra = "0.9"
base64 = "0.22" base64 = "0.22"
bytes = "1.7"
cached = "0.53" cached = "0.53"
cfg-if = "1" cfg-if = "1"
chrono = "0.4" chrono = "0.4"
@ -51,7 +49,6 @@ hyper = "1.1"
idna = "1" idna = "1"
indexmap = "2.2" indexmap = "2.2"
itertools = "0.13" itertools = "0.13"
kdl = "4"
lru = "0.12" lru = "0.12"
miette = "7" miette = "7"
nom = "7" nom = "7"
@ -61,7 +58,6 @@ priority-queue = "2.0"
quick-xml = "0.36" quick-xml = "0.36"
redis = "0.26" redis = "0.26"
regex = "1.9" regex = "1.9"
rmp-serde = "1.3"
rsa = "0.9" rsa = "0.9"
reqwest = "0.12" reqwest = "0.12"
sea-orm = "1" sea-orm = "1"
@ -83,7 +79,6 @@ tower-http = "0.5"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
ts-rs = "7" ts-rs = "7"
ulid = "1"
unicode-segmentation = "1.10" unicode-segmentation = "1.10"
url = "2.3" url = "2.3"
walkdir = "2.3" walkdir = "2.3"
@ -96,7 +91,6 @@ magnetar_host_meta = { path = "./ext_host_meta" }
magnetar_webfinger = { path = "./ext_webfinger" } magnetar_webfinger = { path = "./ext_webfinger" }
magnetar_nodeinfo = { path = "./ext_nodeinfo" } magnetar_nodeinfo = { path = "./ext_nodeinfo" }
magnetar_model = { path = "./ext_model" } magnetar_model = { path = "./ext_model" }
magnetar_runtime = { path = "./magnetar_runtime" }
magnetar_sdk = { path = "./magnetar_sdk" } magnetar_sdk = { path = "./magnetar_sdk" }
cached = { workspace = true } cached = { workspace = true }
@ -114,15 +108,16 @@ tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true } tokio-stream = { workspace = true }
tower = { workspace = true } tower = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace", "fs"] } tower-http = { workspace = true, features = ["cors", "trace", "fs"] }
ulid = { workspace = true }
url = { workspace = true } url = { workspace = true }
idna = { workspace = true }
regex = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] } tracing-subscriber = { workspace = true, features = ["env-filter"] }
tracing = { workspace = true } tracing = { workspace = true }
cfg-if = { workspace = true } cfg-if = { workspace = true }
bytes = { workspace = true }
compact_str = { workspace = true } compact_str = { workspace = true }
either = { workspace = true } either = { workspace = true }
futures = { workspace = true } futures = { workspace = true }
@ -134,8 +129,6 @@ thiserror = { workspace = true }
percent-encoding = { workspace = true } percent-encoding = { workspace = true }
kdl = { workspace = true }
rmp-serde = { workspace = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true } serde_json = { workspace = true }
serde_urlencoded = { workspace = true } serde_urlencoded = { workspace = true }

1
config/.gitignore vendored
View File

@ -1,4 +1,3 @@
* *
!.gitignore !.gitignore
!default.toml !default.toml
!default-vars.kdl

View File

@ -1,34 +0,0 @@
cache {
local-user-cache {
// Size is unlimited
lifetime 300min
}
emoji-cache {
size 4096
}
remote-instance-cache {
size 256
lifetime 100s
}
drive-file-cache {
size 128
lifetime 10s
}
}
api-model {
note {
buffer 10
}
notification {
buffer 10
}
}
activity-pub {
user-agent "magnetar/$version ($host)"
}

View File

@ -59,20 +59,6 @@
# Environment variable: MAG_C_PROXY_REMOTE_FILES # Environment variable: MAG_C_PROXY_REMOTE_FILES
# networking.proxy_remote_files = false # networking.proxy_remote_files = false
# ------------------------------[ RPC CONNECTION ]-----------------------------
# [Optional]
# A type of connection to use for the application's internal RPC
# Possible values: "none", "tcp", "unix"
# Default: "none"
# Environment variable: MAG_C_RPC_CONNECTION_TYPE
# rpc.connection_type = "none"
# [Optional]
# The corresponding bind address (or path for Unix-domain sockets) for the internal RPC
# Default: ""
# Environment variable: MAG_C_RPC_BIND_ADDR
# rpc.bind_addr = ""
# -----------------------------[ CALCKEY FRONTEND ]---------------------------- # -----------------------------[ CALCKEY FRONTEND ]----------------------------
@ -97,6 +83,7 @@
# -------------------------------[ FEDERATION ]-------------------------------- # -------------------------------[ FEDERATION ]--------------------------------
# --------------------------------[ BRANDING ]--------------------------------- # --------------------------------[ BRANDING ]---------------------------------
# [Optional] # [Optional]

View File

@ -11,21 +11,13 @@ use url::Url;
use magnetar_core::web_model::content_type::ContentActivityStreams; use magnetar_core::web_model::content_type::ContentActivityStreams;
use crate::{ use crate::{
client::federation_client::{FederationClient, FederationClientError}, ApClientService,
crypto::{ApSigningError, ApSigningKey, SigningAlgorithm}, ApSignature,
ApClientService, ApSignature, ApSigningField, ApSigningHeaders, SigningInput, SigningParts, ApSigningField, ApSigningHeaders, client::federation_client::{FederationClient, FederationClientError}, crypto::{ApSigningError, ApSigningKey, SigningAlgorithm}, SigningInput, SigningParts,
}; };
pub struct ApClientServiceDefaultProvider { pub struct ApClientServiceDefaultProvider {
client: Arc<dyn AsRef<FederationClient> + Send + Sync>, client: Arc<FederationClient>,
}
impl ApClientServiceDefaultProvider {
pub fn new(client: impl AsRef<FederationClient> + Send + Sync + 'static) -> Self {
Self {
client: Arc::new(client),
}
}
} }
impl Display for ApSignature { impl Display for ApSignature {
@ -245,7 +237,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
&self, &self,
signing_key: ApSigningKey<'_>, signing_key: ApSigningKey<'_>,
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
request: &dyn SigningInput, request: impl SigningInput,
) -> Result<ApSignature, Self::Error> { ) -> Result<ApSignature, Self::Error> {
let components = request.create_signing_input(); let components = request.create_signing_input();
@ -285,7 +277,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::RsaSha256 => self.sign_request( SigningAlgorithm::RsaSha256 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
&SigningInputGetRsaSha256 { SigningInputGetRsaSha256 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::GET, method: Method::GET,
@ -298,7 +290,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::Hs2019 => self.sign_request( SigningAlgorithm::Hs2019 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
&SigningInputGetHs2019 { SigningInputGetHs2019 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::GET, method: Method::GET,
@ -326,8 +318,6 @@ impl ApClientService for ApClientServiceDefaultProvider {
Ok(self Ok(self
.client .client
.as_ref()
.as_ref()
.get(url) .get(url)
.accept(ContentActivityStreams) .accept(ContentActivityStreams)
.headers(headers) .headers(headers)
@ -355,7 +345,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::RsaSha256 => self.sign_request( SigningAlgorithm::RsaSha256 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
&SigningInputPostRsaSha256 { SigningInputPostRsaSha256 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::POST, method: Method::POST,
@ -369,7 +359,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::Hs2019 => self.sign_request( SigningAlgorithm::Hs2019 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
&SigningInputPostHs2019 { SigningInputPostHs2019 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::POST, method: Method::POST,
@ -404,8 +394,6 @@ impl ApClientService for ApClientServiceDefaultProvider {
Ok(self Ok(self
.client .client
.as_ref()
.as_ref()
.builder(Method::POST, url) .builder(Method::POST, url)
.accept(ContentActivityStreams) .accept(ContentActivityStreams)
.content_type(ContentActivityStreams) .content_type(ContentActivityStreams)
@ -426,9 +414,9 @@ mod test {
use crate::{ use crate::{
ap_client::ApClientServiceDefaultProvider, ap_client::ApClientServiceDefaultProvider,
ApClientService,
client::federation_client::FederationClient, client::federation_client::FederationClient,
crypto::{ApHttpPrivateKey, SigningAlgorithm}, crypto::{ApHttpPrivateKey, SigningAlgorithm},
ApClientService,
}; };
#[tokio::test] #[tokio::test]
@ -439,7 +427,7 @@ mod test {
let rsa_key = rsa::RsaPrivateKey::from_pkcs8_pem(key.trim()).into_diagnostic()?; let rsa_key = rsa::RsaPrivateKey::from_pkcs8_pem(key.trim()).into_diagnostic()?;
let ap_client = ApClientServiceDefaultProvider { let ap_client = ApClientServiceDefaultProvider {
client: Arc::new(Box::new( client: Arc::new(
FederationClient::new( FederationClient::new(
true, true,
128_000, 128_000,
@ -447,12 +435,12 @@ mod test {
UserAgent::from_static("magnetar/0.42 (https://astolfo.social)"), UserAgent::from_static("magnetar/0.42 (https://astolfo.social)"),
) )
.into_diagnostic()?, .into_diagnostic()?,
)), ),
}; };
let val = ap_client let val = ap_client
.signed_get( .signed_get(
ApHttpPrivateKey::Rsa(Cow::Owned(Box::new(rsa_key))) ApHttpPrivateKey::Rsa(Box::new(Cow::Owned(rsa_key)))
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256) .create_signing_key(&key_id, SigningAlgorithm::RsaSha256)
.into_diagnostic()?, .into_diagnostic()?,
SigningAlgorithm::RsaSha256, SigningAlgorithm::RsaSha256,

View File

@ -1,16 +1,12 @@
use rsa::pkcs1::DecodeRsaPrivateKey; use std::{borrow::Cow, fmt::Display};
use rsa::pkcs1::DecodeRsaPublicKey;
use rsa::pkcs8::DecodePrivateKey;
use rsa::pkcs8::DecodePublicKey;
use rsa::signature::Verifier; use rsa::signature::Verifier;
use rsa::{ use rsa::{
sha2::{Sha256, Sha512}, sha2::{Sha256, Sha512},
signature::Signer, signature::Signer,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::Formatter;
use std::str::FromStr;
use std::{borrow::Cow, fmt::Display};
use strum::AsRefStr; use strum::AsRefStr;
use thiserror::Error; use thiserror::Error;
@ -39,7 +35,7 @@ pub enum SigningAlgorithm {
} }
impl Display for SigningAlgorithm { impl Display for SigningAlgorithm {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
Self::Hs2019 => write!(f, "hs2019"), Self::Hs2019 => write!(f, "hs2019"),
Self::RsaSha256 => write!(f, "rsa-sha256"), Self::RsaSha256 => write!(f, "rsa-sha256"),
@ -65,51 +61,6 @@ pub enum ApHttpPublicKey<'a> {
Ed25519(Cow<'a, ed25519_dalek::VerifyingKey>), Ed25519(Cow<'a, ed25519_dalek::VerifyingKey>),
} }
#[derive(Debug, Copy, Clone, Error)]
#[error("Failed to parse the public key: No available parser could decode the PEM string")]
pub struct ApHttpPublicKeyParseError;
impl FromStr for ApHttpPublicKey<'_> {
type Err = ApHttpPublicKeyParseError;
fn from_str(input_pem: &str) -> Result<Self, Self::Err> {
let pem = input_pem.trim();
let parse_pkcs1_rsa: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPublicKey::Rsa(Cow::Owned(
rsa::RsaPublicKey::from_pkcs1_pem(p).ok()?,
)))
};
let parse_spki_rsa: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPublicKey::Rsa(Cow::Owned(
rsa::RsaPublicKey::from_public_key_pem(p).ok()?,
)))
};
let parse_spki_ed25519: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPublicKey::Ed25519(Cow::Owned(
ed25519_dalek::VerifyingKey::from_public_key_pem(p).ok()?,
)))
};
// Some heuristics
let parsers: &[_] = match pem {
p if p.starts_with("-----BEGIN PUBLIC KEY-----") => {
&[parse_spki_rsa, parse_spki_ed25519]
}
p if p.starts_with("-----BEGIN RSA PUBLIC KEY-----") => &[parse_pkcs1_rsa],
_ => &[parse_spki_rsa, parse_spki_ed25519, parse_pkcs1_rsa],
};
for parser in parsers {
if let Some(k) = parser(pem) {
return Ok(k);
}
}
Err(ApHttpPublicKeyParseError)
}
}
impl ApHttpVerificationKey<'_> { impl ApHttpVerificationKey<'_> {
pub fn verify(&self, message: &[u8], signature: &[u8]) -> Result<(), ApVerificationError> { pub fn verify(&self, message: &[u8], signature: &[u8]) -> Result<(), ApVerificationError> {
match self { match self {
@ -158,10 +109,12 @@ impl ApHttpPublicKey<'_> {
)); ));
Ok(verification_key.verify(message, signature)?) Ok(verification_key.verify(message, signature)?)
} }
(_, SigningAlgorithm::RsaSha256) => Err(ApVerificationError::KeyAlgorithmMismatch( (_, SigningAlgorithm::RsaSha256) => {
return Err(ApVerificationError::KeyAlgorithmMismatch(
algorithm, algorithm,
self.as_ref().to_owned(), self.as_ref().to_owned(),
)), ));
}
(Self::Ed25519(key), SigningAlgorithm::Hs2019) => { (Self::Ed25519(key), SigningAlgorithm::Hs2019) => {
let verification_key = ApHttpVerificationKey::Ed25519(Cow::Borrowed(key.as_ref())); let verification_key = ApHttpVerificationKey::Ed25519(Cow::Borrowed(key.as_ref()));
Ok(verification_key.verify(message, signature)?) Ok(verification_key.verify(message, signature)?)
@ -173,64 +126,17 @@ impl ApHttpPublicKey<'_> {
#[derive(Debug, Clone, AsRefStr)] #[derive(Debug, Clone, AsRefStr)]
pub enum ApHttpPrivateKey<'a> { pub enum ApHttpPrivateKey<'a> {
#[strum(serialize = "rsa")] #[strum(serialize = "rsa")]
Rsa(Cow<'a, Box<rsa::RsaPrivateKey>>), Rsa(Box<Cow<'a, rsa::RsaPrivateKey>>),
#[strum(serialize = "ed25519")] #[strum(serialize = "ed25519")]
Ed25519(Cow<'a, ed25519_dalek::SecretKey>), Ed25519(Cow<'a, ed25519_dalek::SecretKey>),
} }
#[derive(Debug, Copy, Clone, Error)]
#[error("Failed to parse the private key: No available parser could decode the PEM string")]
pub struct ApHttpPrivateKeyParseError;
impl FromStr for ApHttpPrivateKey<'_> {
type Err = ApHttpPrivateKeyParseError;
fn from_str(input_pem: &str) -> Result<Self, Self::Err> {
let pem = input_pem.trim();
let parse_pkcs1_rsa: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPrivateKey::Rsa(Cow::Owned(Box::new(
rsa::RsaPrivateKey::from_pkcs1_pem(p).ok()?,
))))
};
let parse_pkcs8_rsa: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPrivateKey::Rsa(Cow::Owned(Box::new(
rsa::RsaPrivateKey::from_pkcs8_pem(p).ok()?,
))))
};
let parse_pkcs8_ed25519: &dyn Fn(_) -> _ = &|p| {
Some(ApHttpPrivateKey::Ed25519(Cow::Owned(
ed25519_dalek::SigningKey::from_pkcs8_pem(p)
.ok()?
.to_bytes(),
)))
};
// Some heuristics
let parsers: &[_] = match pem {
p if p.contains("-----BEGIN PRIVATE KEY-----") => {
&[parse_pkcs8_rsa, parse_pkcs8_ed25519]
}
p if p.contains("-----BEGIN RSA PRIVATE KEY-----") => &[parse_pkcs1_rsa],
_ => &[parse_pkcs8_rsa, parse_pkcs8_ed25519, parse_pkcs1_rsa],
};
for parser in parsers {
if let Some(k) = parser(pem) {
return Ok(k);
}
}
Err(ApHttpPrivateKeyParseError)
}
}
#[derive(Debug, Clone, AsRefStr)] #[derive(Debug, Clone, AsRefStr)]
pub enum ApHttpSigningKey<'a> { pub enum ApHttpSigningKey<'a> {
#[strum(serialize = "rsa-sha256")] #[strum(serialize = "rsa-sha256")]
RsaSha256(Cow<'a, rsa::pkcs1v15::SigningKey<Sha256>>), RsaSha256(Cow<'a, rsa::pkcs1v15::SigningKey<rsa::sha2::Sha256>>),
#[strum(serialize = "rsa-sha512")] #[strum(serialize = "rsa-sha512")]
RsaSha512(Cow<'a, rsa::pkcs1v15::SigningKey<Sha512>>), RsaSha512(Cow<'a, rsa::pkcs1v15::SigningKey<rsa::sha2::Sha512>>),
#[strum(serialize = "ed25519")] #[strum(serialize = "ed25519")]
Ed25519(Cow<'a, ed25519_dalek::SigningKey>), Ed25519(Cow<'a, ed25519_dalek::SigningKey>),
} }
@ -286,7 +192,7 @@ impl ApHttpPrivateKey<'_> {
key: match (self, algorithm) { key: match (self, algorithm) {
(Self::Rsa(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => { (Self::Rsa(key), SigningAlgorithm::RsaSha256 | SigningAlgorithm::Hs2019) => {
ApHttpSigningKey::RsaSha256(Cow::Owned(rsa::pkcs1v15::SigningKey::new( ApHttpSigningKey::RsaSha256(Cow::Owned(rsa::pkcs1v15::SigningKey::new(
*key.as_ref().to_owned(), key.clone().into_owned(),
))) )))
} }
(Self::Ed25519(key), SigningAlgorithm::Hs2019) => ApHttpSigningKey::Ed25519( (Self::Ed25519(key), SigningAlgorithm::Hs2019) => ApHttpSigningKey::Ed25519(

View File

@ -154,7 +154,7 @@ pub trait ApClientService: Send + Sync {
&self, &self,
signing_key: ApSigningKey<'_>, signing_key: ApSigningKey<'_>,
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
request: &dyn SigningInput, request: impl SigningInput,
) -> Result<ApSignature, Self::Error>; ) -> Result<ApSignature, Self::Error>;
async fn signed_get( async fn signed_get(

View File

@ -1,23 +1,23 @@
use std::future::Future; use std::future::Future;
use chrono::Utc; use chrono::Utc;
use futures_util::StreamExt; use futures_util::{SinkExt, StreamExt};
use redis::IntoConnectionInfo; use redis::IntoConnectionInfo;
pub use sea_orm; pub use sea_orm;
use sea_orm::ActiveValue::Set; use sea_orm::{ActiveValue::Set, ConnectionTrait};
use sea_orm::{ use sea_orm::{
ColumnTrait, ConnectOptions, DatabaseConnection, DbErr, EntityTrait, QueryFilter, ColumnTrait, ConnectOptions, DatabaseConnection, DbErr, EntityTrait, QueryFilter,
TransactionTrait, TransactionTrait,
}; };
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use serde::de::Error;
use serde_json::Value; use serde_json::Value;
use strum::IntoStaticStr; use strum::IntoStaticStr;
use thiserror::Error; use thiserror::Error;
use tokio::select; use tokio::select;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::log::LevelFilter;
use tracing::{error, info, trace, warn}; use tracing::{error, info, trace, warn};
use tracing::log::LevelFilter;
use url::Host; use url::Host;
pub use ck; pub use ck;
@ -122,25 +122,13 @@ impl CalckeyModel {
.await?) .await?)
} }
pub async fn get_user_for_cache_by_id(&self, id: &str) -> Result<Option<(user::Model, user_profile::Model, user_keypair::Model)>, CalckeyDbError> { pub async fn get_user_and_profile_by_id(&self, id: &str) -> Result<Option<(user::Model, user_profile::Model)>, CalckeyDbError> {
let txn = self.0.begin().await?; Ok(user::Entity::find()
let Some((user, Some(profile))) = user::Entity::find()
.filter(user::Column::Id.eq(id)) .filter(user::Column::Id.eq(id))
.find_also_related(user_profile::Entity) .find_also_related(user_profile::Entity)
.one(&txn) .one(&self.0)
.await? else { .await?
return Ok(None); .and_then(|(u, p)| p.map(|pp| (u, pp))))
};
let Some(keys) = user_keypair::Entity::find()
.filter(user_keypair::Column::UserId.eq(id))
.one(&txn)
.await? else {
return Ok(None);
};
Ok(Some((user, profile, keys)))
} }
pub async fn get_user_security_keys_by_id( pub async fn get_user_security_keys_by_id(
@ -177,30 +165,20 @@ impl CalckeyModel {
.await?) .await?)
} }
pub async fn get_user_for_cache_by_token( pub async fn get_user_and_profile_by_token(
&self, &self,
token: &str, token: &str,
) -> Result<Option<(user::Model, user_profile::Model, user_keypair::Model)>, CalckeyDbError> { ) -> Result<Option<(user::Model, user_profile::Model)>, CalckeyDbError> {
let txn = self.0.begin().await?; Ok(user::Entity::find()
.filter(
let Some((user, Some(profile))) = user::Entity::find() user::Column::Token
.filter(user::Column::Token
.eq(token) .eq(token)
.and(user::Column::Host.is_null())) .and(user::Column::Host.is_null()),
)
.find_also_related(user_profile::Entity) .find_also_related(user_profile::Entity)
.one(&txn) .one(&self.0)
.await? else { .await?
return Ok(None); .and_then(|(u, p)| p.map(|pp| (u, pp))))
};
let Some(keys) = user_keypair::Entity::find()
.filter(user_keypair::Column::UserId.eq(&user.id))
.one(&txn)
.await? else {
return Ok(None);
};
Ok(Some((user, profile, keys)))
} }
pub async fn get_user_by_uri(&self, uri: &str) -> Result<Option<user::Model>, CalckeyDbError> { pub async fn get_user_by_uri(&self, uri: &str) -> Result<Option<user::Model>, CalckeyDbError> {

View File

@ -11,7 +11,7 @@ use data::{sub_interaction_reaction, sub_interaction_renote, NoteData};
use ext_model_migration::SelectStatement; use ext_model_migration::SelectStatement;
use magnetar_sdk::types::SpanFilter; use magnetar_sdk::types::SpanFilter;
use sea_orm::sea_query::{Asterisk, Expr, IntoIden, Query, SelectExpr, SimpleExpr}; use sea_orm::sea_query::{Asterisk, Expr, IntoIden, Query, SelectExpr, SimpleExpr};
use sea_orm::{ColumnTrait, Condition, EntityTrait, Iden, JoinType, QueryFilter, QueryOrder, QuerySelect, QueryTrait, Select}; use sea_orm::{ColumnTrait, Condition, EntityTrait, Iden, JoinType, QueryFilter, QueryOrder, QuerySelect, QueryTrait, Select, StatementBuilder};
use std::sync::Arc; use std::sync::Arc;
const PINS: &str = "pins."; const PINS: &str = "pins.";
@ -45,8 +45,7 @@ impl NoteResolveMode {
match self { match self {
NoteResolveMode::Single(id) => Ok(id_col.eq(id)), NoteResolveMode::Single(id) => Ok(id_col.eq(id)),
NoteResolveMode::Multiple(ids) => Ok(id_col.is_in(ids)), NoteResolveMode::Multiple(ids) => Ok(id_col.is_in(ids)),
// We do this in a separate query, because before we used an inner join, and it caused // We add a CTE for pins
// a massive performance penalty
NoteResolveMode::PinsFromUserId(user_id) => { NoteResolveMode::PinsFromUserId(user_id) => {
let cte_query = user_note_pining::Entity::find() let cte_query = user_note_pining::Entity::find()
.column(user_note_pining::Column::NoteId) .column(user_note_pining::Column::NoteId)

View File

@ -1,7 +1,6 @@
use serde::Deserialize; use serde::Deserialize;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::net::{IpAddr, SocketAddr}; use std::net::IpAddr;
use std::path::PathBuf;
use thiserror::Error; use thiserror::Error;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -95,56 +94,6 @@ impl Default for MagnetarNetworking {
} }
} }
#[derive(Deserialize, Debug, Default, Clone)]
#[serde(
rename_all = "snake_case",
tag = "connection_type",
content = "bind_addr"
)]
pub enum MagnetarRpcSocketKind {
#[default]
None,
Unix(PathBuf),
Tcp(SocketAddr),
}
#[derive(Deserialize, Debug)]
#[non_exhaustive]
pub struct MagnetarRpcConfig {
pub connection_settings: MagnetarRpcSocketKind,
}
fn env_rpc_connection() -> MagnetarRpcSocketKind {
match std::env::var("MAG_C_RPC_CONNECTION_TYPE")
.unwrap_or_else(|_| "none".to_owned())
.to_lowercase()
.as_str()
{
"none" => MagnetarRpcSocketKind::None,
"unix" => MagnetarRpcSocketKind::Unix(
std::env::var("MAG_C_RPC_BIND_ADDR")
.unwrap_or_default()
.parse()
.expect("MAG_C_RPC_BIND_ADDR must be a valid path"),
),
"tcp" => MagnetarRpcSocketKind::Tcp(
std::env::var("MAG_C_RPC_BIND_ADDR")
.unwrap_or_default()
.parse()
.expect("MAG_C_RPC_BIND_ADDR must be a valid socket address"),
),
_ => panic!("MAG_C_RPC_CONNECTION_TYPE must be a valid protocol or 'none'"),
}
}
impl Default for MagnetarRpcConfig {
fn default() -> Self {
MagnetarRpcConfig {
connection_settings: env_rpc_connection(),
}
}
}
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
#[non_exhaustive] #[non_exhaustive]
pub struct MagnetarCalckeyFrontendConfig { pub struct MagnetarCalckeyFrontendConfig {
@ -247,8 +196,6 @@ pub struct MagnetarConfig {
#[serde(default)] #[serde(default)]
pub networking: MagnetarNetworking, pub networking: MagnetarNetworking,
#[serde(default)] #[serde(default)]
pub rpc: MagnetarRpcConfig,
#[serde(default)]
pub branding: MagnetarBranding, pub branding: MagnetarBranding,
#[serde(default)] #[serde(default)]
pub calckey_frontend: MagnetarCalckeyFrontendConfig, pub calckey_frontend: MagnetarCalckeyFrontendConfig,

View File

@ -1,21 +0,0 @@
[package]
name = "magnetar_runtime"
version.workspace = true
edition.workspace = true
[lib]
crate-type = ["rlib"]
[dependencies]
magnetar_core = { path = "../core" }
magnetar_sdk = { path = "../magnetar_sdk" }
either = { workspace = true }
futures-channel = { workspace = true }
futures-util = { workspace = true }
futures-core = { workspace = true }
itertools = { workspace = true }
miette = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

View File

@ -1 +0,0 @@

View File

@ -15,7 +15,7 @@ use serde::{Deserialize, Deserializer, Serialize};
use ts_rs::TS; use ts_rs::TS;
pub(crate) mod packed_time { pub(crate) mod packed_time {
use chrono::{DateTime, Utc}; use chrono::{DateTime, NaiveDateTime, Utc};
use serde::de::Error; use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer}; use serde::{Deserialize, Deserializer, Serializer};
@ -30,12 +30,15 @@ pub(crate) mod packed_time {
where where
D: Deserializer<'de>, D: Deserializer<'de>,
{ {
DateTime::<Utc>::from_timestamp_millis( Ok(DateTime::<Utc>::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_millis(
String::deserialize(deserializer)? String::deserialize(deserializer)?
.parse::<i64>() .parse::<i64>()
.map_err(Error::custom)?, .map_err(Error::custom)?,
) )
.ok_or_else(|| Error::custom("millisecond value out of range")) .ok_or_else(|| Error::custom("millisecond value out of range"))?,
Utc,
))
} }
} }

View File

@ -2,10 +2,8 @@ mod api_v1;
pub mod host_meta; pub mod host_meta;
pub mod model; pub mod model;
pub mod nodeinfo; pub mod nodeinfo;
mod rpc_v1;
pub mod service; pub mod service;
pub mod util; pub mod util;
pub mod vars;
pub mod web; pub mod web;
pub mod webfinger; pub mod webfinger;
@ -16,14 +14,8 @@ use crate::service::MagnetarService;
use axum::routing::get; use axum::routing::get;
use axum::Router; use axum::Router;
use dotenvy::dotenv; use dotenvy::dotenv;
use futures::{select, FutureExt};
use magnetar_common::config::{MagnetarConfig, MagnetarRpcSocketKind};
use magnetar_model::{CacheConnectorConfig, CalckeyCache, CalckeyModel, ConnectorConfig}; use magnetar_model::{CacheConnectorConfig, CalckeyCache, CalckeyModel, ConnectorConfig};
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use rpc_v1::create_rpc_router;
use rpc_v1::proto::RpcSockAddr;
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -65,44 +57,16 @@ async fn main() -> miette::Result<()> {
}) })
.into_diagnostic()?; .into_diagnostic()?;
let service = Arc::new(MagnetarService::new(config, db.clone(), redis).await?); let service = Arc::new(
MagnetarService::new(config, db.clone(), redis)
.await
.into_diagnostic()?,
);
let shutdown_signal = shutdown_signal().shared();
select! {
rpc_res = run_rpc(service.clone(), config, shutdown_signal.clone()).fuse() => rpc_res,
web_res = run_web(service, config, shutdown_signal).fuse() => web_res
}
}
async fn run_rpc(
service: Arc<MagnetarService>,
config: &'static MagnetarConfig,
shutdown_signal: impl Future<Output = ()> + Send + 'static,
) -> miette::Result<()> {
let rpc_bind_addr = match &config.rpc.connection_settings {
MagnetarRpcSocketKind::None => {
std::future::pending::<Infallible>().await;
unreachable!();
}
MagnetarRpcSocketKind::Unix(path) => RpcSockAddr::Unix(path.clone()),
MagnetarRpcSocketKind::Tcp(ip) => RpcSockAddr::Ip(*ip),
};
let rpc = create_rpc_router();
rpc.run(service, rpc_bind_addr, Some(shutdown_signal)).await
}
async fn run_web(
service: Arc<MagnetarService>,
config: &'static MagnetarConfig,
shutdown_signal: impl Future<Output = ()> + Send + 'static,
) -> miette::Result<()> {
let well_known_router = Router::new() let well_known_router = Router::new()
.route( .route(
"/webfinger", "/webfinger",
get(webfinger::handle_webfinger).with_state((config, service.db.clone())), get(webfinger::handle_webfinger).with_state((config, db)),
) )
.route("/host-meta", get(handle_host_meta)) .route("/host-meta", get(handle_host_meta))
.route("/nodeinfo", get(handle_nodeinfo)); .route("/nodeinfo", get(handle_nodeinfo));
@ -129,7 +93,7 @@ async fn run_web(
let listener = TcpListener::bind(addr).await.into_diagnostic()?; let listener = TcpListener::bind(addr).await.into_diagnostic()?;
info!("Serving..."); info!("Serving...");
axum::serve(listener, app.into_make_service()) axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal) .with_graceful_shutdown(shutdown_signal())
.await .await
.map_err(|e| miette!("Error running server: {}", e)) .map_err(|e| miette!("Error running server: {}", e))
} }
@ -157,5 +121,5 @@ async fn shutdown_signal() {
_ = terminate => {}, _ = terminate => {},
} }
info!("Received a signal to shut down..."); info!("Shutting down...");
} }

View File

@ -5,7 +5,6 @@ use crate::service::instance_meta_cache::InstanceMetaCacheError;
use magnetar_model::sea_orm::DbErr; use magnetar_model::sea_orm::DbErr;
use magnetar_model::CalckeyDbError; use magnetar_model::CalckeyDbError;
use magnetar_sdk::mmm::Token; use magnetar_sdk::mmm::Token;
use miette::Diagnostic;
use thiserror::Error; use thiserror::Error;
pub mod drive; pub mod drive;
@ -14,37 +13,27 @@ pub mod note;
pub mod notification; pub mod notification;
pub mod user; pub mod user;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, strum::IntoStaticStr)]
pub enum PackError { pub enum PackError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
#[diagnostic(code(mag::pack_error::db_error))]
DbError(#[from] DbErr), DbError(#[from] DbErr),
#[error("Calckey database wrapper error: {0}")] #[error("Calckey database wrapper error: {0}")]
#[diagnostic(code(mag::pack_error::db_wrapper_error))]
CalckeyDbError(#[from] CalckeyDbError), CalckeyDbError(#[from] CalckeyDbError),
#[error("Data error: {0}")] #[error("Data error: {0}")]
#[diagnostic(code(mag::pack_error::data_error))]
DataError(String), DataError(String),
#[error("Emoji cache error: {0}")] #[error("Emoji cache error: {0}")]
#[diagnostic(code(mag::pack_error::emoji_cache_error))]
EmojiCacheError(#[from] EmojiCacheError), EmojiCacheError(#[from] EmojiCacheError),
#[error("Instance cache error: {0}")] #[error("Instance cache error: {0}")]
#[diagnostic(code(mag::pack_error::instance_meta_cache_error))]
InstanceMetaCacheError(#[from] InstanceMetaCacheError), InstanceMetaCacheError(#[from] InstanceMetaCacheError),
#[error("Generic cache error: {0}")] #[error("Generic cache error: {0}")]
#[diagnostic(code(mag::pack_error::generic_id_cache_error))]
GenericCacheError(#[from] GenericIdCacheError), GenericCacheError(#[from] GenericIdCacheError),
#[error("Remote instance cache error: {0}")] #[error("Remote instance cache error: {0}")]
#[diagnostic(code(mag::pack_error::remote_instance_cache_error))]
RemoteInstanceCacheError(#[from] RemoteInstanceCacheError), RemoteInstanceCacheError(#[from] RemoteInstanceCacheError),
#[error("Deserializer error: {0}")] #[error("Deserializer error: {0}")]
#[diagnostic(code(mag::pack_error::deserializer_error))]
DeserializerError(#[from] serde_json::Error), DeserializerError(#[from] serde_json::Error),
#[error("URL parse error: {0}")] #[error("URL parse error: {0}")]
#[diagnostic(code(mag::pack_error::url_parse_error))]
UrlParseError(#[from] url::ParseError), UrlParseError(#[from] url::ParseError),
#[error("Parallel processing error: {0}")] #[error("Parallel processing error: {0}")]
#[diagnostic(code(mag::pack_error::task_join_error))]
JoinError(#[from] tokio::task::JoinError), JoinError(#[from] tokio::task::JoinError),
} }

View File

@ -7,7 +7,7 @@ use crate::model::{PackType, PackingContext};
use compact_str::CompactString; use compact_str::CompactString;
use either::Either; use either::Either;
use futures_util::future::try_join_all; use futures_util::future::try_join_all;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{FutureExt, StreamExt, TryStreamExt};
use magnetar_model::ck::sea_orm_active_enums::NoteVisibilityEnum; use magnetar_model::ck::sea_orm_active_enums::NoteVisibilityEnum;
use magnetar_model::model_ext::AliasColumnExt; use magnetar_model::model_ext::AliasColumnExt;
use magnetar_model::note_model::data::{ use magnetar_model::note_model::data::{

View File

@ -1,98 +0,0 @@
use std::sync::Arc;
use magnetar_federation::crypto::SigningAlgorithm;
use proto::{MagRpc, RpcMessage};
use serde::Deserialize;
use tracing::{debug, info};
use crate::{
model::{processing::note::NoteModel, PackingContext},
service::MagnetarService,
web::{ApiError, ObjectNotFound},
};
pub mod proto;
#[derive(Debug, Deserialize)]
struct RpcApGet {
user_id: String,
url: String,
}
#[derive(Debug, Deserialize)]
struct RpcApPost {
user_id: String,
url: String,
body: serde_json::Value,
}
pub fn create_rpc_router() -> MagRpc {
MagRpc::new()
.handle(
"/ping",
|_, RpcMessage(message): RpcMessage<String>| async move {
debug!("Received RPC ping: {}", message);
RpcMessage("pong".to_owned())
},
)
.handle(
"/note/by-id",
|service, RpcMessage(id): RpcMessage<String>| async move {
let ctx = PackingContext::new(service, None).await?;
let note = NoteModel {
attachments: true,
with_context: true,
}
.fetch_single(&ctx, &id)
.await?
.ok_or(ObjectNotFound(id))?;
Result::<_, ApiError>::Ok(note)
},
)
.handle(
"/ap/get",
|service: Arc<MagnetarService>,
RpcMessage(RpcApGet { user_id, url }): RpcMessage<RpcApGet>| async move {
let Some(user) = service.local_user_cache.get_by_id(&user_id).await? else {
return Err(ObjectNotFound(format!("LocalUserID:{user_id}")).into());
};
let key_id = format!(
"https://{}/users/{}#main-key",
service.config.networking.host, user_id
);
let signing_key = user
.private_key
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
let result = service
.ap_client
.signed_get(signing_key, SigningAlgorithm::RsaSha256, None, &url)
.await?;
Result::<_, ApiError>::Ok(result)
},
)
.handle(
"/ap/post",
|service: Arc<MagnetarService>,
RpcMessage(RpcApPost { user_id, url, body }): RpcMessage<RpcApPost>| async move {
let Some(user) = service.local_user_cache.get_by_id(&user_id).await? else {
return Err(ObjectNotFound(format!("LocalUserID:{user_id}")).into());
};
let key_id = format!(
"https://{}/users/{}#main-key",
service.config.networking.host, user_id
);
let signing_key = user
.private_key
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
let result = service
.ap_client
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body)
.await?;
Result::<_, ApiError>::Ok(result)
},
)
}

View File

@ -1,470 +0,0 @@
use crate::service::MagnetarService;
use bytes::BufMut;
use futures::{FutureExt, Stream, StreamExt};
use miette::{miette, IntoDiagnostic};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf};
use tokio::net::{TcpListener, UnixSocket};
use tokio::select;
use tokio::task::JoinSet;
use tracing::{debug, error, info, Instrument};
#[derive(Debug, Clone)]
pub enum RpcSockAddr {
Ip(SocketAddr),
Unix(PathBuf),
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[repr(transparent)]
pub struct RpcMessage<T>(pub T);
pub trait IntoRpcResponse: Send {
fn into_rpc_response(self) -> Option<RpcResponse>;
}
#[derive(Debug, Clone)]
pub struct RpcResponse(Vec<u8>);
impl<T: Serialize + Send + 'static> IntoRpcResponse for RpcMessage<T> {
fn into_rpc_response(self) -> Option<RpcResponse> {
rmp_serde::to_vec_named(&self)
.inspect_err(|e| {
error!(
"Failed to serialize value of type {}: {}",
std::any::type_name::<T>(),
e
)
})
.ok()
.map(RpcResponse)
}
}
#[derive(Debug, Serialize)]
pub struct RpcResult<T> {
success: bool,
data: T,
}
impl<T: Serialize + Send + 'static, E: Serialize + Send + 'static> IntoRpcResponse
for Result<T, E>
{
fn into_rpc_response(self) -> Option<RpcResponse> {
match self {
Ok(data) => RpcMessage(RpcResult {
success: true,
data,
})
.into_rpc_response(),
Err(data) => RpcMessage(RpcResult {
success: false,
data,
})
.into_rpc_response(),
}
}
}
pub trait RpcHandler<T>: Send + Sync + 'static
where
T: Send + 'static,
{
fn process(
&self,
context: Arc<MagnetarService>,
message: RpcMessage<T>,
) -> impl Future<Output = Option<RpcResponse>> + Send;
}
impl<T, F, Fut, RR> RpcHandler<T> for F
where
T: Send + 'static,
F: Fn(Arc<MagnetarService>, RpcMessage<T>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = RR> + Send,
RR: IntoRpcResponse,
{
async fn process(
&self,
context: Arc<MagnetarService>,
message: RpcMessage<T>,
) -> Option<RpcResponse> {
self(context, message).await.into_rpc_response()
}
}
type MessageRaw = Box<dyn Any + Send + 'static>;
type MagRpcHandlerMapped = dyn Fn(
Arc<MagnetarService>,
MessageRaw,
) -> Pin<Box<dyn Future<Output = Option<RpcResponse>> + Send + 'static>>
+ Send
+ Sync
+ 'static;
type MagRpcDecoderMapped =
dyn (Fn(&'_ [u8]) -> Result<MessageRaw, rmp_serde::decode::Error>) + Send + Sync + 'static;
pub struct MagRpc {
listeners: HashMap<String, Arc<MagRpcHandlerMapped>>,
payload_decoders: HashMap<String, Box<MagRpcDecoderMapped>>,
}
impl MagRpc {
pub fn new() -> Self {
MagRpc {
listeners: HashMap::new(),
payload_decoders: HashMap::new(),
}
}
pub fn handle<H, T>(mut self, method: impl Into<String>, handler: H) -> Self
where
T: DeserializeOwned + Send + 'static,
H: RpcHandler<T> + Sync + 'static,
{
let handler_ref = Arc::new(handler);
let method = method.into();
self.listeners.insert(
method.clone(),
Arc::new(move |ctx, data| {
let handler = handler_ref.clone();
async move {
handler
.process(ctx, RpcMessage(*data.downcast().unwrap()))
.await
}
.boxed()
}),
);
self.payload_decoders.insert(
method,
Box::new(move |data| Ok(Box::new(rmp_serde::from_slice::<'_, T>(data)?))),
);
self
}
pub async fn run(
self,
context: Arc<MagnetarService>,
addr: RpcSockAddr,
graceful_shutdown: Option<impl Future<Output = ()>>,
) -> miette::Result<()> {
match addr {
RpcSockAddr::Ip(sock_addr) => {
self.run_tcp(context, &sock_addr, graceful_shutdown).await
}
RpcSockAddr::Unix(path) => self.run_unix(context, &path, graceful_shutdown).await,
}
}
async fn run_tcp(
self,
context: Arc<MagnetarService>,
sock_addr: &SocketAddr,
graceful_shutdown: Option<impl Future<Output = ()>>,
) -> miette::Result<()> {
debug!("Binding RPC TCP socket to {}", sock_addr);
let listener = TcpListener::bind(sock_addr).await.into_diagnostic()?;
info!("Listening for RPC calls on {}", sock_addr);
let (sender, mut cancel) = tokio::sync::oneshot::channel::<()>();
let mut cancellation_tokens = Vec::new();
let rx_dec = RpcCallDecoder {
listeners: Arc::new(self.listeners),
payload_decoders: Arc::new(self.payload_decoders),
};
let mut connections = JoinSet::<miette::Result<_>>::new();
loop {
let (stream, remote_addr) = select!(
Some(c) = connections.join_next() => {
debug!("RPC TCP connection closed: {:?}", c);
continue;
},
conn = listener.accept() => {
if let Err(e) = conn {
error!("Connection error: {}", e);
break
}
conn.unwrap()
},
_ = &mut cancel => break
);
debug!("RPC TCP connection accepted: {:?}", remote_addr);
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
let (read_half, mut write_half) = stream.into_split();
let buf_read = BufReader::new(read_half);
let context = context.clone();
let rx_dec = rx_dec.clone();
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.filter_map(|(serial, payload, listener)| {
let ctx = context.clone();
async move { Some((serial, listener(ctx, payload).await?)) }
});
futures::pin_mut!(src);
while let Some((serial, RpcResponse(bytes))) = src.next().await {
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half
.write_u32(bytes.len() as u32)
.await
.into_diagnostic()?;
write_half.write_all(&bytes).await.into_diagnostic()?;
write_half.flush().await.into_diagnostic()?;
}
Ok(remote_addr)
}
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
connections.spawn(fut);
cancellation_tokens.push(cancel_send);
}
if let Some(graceful_shutdown) = graceful_shutdown {
graceful_shutdown.await;
sender.send(()).ok();
}
info!("Awaiting shutdown of all RPC connections...");
connections.join_all().await;
Ok(())
}
async fn run_unix(
self,
context: Arc<MagnetarService>,
addr: &Path,
graceful_shutdown: Option<impl Future<Output = ()>>,
) -> miette::Result<()> {
let sock = UnixSocket::new_stream().into_diagnostic()?;
debug!("Binding RPC Unix socket to {}", addr.display());
sock.bind(addr).into_diagnostic()?;
let listener = sock.listen(16).into_diagnostic()?;
let (sender, mut cancel) = tokio::sync::oneshot::channel::<()>();
let mut cancellation_tokens = Vec::new();
let rx_dec = RpcCallDecoder {
listeners: Arc::new(self.listeners),
payload_decoders: Arc::new(self.payload_decoders),
};
let mut connections = JoinSet::<miette::Result<_>>::new();
loop {
let (stream, remote_addr) = select!(
Some(c) = connections.join_next() => {
debug!("RPC Unix connection closed: {:?}", c);
continue;
},
conn = listener.accept() => {
if let Err(e) = conn {
error!("Connection error: {}", e);
break
}
conn.unwrap()
},
_ = &mut cancel => break
);
debug!("RPC Unix connection accepted: {:?}", remote_addr);
let (cancel_send, cancel_recv) = tokio::sync::oneshot::channel::<()>();
let (read_half, mut write_half) = stream.into_split();
let buf_read = BufReader::new(read_half);
let context = context.clone();
let rx_dec = rx_dec.clone();
let fut = async move {
let src = rx_dec
.stream_decode(buf_read, cancel_recv)
.filter_map(|r| async move {
if let Err(e) = &r {
error!("Stream decoding error: {e}");
}
r.ok()
})
.filter_map(|(serial, payload, listener)| {
let ctx = context.clone();
async move { Some((serial, listener(ctx, payload).await?)) }
});
futures::pin_mut!(src);
while let Some((serial, RpcResponse(bytes))) = src.next().await {
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half
.write_u32(bytes.len() as u32)
.await
.into_diagnostic()?;
write_half.write_all(&bytes).await.into_diagnostic()?;
write_half.flush().await.into_diagnostic()?;
}
miette::Result::<()>::Ok(())
}
.instrument(tracing::info_span!("RPC", remote_addr = ?remote_addr));
connections.spawn(fut.boxed());
cancellation_tokens.push(cancel_send);
}
if let Some(graceful_shutdown) = graceful_shutdown {
graceful_shutdown.await;
sender.send(()).ok();
}
info!("Awaiting shutdown of all RPC connections...");
connections.join_all().await;
Ok(())
}
}
#[derive(Clone)]
struct RpcCallDecoder {
listeners: Arc<HashMap<String, Arc<MagRpcHandlerMapped>>>,
payload_decoders: Arc<HashMap<String, Box<MagRpcDecoderMapped>>>,
}
impl RpcCallDecoder {
fn stream_decode<R: AsyncRead + AsyncReadExt + Unpin + Send + 'static>(
&self,
mut buf_read: BufReader<R>,
mut cancel: tokio::sync::oneshot::Receiver<()>,
) -> impl Stream<Item = miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static
{
let decoders = self.payload_decoders.clone();
let listeners = self.listeners.clone();
async_stream::try_stream! {
let mut name_buf = Vec::new();
let mut buf = Vec::new();
let mut messages = 0usize;
loop {
let read_fut = async {
let mut header = [0u8; 1];
if buf_read.read(&mut header).await.into_diagnostic()? == 0 {
return if messages > 0 {
Ok(None)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Unexpected end of stream, expected a header"
)).into_diagnostic()
}
}
if !matches!(header, [b'M']) {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Unexpected data in stream, expected a header"
)).into_diagnostic();
}
let serial = buf_read.read_u64().await.into_diagnostic()?;
let name_len = buf_read.read_u32().await.into_diagnostic()? as usize;
if name_len > name_buf.capacity() {
name_buf.reserve(name_len - name_buf.capacity());
}
// SAFETY: We use ReadBuf which expects uninit anyway
unsafe {
name_buf.set_len(name_len);
}
let mut name_buf_write = ReadBuf::uninit(&mut name_buf);
while name_buf_write.has_remaining_mut() {
buf_read
.read_buf(&mut name_buf_write)
.await
.into_diagnostic()?;
}
let payload_len = buf_read.read_u32().await.into_diagnostic()? as usize;
if payload_len > buf.capacity() {
buf.reserve(payload_len - buf.capacity());
}
// SAFETY: We use ReadBuf which expects uninit anyway
unsafe {
buf.set_len(payload_len);
}
let mut buf_write = ReadBuf::uninit(&mut buf);
while buf_write.has_remaining_mut() {
buf_read.read_buf(&mut buf_write).await.into_diagnostic()?;
}
miette::Result::<_>::Ok(Some((serial, name_buf_write, buf_write)))
};
let Some((serial, name_buf_write, payload)) = select! {
read_result = read_fut => read_result,
_ = &mut cancel => { break; }
}? else {
break;
};
let name = std::str::from_utf8(name_buf_write.filled()).into_diagnostic()?;
let decoder = decoders
.get(name)
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
.as_ref();
let listener = listeners
.get(name)
.ok_or_else(|| miette!("No such RPC call name: {}", name))?
.clone();
let packet = match decoder(payload.filled()) {
Ok(p) => p,
Err(e) => {
error!("Failed to parse packet: {e}");
continue;
}
};
yield (serial, packet, listener);
messages += 1;
}
}
}
}

View File

@ -2,23 +2,27 @@ use crate::web::ApiError;
use lru::LruCache; use lru::LruCache;
use magnetar_model::emoji::{EmojiResolver, EmojiTag}; use magnetar_model::emoji::{EmojiResolver, EmojiTag};
use magnetar_model::{ck, CalckeyDbError, CalckeyModel}; use magnetar_model::{ck, CalckeyDbError, CalckeyModel};
use miette::Diagnostic;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use strum::VariantNames;
use thiserror::Error; use thiserror::Error;
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, VariantNames)]
#[error("Emoji cache error: {}")]
pub enum EmojiCacheError { pub enum EmojiCacheError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
#[diagnostic(code(mag::emoji_cache_error::db_error))]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
} }
impl From<EmojiCacheError> for ApiError { impl From<EmojiCacheError> for ApiError {
fn from(err: EmojiCacheError) -> Self { fn from(err: EmojiCacheError) -> Self {
Self::internal("Cache error", err) let mut api_error: ApiError = match err {
EmojiCacheError::DbError(err) => err.into(),
};
api_error.message = format!("Emoji cache error: {}", api_error.message);
api_error
} }
} }

View File

@ -1,32 +0,0 @@
use std::str::FromStr;
use headers::UserAgent;
use magnetar_common::config::MagnetarConfig;
use magnetar_federation::{
ap_client::{ApClientError, ApClientServiceDefaultProvider},
client::federation_client::FederationClient,
ApClientService,
};
use miette::IntoDiagnostic;
pub(super) fn new_federation_client_service(
config: &'static MagnetarConfig,
) -> miette::Result<FederationClient> {
FederationClient::new(
true,
256000,
20,
UserAgent::from_str(&format!(
"magnetar/{} (https://{})",
config.branding.version, config.networking.host
))
.into_diagnostic()?,
)
.into_diagnostic()
}
pub(super) fn new_ap_client_service(
federation_client: impl AsRef<FederationClient> + Send + Sync + 'static,
) -> impl ApClientService<Error = ApClientError> {
ApClientServiceDefaultProvider::new(federation_client)
}

View File

@ -1,29 +0,0 @@
use std::{sync::Arc, time::SystemTime};
use super::MagnetarService;
pub struct GenIdService;
impl GenIdService {
pub fn new_id(&self) -> ulid::Ulid {
ulid::Ulid::new()
}
pub fn new_id_str(&self) -> String {
self.new_id().to_string()
}
pub fn new_for_time(&self, time: impl Into<SystemTime>) -> ulid::Ulid {
ulid::Ulid::from_datetime(time.into())
}
pub fn new_str_for_time(&self, time: impl Into<SystemTime>) -> String {
self.new_for_time(time).to_string()
}
}
impl AsRef<GenIdService> for Arc<MagnetarService> {
fn as_ref(&self) -> &GenIdService {
&self.gen_id
}
}

View File

@ -2,16 +2,14 @@ use crate::web::ApiError;
use lru::LruCache; use lru::LruCache;
use magnetar_model::sea_orm::{EntityTrait, PrimaryKeyTrait}; use magnetar_model::sea_orm::{EntityTrait, PrimaryKeyTrait};
use magnetar_model::{CalckeyDbError, CalckeyModel}; use magnetar_model::{CalckeyDbError, CalckeyModel};
use miette::Diagnostic;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use strum::VariantNames;
use thiserror::Error; use thiserror::Error;
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, VariantNames)]
#[error("Generic ID cache error: {}")]
#[diagnostic(code(mag::generic_id_cache_error))]
pub enum GenericIdCacheError { pub enum GenericIdCacheError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
@ -19,7 +17,13 @@ pub enum GenericIdCacheError {
impl From<GenericIdCacheError> for ApiError { impl From<GenericIdCacheError> for ApiError {
fn from(err: GenericIdCacheError) -> Self { fn from(err: GenericIdCacheError) -> Self {
Self::internal("Cache error", err) let mut api_error: ApiError = match err {
GenericIdCacheError::DbError(err) => err.into(),
};
api_error.message = format!("Generic ID cache error: {}", api_error.message);
api_error
} }
} }

View File

@ -2,15 +2,13 @@ use crate::web::ApiError;
use lru::LruCache; use lru::LruCache;
use magnetar_common::config::MagnetarConfig; use magnetar_common::config::MagnetarConfig;
use magnetar_model::{ck, CalckeyDbError, CalckeyModel}; use magnetar_model::{ck, CalckeyDbError, CalckeyModel};
use miette::Diagnostic;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use strum::VariantNames;
use thiserror::Error; use thiserror::Error;
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, VariantNames)]
#[error("Remote instance cache error: {}")]
#[diagnostic(code(mag::remote_instance_cache_error))]
pub enum RemoteInstanceCacheError { pub enum RemoteInstanceCacheError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
@ -18,7 +16,13 @@ pub enum RemoteInstanceCacheError {
impl From<RemoteInstanceCacheError> for ApiError { impl From<RemoteInstanceCacheError> for ApiError {
fn from(err: RemoteInstanceCacheError) -> Self { fn from(err: RemoteInstanceCacheError) -> Self {
Self::internal("Cache error", err) let mut api_error: ApiError = match err {
RemoteInstanceCacheError::DbError(err) => err.into(),
};
api_error.message = format!("Remote instance cache error: {}", api_error.message);
api_error
} }
} }

View File

@ -1,15 +1,13 @@
use crate::web::ApiError; use crate::web::ApiError;
use magnetar_model::{ck, CalckeyDbError, CalckeyModel}; use magnetar_model::{ck, CalckeyDbError, CalckeyModel};
use miette::Diagnostic;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use strum::VariantNames;
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use tracing::error; use tracing::error;
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, VariantNames)]
#[error("Instance meta cache error: {}")]
#[diagnostic(code(mag::instance_meta_cache_error))]
pub enum InstanceMetaCacheError { pub enum InstanceMetaCacheError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
@ -19,7 +17,14 @@ pub enum InstanceMetaCacheError {
impl From<InstanceMetaCacheError> for ApiError { impl From<InstanceMetaCacheError> for ApiError {
fn from(err: InstanceMetaCacheError) -> Self { fn from(err: InstanceMetaCacheError) -> Self {
Self::internal("Cache error", err) let mut api_error: ApiError = match err {
InstanceMetaCacheError::DbError(err) => err.into(),
InstanceMetaCacheError::ChannelClosed => err.into(),
};
api_error.message = format!("Instance meta cache error: {}", api_error.message);
api_error
} }
} }

View File

@ -2,58 +2,52 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use cached::{Cached, TimedCache}; use cached::{Cached, TimedCache};
use miette::Diagnostic; use strum::VariantNames;
use thiserror::Error; use thiserror::Error;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::error; use tracing::error;
use crate::web::ApiError;
use magnetar_common::config::MagnetarConfig; use magnetar_common::config::MagnetarConfig;
use magnetar_federation::crypto::{ApHttpPrivateKey, ApHttpPrivateKeyParseError, ApHttpPublicKey, ApHttpPublicKeyParseError};
use magnetar_model::{ use magnetar_model::{
ck, CalckeyCache, CalckeyCacheError, CalckeyDbError, CalckeyModel, CalckeySub, ck, CalckeyCache, CalckeyCacheError, CalckeyDbError, CalckeyModel, CalckeySub,
InternalStreamMessage, SubMessage, InternalStreamMessage, SubMessage,
}; };
#[derive(Debug, Error, Diagnostic)] use crate::web::ApiError;
#[error("Local user cache error: {}")]
#[diagnostic(code(mag::local_user_cache_error))] #[derive(Debug, Error, VariantNames)]
pub enum UserCacheError { pub enum UserCacheError {
#[error("Database error: {0}")] #[error("Database error: {0}")]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
#[error("Redis error: {0}")] #[error("Redis error: {0}")]
RedisError(#[from] CalckeyCacheError), RedisError(#[from] CalckeyCacheError),
#[error("Private key parse error: {0}")]
PrivateKeyParseError(#[from] ApHttpPrivateKeyParseError),
#[error("Public key parse error: {0}")]
PublicKeyParseError(#[from] ApHttpPublicKeyParseError),
} }
impl From<UserCacheError> for ApiError {
fn from(err: UserCacheError) -> Self {
Self::internal("Cache error", err)
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct CachedLocalUser { pub struct CachedLocalUser {
pub user: Arc<ck::user::Model>, pub user: Arc<ck::user::Model>,
pub profile: Arc<ck::user_profile::Model>, pub profile: Arc<ck::user_profile::Model>,
pub private_key: Arc<ApHttpPrivateKey<'static>>,
pub public_key: Arc<ApHttpPublicKey<'static>>,
} }
impl TryFrom<(ck::user::Model, ck::user_profile::Model, ck::user_keypair::Model)> for CachedLocalUser { impl From<(ck::user::Model, ck::user_profile::Model)> for CachedLocalUser {
type Error = UserCacheError; fn from((user, profile): (ck::user::Model, ck::user_profile::Model)) -> Self {
CachedLocalUser {
fn try_from((user, profile, key_pair): (ck::user::Model, ck::user_profile::Model, ck::user_keypair::Model)) -> Result<Self, Self::Error> {
Ok(CachedLocalUser {
user: Arc::new(user), user: Arc::new(user),
profile: Arc::new(profile), profile: Arc::new(profile),
private_key: Arc::new(key_pair.private_key.parse()?), }
public_key: Arc::new(key_pair.public_key.parse()?), }
}) }
impl From<UserCacheError> for ApiError {
fn from(err: UserCacheError) -> Self {
let mut api_error: ApiError = match err {
UserCacheError::DbError(err) => err.into(),
UserCacheError::RedisError(err) => err.into(),
};
api_error.message = format!("Local user cache error: {}", api_error.message);
api_error
} }
} }
@ -162,7 +156,7 @@ impl LocalUserCacheService {
| InternalStreamMessage::UserChangeSuspendedState { id, .. } | InternalStreamMessage::UserChangeSuspendedState { id, .. }
| InternalStreamMessage::RemoteUserUpdated { id } | InternalStreamMessage::RemoteUserUpdated { id }
| InternalStreamMessage::UserTokenRegenerated { id, .. } => { | InternalStreamMessage::UserTokenRegenerated { id, .. } => {
let user_profile = match db.get_user_for_cache_by_id(&id).await { let user_profile = match db.get_user_and_profile_by_id(&id).await {
Ok(Some(m)) => m, Ok(Some(m)) => m,
Ok(None) => return, Ok(None) => return,
Err(e) => { Err(e) => {
@ -171,15 +165,7 @@ impl LocalUserCacheService {
} }
}; };
let cached: CachedLocalUser = match user_profile.try_into() { cache.lock().await.refresh(&CachedLocalUser::from(user_profile));
Ok(c) => c,
Err(e) => {
error!("Error parsing user from database: {}", e);
return;
}
};
cache.lock().await.refresh(&cached);
} }
_ => {} _ => {}
}; };
@ -216,7 +202,7 @@ impl LocalUserCacheService {
return Ok(Some(user)); return Ok(Some(user));
} }
self.map_cache_user(self.db.get_user_for_cache_by_token(token).await?.map(CachedLocalUser::try_from).transpose()?) self.map_cache_user(self.db.get_user_and_profile_by_token(token).await?.map(CachedLocalUser::from))
.await .await
} }
@ -230,6 +216,6 @@ impl LocalUserCacheService {
return Ok(Some(user)); return Ok(Some(user));
} }
self.map_cache_user(self.db.get_user_for_cache_by_id(id).await?.map(CachedLocalUser::try_from).transpose()?).await self.map_cache_user(self.db.get_user_and_profile_by_id(id).await?.map(CachedLocalUser::from)).await
} }
} }

View File

@ -1,27 +1,19 @@
use federation_client::{new_ap_client_service, new_federation_client_service};
use gen_id::GenIdService;
use magnetar_common::config::MagnetarConfig; use magnetar_common::config::MagnetarConfig;
use magnetar_federation::ap_client::ApClientError;
use magnetar_federation::client::federation_client::FederationClient;
use magnetar_federation::ApClientService;
use magnetar_model::{ck, CalckeyCache, CalckeyModel}; use magnetar_model::{ck, CalckeyCache, CalckeyModel};
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::time::Duration; use std::time::Duration;
use thiserror::Error;
pub mod emoji_cache; pub mod emoji_cache;
pub mod federation_client;
pub mod gen_id;
pub mod generic_id_cache; pub mod generic_id_cache;
pub mod instance_cache; pub mod instance_cache;
pub mod instance_meta_cache; pub mod instance_meta_cache;
pub mod local_user_cache; pub mod local_user_cache;
pub type ApClient = dyn ApClientService<Error = ApClientError> + Send + Sync;
#[non_exhaustive] #[non_exhaustive]
pub struct MagnetarService { pub struct MagnetarService {
pub db: CalckeyModel, pub db: CalckeyModel,
pub gen_id: GenIdService,
pub cache: CalckeyCache, pub cache: CalckeyCache,
pub config: &'static MagnetarConfig, pub config: &'static MagnetarConfig,
pub local_user_cache: local_user_cache::LocalUserCacheService, pub local_user_cache: local_user_cache::LocalUserCacheService,
@ -29,8 +21,6 @@ pub struct MagnetarService {
pub remote_instance_cache: instance_cache::RemoteInstanceCacheService, pub remote_instance_cache: instance_cache::RemoteInstanceCacheService,
pub emoji_cache: emoji_cache::EmojiCacheService, pub emoji_cache: emoji_cache::EmojiCacheService,
pub drive_file_cache: generic_id_cache::GenericIdCacheService<ck::drive_file::Entity>, pub drive_file_cache: generic_id_cache::GenericIdCacheService<ck::drive_file::Entity>,
pub federation_client: FederationClient,
pub ap_client: Box<ApClient>,
} }
impl Debug for MagnetarService { impl Debug for MagnetarService {
@ -43,12 +33,18 @@ impl Debug for MagnetarService {
} }
} }
#[derive(Debug, Error)]
pub enum ServiceInitError {
#[error("Authentication cache initialization error: {0}")]
AuthCacheError(#[from] local_user_cache::UserCacheError),
}
impl MagnetarService { impl MagnetarService {
pub async fn new( pub async fn new(
config: &'static MagnetarConfig, config: &'static MagnetarConfig,
db: CalckeyModel, db: CalckeyModel,
cache: CalckeyCache, cache: CalckeyCache,
) -> miette::Result<Self> { ) -> Result<Self, ServiceInitError> {
let local_user_cache = let local_user_cache =
local_user_cache::LocalUserCacheService::new(config, db.clone(), cache.clone()).await?; local_user_cache::LocalUserCacheService::new(config, db.clone(), cache.clone()).await?;
let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone()); let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone());
@ -62,9 +58,6 @@ impl MagnetarService {
let drive_file_cache = let drive_file_cache =
generic_id_cache::GenericIdCacheService::new(db.clone(), 128, Duration::from_secs(10)); generic_id_cache::GenericIdCacheService::new(db.clone(), 128, Duration::from_secs(10));
let federation_client = new_federation_client_service(config)?;
let ap_client = Box::new(new_ap_client_service(Box::new(federation_client.clone())));
Ok(Self { Ok(Self {
db, db,
cache, cache,
@ -74,16 +67,6 @@ impl MagnetarService {
remote_instance_cache, remote_instance_cache,
emoji_cache, emoji_cache,
drive_file_cache, drive_file_cache,
gen_id: GenIdService,
federation_client,
ap_client,
}) })
} }
pub fn service<T>(&self) -> &T
where
Self: AsRef<T>,
{
self.as_ref()
}
} }

View File

@ -1,18 +0,0 @@
//! Dynamic configuration variables that would be very messy to configure from the environment
//!
//! Most of these values were arbitrarily chosen, so your mileage may vary when adjusting these.
//!
//! Larger instances may benefit from higher cache sizes and increased parallelism, while smaller
//! ones may want to opt for memory savings.
use miette::IntoDiagnostic;
#[derive(Debug)]
pub struct MagVars {}
fn parse_vars() -> miette::Result<MagVars> {
let default_cfg = std::fs::read_to_string("config/default-vars.kdl").into_diagnostic()?;
let doc = default_cfg.parse::<kdl::KdlDocument>().into_diagnostic()?;
Ok(MagVars {})
}

View File

@ -2,23 +2,23 @@ use std::convert::Infallible;
use std::sync::Arc; use std::sync::Arc;
use axum::async_trait; use axum::async_trait;
use axum::extract::rejection::ExtensionRejection;
use axum::extract::{FromRequestParts, Request, State}; use axum::extract::{FromRequestParts, Request, State};
use axum::http::request::Parts; use axum::extract::rejection::ExtensionRejection;
use axum::http::{HeaderMap, StatusCode}; use axum::http::{HeaderMap, StatusCode};
use axum::http::request::Parts;
use axum::middleware::Next; use axum::middleware::Next;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use headers::authorization::Bearer;
use headers::{Authorization, HeaderMapExt}; use headers::{Authorization, HeaderMapExt};
use miette::{miette, Diagnostic}; use headers::authorization::Bearer;
use strum::IntoStaticStr;
use thiserror::Error; use thiserror::Error;
use tracing::error; use tracing::error;
use magnetar_model::{ck, CalckeyDbError}; use magnetar_model::{CalckeyDbError, ck};
use crate::service::local_user_cache::{CachedLocalUser, UserCacheError}; use crate::service::local_user_cache::{CachedLocalUser, UserCacheError};
use crate::service::MagnetarService; use crate::service::MagnetarService;
use crate::web::ApiError; use crate::web::{ApiError, IntoErrorCode};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum AuthMode { pub enum AuthMode {
@ -45,13 +45,15 @@ pub struct AuthUserRejection(ApiError);
impl From<ExtensionRejection> for AuthUserRejection { impl From<ExtensionRejection> for AuthUserRejection {
fn from(rejection: ExtensionRejection) -> Self { fn from(rejection: ExtensionRejection) -> Self {
AuthUserRejection( AuthUserRejection(ApiError {
ApiError::new( status: StatusCode::UNAUTHORIZED,
StatusCode::UNAUTHORIZED, code: "Unauthorized".error_code(),
"Unauthorized", message: if cfg!(debug_assertions) {
miette!(code = "mag::auth_user_rejection", "Missing auth extension: {}", rejection), format!("Missing auth extension: {}", rejection)
) } else {
) "Unauthorized".to_string()
},
})
} }
} }
@ -87,46 +89,76 @@ pub struct AuthState {
service: Arc<MagnetarService>, service: Arc<MagnetarService>,
} }
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, IntoStaticStr)]
#[error("Auth error: {}")]
enum AuthError { enum AuthError {
#[error("Unsupported authorization scheme")] #[error("Unsupported authorization scheme")]
#[diagnostic(code(mag::auth_error::unsupported_scheme))]
UnsupportedScheme, UnsupportedScheme,
#[error("Cache error: {0}")] #[error("Cache error: {0}")]
#[diagnostic(code(mag::auth_error::cache_error))]
CacheError(#[from] UserCacheError), CacheError(#[from] UserCacheError),
#[error("Database error: {0}")] #[error("Database error: {0}")]
#[diagnostic(code(mag::auth_error::db_error))]
DbError(#[from] CalckeyDbError), DbError(#[from] CalckeyDbError),
#[error("Invalid token")] #[error("Invalid token")]
#[diagnostic(code(mag::auth_error::invalid_token))]
InvalidToken, InvalidToken,
#[error("Invalid token referencing user \"{user}\"")] #[error("Invalid token \"{token}\" referencing user \"{user}\"")]
#[diagnostic(code(mag::auth_error::invalid_token_user))] InvalidTokenUser { token: String, user: String },
InvalidTokenUser { user: String }, #[error("Invalid access token \"{access_token}\" referencing app \"{app}\"")]
#[error("Invalid access token referencing app \"{app}\"")] InvalidAccessTokenApp { access_token: String, app: String },
#[diagnostic(code(mag::auth_error::invalid_token_app))]
InvalidAccessTokenApp { app: String },
} }
impl From<AuthError> for ApiError { impl From<AuthError> for ApiError {
fn from(err: AuthError) -> Self { fn from(err: AuthError) -> Self {
let code = match err { match err {
AuthError::InvalidToken => StatusCode::UNAUTHORIZED, AuthError::UnsupportedScheme => ApiError {
_ => StatusCode::INTERNAL_SERVER_ERROR status: StatusCode::UNAUTHORIZED,
}; code: err.error_code(),
message: "Unsupported authorization scheme".to_string(),
},
AuthError::CacheError(err) => err.into(),
AuthError::DbError(err) => err.into(),
AuthError::InvalidTokenUser {
ref token,
ref user,
} => {
error!("Invalid token \"{}\" referencing user \"{}\"", token, user);
let message = match err { ApiError {
AuthError::UnsupportedScheme => "Unsupported authorization scheme", status: StatusCode::INTERNAL_SERVER_ERROR,
AuthError::InvalidTokenUser { .. } => "Invalid token and user combination", code: err.error_code(),
AuthError::InvalidAccessTokenApp { .. } => "Invalid token and app combination", message: if cfg!(debug_assertions) {
AuthError::CacheError(_) => "Cache error", format!("Invalid token \"{}\" referencing user \"{}\"", token, user)
AuthError::DbError(_) => "Database error", } else {
AuthError::InvalidToken => "Invalid account token" "Invalid token-user link".to_string()
}; },
}
}
AuthError::InvalidAccessTokenApp {
ref access_token,
ref app,
} => {
error!(
"Invalid access token \"{}\" referencing app \"{}\"",
access_token, app
);
ApiError::new(code, message, miette!(err)) ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!(
"Invalid access token \"{}\" referencing app \"{}\"",
access_token, app
)
} else {
"Invalid access token-app link".to_string()
},
}
}
AuthError::InvalidToken => ApiError {
status: StatusCode::UNAUTHORIZED,
code: err.error_code(),
message: "Invalid token".to_string(),
},
}
} }
} }
@ -171,6 +203,7 @@ impl AuthState {
if user.is_none() { if user.is_none() {
return Err(AuthError::InvalidTokenUser { return Err(AuthError::InvalidTokenUser {
token: access_token.id,
user: access_token.user_id, user: access_token.user_id,
}); });
} }
@ -187,6 +220,7 @@ impl AuthState {
}), }),
}), }),
None => Err(AuthError::InvalidAccessTokenApp { None => Err(AuthError::InvalidAccessTokenApp {
access_token: access_token.id,
app: access_token.user_id, app: access_token.user_id,
}), }),
}; };

View File

@ -1,10 +1,9 @@
use axum::{http::HeaderValue, response::IntoResponse}; use axum::{http::HeaderValue, response::IntoResponse};
use hyper::header; use hyper::{header, StatusCode};
use magnetar_core::web_model::{content_type::ContentXrdXml, ContentType}; use magnetar_core::web_model::{content_type::ContentXrdXml, ContentType};
use miette::miette;
use serde::Serialize; use serde::Serialize;
use crate::web::ApiError; use crate::web::{ApiError, ErrorCode};
pub struct XrdXmlExt<T>(pub T); pub struct XrdXmlExt<T>(pub T);
@ -21,10 +20,15 @@ impl<T: Serialize> IntoResponse for XrdXmlExt<T> {
buf.into_bytes(), buf.into_bytes(),
) )
.into_response(), .into_response(),
Err(e) => ApiError::internal( Err(e) => ApiError {
"XmlSerializationError", status: StatusCode::INTERNAL_SERVER_ERROR,
miette!(code = "mag::xrd_xml_ext_error", "XML serialization error: {}", e), code: ErrorCode("XmlSerializationError".into()),
) message: if cfg!(debug_assertions) {
format!("Serialization error: {}", e)
} else {
"Serialization error".to_string()
},
}
.into_response(), .into_response(),
} }
} }

View File

@ -3,97 +3,75 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use magnetar_common::util::FediverseTagParseError; use magnetar_common::util::FediverseTagParseError;
use magnetar_federation::ap_client::ApClientError;
use magnetar_federation::crypto::{
ApHttpPrivateKeyParseError, ApHttpPublicKeyParseError, ApSigningError,
};
use magnetar_model::{CalckeyCacheError, CalckeyDbError}; use magnetar_model::{CalckeyCacheError, CalckeyDbError};
use miette::{diagnostic, miette, Diagnostic, Report};
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::borrow::Cow; use std::fmt::{Display, Formatter};
use std::fmt::Display;
use thiserror::Error; use thiserror::Error;
use tracing::warn;
use ulid::Ulid;
pub mod auth; pub mod auth;
pub mod extractors; pub mod extractors;
pub mod pagination; pub mod pagination;
#[derive(Debug, Clone, Serialize)]
#[repr(transparent)]
pub struct ErrorCode(pub String);
// This janky hack allows us to use `.error_code()` on enums with strum::IntoStaticStr
pub trait IntoErrorCode {
fn error_code<'a, 'b: 'a>(&'a self) -> ErrorCode
where
&'a Self: Into<&'b str>;
}
impl<T: ?Sized> IntoErrorCode for T {
fn error_code<'a, 'b: 'a>(&'a self) -> ErrorCode
where
&'a Self: Into<&'b str>,
{
ErrorCode(<&Self as Into<&'b str>>::into(self).to_string())
}
}
impl ErrorCode {
pub fn join(&self, other: &str) -> Self {
Self(format!("{}:{}", other, self.0))
}
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
#[error("API Error")]
pub struct ApiError { pub struct ApiError {
pub status: StatusCode, pub status: StatusCode,
pub nonce: Ulid, pub code: ErrorCode,
pub message: Cow<'static, str>, pub message: String,
pub cause: miette::Report,
} }
#[derive(Debug, Serialize)] impl Display for ApiError {
pub struct ApiErrorBare<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
pub status: u16, write!(
pub nonce: &'a str, f,
pub message: &'a str, "ApiError[status = \"{}\", code = \"{:?}\"]: \"{}\"",
} self.status, self.code, self.message
)
impl Serialize for ApiError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
ApiErrorBare {
status: self.status.as_u16(),
nonce: &self.nonce.to_string(),
message: &self.message,
}
.serialize(serializer)
} }
} }
impl ApiError { #[derive(Debug)]
pub fn new( pub struct AccessForbidden(pub String);
status: StatusCode,
message: impl Into<Cow<'static, str>>,
cause: impl Into<Report>,
) -> Self {
Self {
status,
nonce: Ulid::new(),
message: message.into(),
cause: cause.into(),
}
}
pub fn internal(message: impl Into<Cow<'static, str>>, cause: impl Into<Report>) -> Self { impl From<&AccessForbidden> for &str {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, message, cause) fn from(_: &AccessForbidden) -> &'static str {
"AccessForbidden"
} }
} }
impl IntoResponse for ApiError { impl IntoResponse for ApiError {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let mut buf = [0; ulid::ULID_LEN];
let nonce = self.nonce.array_to_str(&mut buf);
warn!(
"[status={},nonce={}] {}",
self.status.as_str(),
nonce,
self.cause
);
let code = self
.cause
.code()
.as_deref()
.map(<dyn Display as ToString>::to_string);
( (
self.status, self.status,
Json(json!({ Json(json!({
"status": self.status.as_u16(), "status": self.status.as_u16(),
"code": code, "code": self.code,
"nonce": nonce,
"message": self.message, "message": self.message,
})), })),
) )
@ -101,106 +79,114 @@ impl IntoResponse for ApiError {
} }
} }
#[derive(Debug, Error, Diagnostic)]
#[error("Access forbidden: {0}")]
#[diagnostic(code(mag::access_forbidden))]
pub struct AccessForbidden(pub String);
impl From<AccessForbidden> for ApiError { impl From<AccessForbidden> for ApiError {
fn from(err: AccessForbidden) -> Self { fn from(err: AccessForbidden) -> Self {
Self::new(StatusCode::FORBIDDEN, "Access forbidden", err) Self {
status: StatusCode::FORBIDDEN,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!("Forbidden: {}", err.0)
} else {
"Forbidden".to_string()
},
}
} }
} }
impl From<FediverseTagParseError> for ApiError { impl From<FediverseTagParseError> for ApiError {
fn from(err: FediverseTagParseError) -> Self { fn from(err: FediverseTagParseError) -> Self {
Self::new( Self {
StatusCode::BAD_REQUEST, status: StatusCode::BAD_REQUEST,
"Fediverse tag parse error", code: err.error_code(),
miette!(code = "mag::access_forbidden", "{}", err), message: if cfg!(debug_assertions) {
) format!("Fediverse tag parse error: {}", err)
} else {
"Fediverse tag parse error".to_string()
},
}
} }
} }
impl From<CalckeyDbError> for ApiError { impl From<CalckeyDbError> for ApiError {
fn from(err: CalckeyDbError) -> Self { fn from(err: CalckeyDbError) -> Self {
Self::internal("Database error", miette!(code = "mag::db_error", "{}", err)) Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!("Database error: {}", err)
} else {
"Database error".to_string()
},
}
} }
} }
impl From<CalckeyCacheError> for ApiError { impl From<CalckeyCacheError> for ApiError {
fn from(err: CalckeyCacheError) -> Self { fn from(err: CalckeyCacheError) -> Self {
Self::internal("Cache error", miette!(code = "mag::cache_error", "{}", err)) Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!("Cache error: {}", err)
} else {
"Cache error".to_string()
},
}
} }
} }
impl From<PackError> for ApiError { impl From<PackError> for ApiError {
fn from(err: PackError) -> Self { fn from(err: PackError) -> Self {
Self::internal( Self {
"Data transformation error", status: StatusCode::INTERNAL_SERVER_ERROR,
miette!(code = "mag::pack_error", "{}", err), code: err.error_code(),
) message: if cfg!(debug_assertions) {
format!("Data transformation error: {}", err)
} else {
"Data transformation error".to_string()
},
}
} }
} }
#[derive(Debug, Error, Diagnostic)] #[derive(Debug)]
#[error("Object not found: {0}")]
#[diagnostic(code(mag::object_not_found))]
pub struct ObjectNotFound(pub String); pub struct ObjectNotFound(pub String);
impl From<&ObjectNotFound> for &str {
fn from(_: &ObjectNotFound) -> Self {
"ObjectNotFound"
}
}
impl From<ObjectNotFound> for ApiError { impl From<ObjectNotFound> for ApiError {
fn from(err: ObjectNotFound) -> Self { fn from(err: ObjectNotFound) -> Self {
Self::new(StatusCode::NOT_FOUND, "Object not found", miette!(err)) Self {
status: StatusCode::NOT_FOUND,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!("Object not found: {}", err.0)
} else {
"Object not found".to_string()
},
}
} }
} }
#[derive(Debug, Error, Diagnostic)] #[derive(Debug)]
#[error("Argument out of range: {0}")]
#[diagnostic(code(mag::argument_out_of_range))]
pub struct ArgumentOutOfRange(pub String); pub struct ArgumentOutOfRange(pub String);
impl From<&ArgumentOutOfRange> for &str {
fn from(_: &ArgumentOutOfRange) -> Self {
"ArgumentOutOfRange"
}
}
impl From<ArgumentOutOfRange> for ApiError { impl From<ArgumentOutOfRange> for ApiError {
fn from(err: ArgumentOutOfRange) -> Self { fn from(err: ArgumentOutOfRange) -> Self {
Self::new( Self {
StatusCode::BAD_REQUEST, status: StatusCode::BAD_REQUEST,
format!("Argument out of range: {}", err.0), code: err.error_code(),
err, message: format!("Argument out of range: {}", err.0),
) }
}
}
impl From<ApHttpPublicKeyParseError> for ApiError {
fn from(err: ApHttpPublicKeyParseError) -> Self {
Self::internal(
"User public key parse error",
miette!(code = "mag::ap_http_public_key_parse_error", "{}", err),
)
}
}
impl From<ApHttpPrivateKeyParseError> for ApiError {
fn from(err: ApHttpPrivateKeyParseError) -> Self {
Self::internal(
"User private key parse error",
miette!(code = "mag::ap_http_private_key_parse_error", "{}", err),
)
}
}
impl From<ApSigningError> for ApiError {
fn from(err: ApSigningError) -> Self {
Self::internal(
"ActivityPub HTTP signing error",
miette!(code = "mag::ap_signing_error", "{}", err),
)
}
}
impl From<ApClientError> for ApiError {
fn from(err: ApClientError) -> Self {
Self::internal(
"ActivityPub client error",
miette!(code = "mag::ap_client_error", "{}", err),
)
} }
} }

View File

@ -1,6 +1,6 @@
use crate::service::MagnetarService; use crate::service::MagnetarService;
use crate::util::serialize_as_urlenc; use crate::util::serialize_as_urlenc;
use crate::web::ApiError; use crate::web::{ApiError, IntoErrorCode};
use axum::extract::rejection::QueryRejection; use axum::extract::rejection::QueryRejection;
use axum::extract::{FromRequestParts, OriginalUri, Query}; use axum::extract::{FromRequestParts, OriginalUri, Query};
use axum::http::header::InvalidHeaderValue; use axum::http::header::InvalidHeaderValue;
@ -14,10 +14,10 @@ use magnetar_core::web_model::rel::{RelNext, RelPrev};
use magnetar_model::sea_orm::prelude::async_trait::async_trait; use magnetar_model::sea_orm::prelude::async_trait::async_trait;
use magnetar_sdk::types::{PaginationShape, SpanFilter}; use magnetar_sdk::types::{PaginationShape, SpanFilter};
use magnetar_sdk::util_types::U64Range; use magnetar_sdk::util_types::U64Range;
use miette::{miette, Diagnostic};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use strum::IntoStaticStr;
use thiserror::Error; use thiserror::Error;
use tracing::error; use tracing::error;
@ -39,32 +39,32 @@ struct PaginationQuery {
query_rest: HashMap<String, String>, query_rest: HashMap<String, String>,
} }
#[derive(Debug, Error, Diagnostic)] #[derive(Debug, Error, IntoStaticStr)]
#[error("Pagination builder error: {}")]
pub enum PaginationBuilderError { pub enum PaginationBuilderError {
#[error("Query rejection: {0}")] #[error("Query rejection: {0}")]
#[diagnostic(code(mag::pagination_builder_error::query_rejection))]
QueryRejection(#[from] QueryRejection), QueryRejection(#[from] QueryRejection),
#[error("HTTP error: {0}")] #[error("HTTP error: {0}")]
#[diagnostic(code(mag::pagination_builder_error::http_error))]
HttpError(#[from] axum::http::Error), HttpError(#[from] axum::http::Error),
#[error("Value of out of range error")]
OutOfRange,
#[error("Invalid header value")] #[error("Invalid header value")]
#[diagnostic(code(mag::pagination_builder_error::invalid_header_value))]
InvalidHeaderValue(#[from] InvalidHeaderValue), InvalidHeaderValue(#[from] InvalidHeaderValue),
#[error("Query string serialization error: {0}")] #[error("Query string serialization error: {0}")]
#[diagnostic(code(mag::pagination_builder_error::serialization_error_query))]
SerializationErrorQuery(#[from] serde_urlencoded::ser::Error), SerializationErrorQuery(#[from] serde_urlencoded::ser::Error),
#[error("Query string serialization error: {0}")] #[error("Query string serialization error: {0}")]
#[diagnostic(code(mag::pagination_builder_error::serialization_error_json))]
SerializationErrorJson(#[from] serde_json::Error), SerializationErrorJson(#[from] serde_json::Error),
} }
impl From<PaginationBuilderError> for ApiError { impl From<PaginationBuilderError> for ApiError {
fn from(err: PaginationBuilderError) -> Self { fn from(err: PaginationBuilderError) -> Self {
if matches!(err, PaginationBuilderError::QueryRejection(_)) { Self {
Self::new(StatusCode::BAD_REQUEST, "Invalid pagination query", miette!(err)) status: StatusCode::INTERNAL_SERVER_ERROR,
code: err.error_code(),
message: if cfg!(debug_assertions) {
format!("Pagination builder error: {}", err)
} else { } else {
Self::internal("Pagination error", miette!(err)) "Pagination builder error".to_string()
},
} }
} }
} }