Implemented AP fetching via RPC
ci/woodpecker/push/ociImagePush Pipeline failed Details

This commit is contained in:
Natty 2024-11-15 20:39:13 +01:00
parent 80c5bf8ae6
commit 5fb85e0db6
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
13 changed files with 374 additions and 78 deletions

47
Cargo.lock generated
View File

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo. # This file is automatically @generated by Cargo.
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 4
[[package]] [[package]]
name = "Inflector" name = "Inflector"
@ -1695,6 +1695,17 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "kdl"
version = "4.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "062c875482ccb676fd40c804a40e3824d4464c18c364547456d1c8e8e951ae47"
dependencies = [
"miette 5.10.0",
"nom",
"thiserror",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.5.0" version = "1.5.0"
@ -1783,6 +1794,7 @@ dependencies = [
"headers", "headers",
"hyper", "hyper",
"itertools", "itertools",
"kdl",
"lru", "lru",
"magnetar_common", "magnetar_common",
"magnetar_core", "magnetar_core",
@ -1793,7 +1805,7 @@ dependencies = [
"magnetar_runtime", "magnetar_runtime",
"magnetar_sdk", "magnetar_sdk",
"magnetar_webfinger", "magnetar_webfinger",
"miette", "miette 7.2.0",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"rmp-serde", "rmp-serde",
@ -1839,7 +1851,7 @@ dependencies = [
"headers", "headers",
"hyper", "hyper",
"magnetar_common", "magnetar_common",
"miette", "miette 7.2.0",
"percent-encoding", "percent-encoding",
"serde", "serde",
"serde_json", "serde_json",
@ -1897,7 +1909,7 @@ dependencies = [
"magnetar_core", "magnetar_core",
"magnetar_host_meta", "magnetar_host_meta",
"magnetar_webfinger", "magnetar_webfinger",
"miette", "miette 7.2.0",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"reqwest", "reqwest",
@ -1982,7 +1994,7 @@ dependencies = [
"itertools", "itertools",
"magnetar_core", "magnetar_core",
"magnetar_sdk", "magnetar_sdk",
"miette", "miette 7.2.0",
"thiserror", "thiserror",
"tracing", "tracing",
] ]
@ -2051,6 +2063,18 @@ version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miette"
version = "5.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e"
dependencies = [
"miette-derive 5.10.0",
"once_cell",
"thiserror",
"unicode-width",
]
[[package]] [[package]]
name = "miette" name = "miette"
version = "7.2.0" version = "7.2.0"
@ -2060,7 +2084,7 @@ dependencies = [
"backtrace", "backtrace",
"backtrace-ext", "backtrace-ext",
"cfg-if", "cfg-if",
"miette-derive", "miette-derive 7.2.0",
"owo-colors", "owo-colors",
"supports-color", "supports-color",
"supports-hyperlinks", "supports-hyperlinks",
@ -2071,6 +2095,17 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "miette-derive"
version = "5.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
]
[[package]] [[package]]
name = "miette-derive" name = "miette-derive"
version = "7.2.0" version = "7.2.0"

View File

@ -51,6 +51,7 @@ hyper = "1.1"
idna = "1" idna = "1"
indexmap = "2.2" indexmap = "2.2"
itertools = "0.13" itertools = "0.13"
kdl = "4"
lru = "0.12" lru = "0.12"
miette = "7" miette = "7"
nom = "7" nom = "7"
@ -133,6 +134,7 @@ thiserror = { workspace = true }
percent-encoding = { workspace = true } percent-encoding = { workspace = true }
kdl = { workspace = true }
rmp-serde = { workspace = true } rmp-serde = { workspace = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true } serde_json = { workspace = true }

1
config/.gitignore vendored
View File

@ -1,3 +1,4 @@
* *
!.gitignore !.gitignore
!default.toml !default.toml
!default-vars.kdl

34
config/default-vars.kdl Normal file
View File

@ -0,0 +1,34 @@
cache {
local-user-cache {
// Size is unlimited
lifetime 300min
}
emoji-cache {
size 4096
}
remote-instance-cache {
size 256
lifetime 100s
}
drive-file-cache {
size 128
lifetime 10s
}
}
api-model {
note {
buffer 10
}
notification {
buffer 10
}
}
activity-pub {
user-agent "magnetar/$version ($host)"
}

View File

@ -13,12 +13,19 @@ use magnetar_core::web_model::content_type::ContentActivityStreams;
use crate::{ use crate::{
client::federation_client::{FederationClient, FederationClientError}, client::federation_client::{FederationClient, FederationClientError},
crypto::{ApSigningError, ApSigningKey, SigningAlgorithm}, crypto::{ApSigningError, ApSigningKey, SigningAlgorithm},
ApClientService, ApSignature, ApSigningField, ApClientService, ApSignature, ApSigningField, ApSigningHeaders, SigningInput, SigningParts,
ApSigningHeaders, SigningInput, SigningParts,
}; };
pub struct ApClientServiceDefaultProvider { pub struct ApClientServiceDefaultProvider {
client: Arc<FederationClient>, client: Arc<dyn AsRef<FederationClient> + Send + Sync>,
}
impl ApClientServiceDefaultProvider {
pub fn new(client: impl AsRef<FederationClient> + Send + Sync + 'static) -> Self {
Self {
client: Arc::new(client),
}
}
} }
impl Display for ApSignature { impl Display for ApSignature {
@ -238,7 +245,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
&self, &self,
signing_key: ApSigningKey<'_>, signing_key: ApSigningKey<'_>,
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
request: impl SigningInput, request: &dyn SigningInput,
) -> Result<ApSignature, Self::Error> { ) -> Result<ApSignature, Self::Error> {
let components = request.create_signing_input(); let components = request.create_signing_input();
@ -278,7 +285,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::RsaSha256 => self.sign_request( SigningAlgorithm::RsaSha256 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
SigningInputGetRsaSha256 { &SigningInputGetRsaSha256 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::GET, method: Method::GET,
@ -291,7 +298,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::Hs2019 => self.sign_request( SigningAlgorithm::Hs2019 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
SigningInputGetHs2019 { &SigningInputGetHs2019 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::GET, method: Method::GET,
@ -319,6 +326,8 @@ impl ApClientService for ApClientServiceDefaultProvider {
Ok(self Ok(self
.client .client
.as_ref()
.as_ref()
.get(url) .get(url)
.accept(ContentActivityStreams) .accept(ContentActivityStreams)
.headers(headers) .headers(headers)
@ -346,7 +355,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::RsaSha256 => self.sign_request( SigningAlgorithm::RsaSha256 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
SigningInputPostRsaSha256 { &SigningInputPostRsaSha256 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::POST, method: Method::POST,
@ -360,7 +369,7 @@ impl ApClientService for ApClientServiceDefaultProvider {
SigningAlgorithm::Hs2019 => self.sign_request( SigningAlgorithm::Hs2019 => self.sign_request(
signing_key, signing_key,
signing_algorithm, signing_algorithm,
SigningInputPostHs2019 { &SigningInputPostHs2019 {
request_target: RequestTarget { request_target: RequestTarget {
url: &url, url: &url,
method: Method::POST, method: Method::POST,
@ -395,6 +404,8 @@ impl ApClientService for ApClientServiceDefaultProvider {
Ok(self Ok(self
.client .client
.as_ref()
.as_ref()
.builder(Method::POST, url) .builder(Method::POST, url)
.accept(ContentActivityStreams) .accept(ContentActivityStreams)
.content_type(ContentActivityStreams) .content_type(ContentActivityStreams)
@ -428,7 +439,7 @@ mod test {
let rsa_key = rsa::RsaPrivateKey::from_pkcs8_pem(key.trim()).into_diagnostic()?; let rsa_key = rsa::RsaPrivateKey::from_pkcs8_pem(key.trim()).into_diagnostic()?;
let ap_client = ApClientServiceDefaultProvider { let ap_client = ApClientServiceDefaultProvider {
client: Arc::new( client: Arc::new(Box::new(
FederationClient::new( FederationClient::new(
true, true,
128_000, 128_000,
@ -436,7 +447,7 @@ mod test {
UserAgent::from_static("magnetar/0.42 (https://astolfo.social)"), UserAgent::from_static("magnetar/0.42 (https://astolfo.social)"),
) )
.into_diagnostic()?, .into_diagnostic()?,
), )),
}; };
let val = ap_client let val = ap_client

View File

@ -154,7 +154,7 @@ pub trait ApClientService: Send + Sync {
&self, &self,
signing_key: ApSigningKey<'_>, signing_key: ApSigningKey<'_>,
signing_algorithm: SigningAlgorithm, signing_algorithm: SigningAlgorithm,
request: impl SigningInput, request: &dyn SigningInput,
) -> Result<ApSignature, Self::Error>; ) -> Result<ApSignature, Self::Error>;
async fn signed_get( async fn signed_get(

View File

@ -5,6 +5,7 @@ pub mod nodeinfo;
mod rpc_v1; mod rpc_v1;
pub mod service; pub mod service;
pub mod util; pub mod util;
pub mod vars;
pub mod web; pub mod web;
pub mod webfinger; pub mod webfinger;
@ -19,7 +20,8 @@ use futures::{select, FutureExt};
use magnetar_common::config::{MagnetarConfig, MagnetarRpcSocketKind}; use magnetar_common::config::{MagnetarConfig, MagnetarRpcSocketKind};
use magnetar_model::{CacheConnectorConfig, CalckeyCache, CalckeyModel, ConnectorConfig}; use magnetar_model::{CacheConnectorConfig, CalckeyCache, CalckeyModel, ConnectorConfig};
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use rpc_v1::proto::{MagRpc, RpcMessage, RpcSockAddr}; use rpc_v1::create_rpc_router;
use rpc_v1::proto::RpcSockAddr;
use std::convert::Infallible; use std::convert::Infallible;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -28,8 +30,8 @@ use tokio::net::TcpListener;
use tokio::signal; use tokio::signal;
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use tracing::info;
use tracing::log::error; use tracing::log::error;
use tracing::{debug, info};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[tokio::main] #[tokio::main]
@ -63,11 +65,7 @@ async fn main() -> miette::Result<()> {
}) })
.into_diagnostic()?; .into_diagnostic()?;
let service = Arc::new( let service = Arc::new(MagnetarService::new(config, db.clone(), redis).await?);
MagnetarService::new(config, db.clone(), redis)
.await
.into_diagnostic()?,
);
let shutdown_signal = shutdown_signal().shared(); let shutdown_signal = shutdown_signal().shared();
@ -91,13 +89,7 @@ async fn run_rpc(
MagnetarRpcSocketKind::Tcp(ip) => RpcSockAddr::Ip(*ip), MagnetarRpcSocketKind::Tcp(ip) => RpcSockAddr::Ip(*ip),
}; };
let rpc = MagRpc::new().handle( let rpc = create_rpc_router();
"/ping",
|_, RpcMessage(message): RpcMessage<String>| async move {
debug!("Received RPC ping: {}", message);
RpcMessage("pong".to_owned())
},
);
rpc.run(service, rpc_bind_addr, Some(shutdown_signal)).await rpc.run(service, rpc_bind_addr, Some(shutdown_signal)).await
} }

View File

@ -1 +1,98 @@
use std::sync::Arc;
use magnetar_federation::crypto::SigningAlgorithm;
use proto::{MagRpc, RpcMessage};
use serde::Deserialize;
use tracing::{debug, info};
use crate::{
model::{processing::note::NoteModel, PackingContext},
service::MagnetarService,
web::{ApiError, ObjectNotFound},
};
pub mod proto; pub mod proto;
#[derive(Debug, Deserialize)]
struct RpcApGet {
user_id: String,
url: String,
}
#[derive(Debug, Deserialize)]
struct RpcApPost {
user_id: String,
url: String,
body: serde_json::Value,
}
pub fn create_rpc_router() -> MagRpc {
MagRpc::new()
.handle(
"/ping",
|_, RpcMessage(message): RpcMessage<String>| async move {
debug!("Received RPC ping: {}", message);
RpcMessage("pong".to_owned())
},
)
.handle(
"/note/by-id",
|service, RpcMessage(id): RpcMessage<String>| async move {
let ctx = PackingContext::new(service, None).await?;
let note = NoteModel {
attachments: true,
with_context: true,
}
.fetch_single(&ctx, &id)
.await?
.ok_or(ObjectNotFound(id))?;
Result::<_, ApiError>::Ok(note)
},
)
.handle(
"/ap/get",
|service: Arc<MagnetarService>,
RpcMessage(RpcApGet { user_id, url }): RpcMessage<RpcApGet>| async move {
let Some(user) = service.local_user_cache.get_by_id(&user_id).await? else {
return Err(ObjectNotFound(format!("LocalUserID:{user_id}")).into());
};
let key_id = format!(
"https://{}/users/{}#main-key",
service.config.networking.host, user_id
);
let signing_key = user
.private_key
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
let result = service
.ap_client
.signed_get(signing_key, SigningAlgorithm::RsaSha256, None, &url)
.await?;
Result::<_, ApiError>::Ok(result)
},
)
.handle(
"/ap/post",
|service: Arc<MagnetarService>,
RpcMessage(RpcApPost { user_id, url, body }): RpcMessage<RpcApPost>| async move {
let Some(user) = service.local_user_cache.get_by_id(&user_id).await? else {
return Err(ObjectNotFound(format!("LocalUserID:{user_id}")).into());
};
let key_id = format!(
"https://{}/users/{}#main-key",
service.config.networking.host, user_id
);
let signing_key = user
.private_key
.create_signing_key(&key_id, SigningAlgorithm::RsaSha256)?;
let result = service
.ap_client
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body)
.await?;
Result::<_, ApiError>::Ok(result)
},
)
}

View File

@ -1,7 +1,6 @@
use crate::service::MagnetarService; use crate::service::MagnetarService;
use bytes::BufMut; use bytes::BufMut;
use futures::{sink, FutureExt, Stream, StreamExt}; use futures::{FutureExt, Stream, StreamExt};
use futures_util::SinkExt;
use miette::{miette, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -37,7 +36,7 @@ pub struct RpcResponse(Vec<u8>);
impl<T: Serialize + Send + 'static> IntoRpcResponse for RpcMessage<T> { impl<T: Serialize + Send + 'static> IntoRpcResponse for RpcMessage<T> {
fn into_rpc_response(self) -> Option<RpcResponse> { fn into_rpc_response(self) -> Option<RpcResponse> {
rmp_serde::to_vec(&self) rmp_serde::to_vec_named(&self)
.inspect_err(|e| { .inspect_err(|e| {
error!( error!(
"Failed to serialize value of type {}: {}", "Failed to serialize value of type {}: {}",
@ -223,14 +222,16 @@ impl MagRpc {
r.ok() r.ok()
}) })
.filter_map(|(payload, listener)| { .filter_map(|(serial, payload, listener)| {
let ctx = context.clone(); let ctx = context.clone();
async move { listener(ctx, payload).await } async move { Some((serial, listener(ctx, payload).await?)) }
}); });
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some(RpcResponse(bytes)) = src.next().await { while let Some((serial, RpcResponse(bytes))) = src.next().await {
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half write_half
.write_u32(bytes.len() as u32) .write_u32(bytes.len() as u32)
.await .await
@ -315,14 +316,16 @@ impl MagRpc {
r.ok() r.ok()
}) })
.filter_map(|(payload, listener)| { .filter_map(|(serial, payload, listener)| {
let ctx = context.clone(); let ctx = context.clone();
async move { listener(ctx, payload).await } async move { Some((serial, listener(ctx, payload).await?)) }
}); });
futures::pin_mut!(src); futures::pin_mut!(src);
while let Some(RpcResponse(bytes)) = src.next().await { while let Some((serial, RpcResponse(bytes))) = src.next().await {
write_half.write_u8(b'M').await.into_diagnostic()?;
write_half.write_u64(serial).await.into_diagnostic()?;
write_half write_half
.write_u32(bytes.len() as u32) .write_u32(bytes.len() as u32)
.await .await
@ -364,7 +367,7 @@ impl RpcCallDecoder {
&self, &self,
mut buf_read: BufReader<R>, mut buf_read: BufReader<R>,
mut cancel: tokio::sync::oneshot::Receiver<()>, mut cancel: tokio::sync::oneshot::Receiver<()>,
) -> impl Stream<Item = miette::Result<(MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static ) -> impl Stream<Item = miette::Result<(u64, MessageRaw, Arc<MagRpcHandlerMapped>)>> + Send + 'static
{ {
let decoders = self.payload_decoders.clone(); let decoders = self.payload_decoders.clone();
let listeners = self.listeners.clone(); let listeners = self.listeners.clone();
@ -395,6 +398,8 @@ impl RpcCallDecoder {
)).into_diagnostic(); )).into_diagnostic();
} }
let serial = buf_read.read_u64().await.into_diagnostic()?;
let name_len = buf_read.read_u32().await.into_diagnostic()? as usize; let name_len = buf_read.read_u32().await.into_diagnostic()? as usize;
if name_len > name_buf.capacity() { if name_len > name_buf.capacity() {
name_buf.reserve(name_len - name_buf.capacity()); name_buf.reserve(name_len - name_buf.capacity());
@ -428,10 +433,10 @@ impl RpcCallDecoder {
buf_read.read_buf(&mut buf_write).await.into_diagnostic()?; buf_read.read_buf(&mut buf_write).await.into_diagnostic()?;
} }
miette::Result::<_>::Ok(Some((name_buf_write, buf_write))) miette::Result::<_>::Ok(Some((serial, name_buf_write, buf_write)))
}; };
let Some((name_buf_write, payload)) = select! { let Some((serial, name_buf_write, payload)) = select! {
read_result = read_fut => read_result, read_result = read_fut => read_result,
_ = &mut cancel => { break; } _ = &mut cancel => { break; }
}? else { }? else {
@ -457,7 +462,7 @@ impl RpcCallDecoder {
} }
}; };
yield (packet, listener); yield (serial, packet, listener);
messages += 1; messages += 1;
} }
} }

View File

@ -0,0 +1,32 @@
use std::str::FromStr;
use headers::UserAgent;
use magnetar_common::config::MagnetarConfig;
use magnetar_federation::{
ap_client::{ApClientError, ApClientServiceDefaultProvider},
client::federation_client::FederationClient,
ApClientService,
};
use miette::IntoDiagnostic;
pub(super) fn new_federation_client_service(
config: &'static MagnetarConfig,
) -> miette::Result<FederationClient> {
FederationClient::new(
true,
256000,
20,
UserAgent::from_str(&format!(
"magnetar/{} (https://{})",
config.branding.version, config.networking.host
))
.into_diagnostic()?,
)
.into_diagnostic()
}
pub(super) fn new_ap_client_service(
federation_client: impl AsRef<FederationClient> + Send + Sync + 'static,
) -> impl ApClientService<Error = ApClientError> {
ApClientServiceDefaultProvider::new(federation_client)
}

View File

@ -1,17 +1,23 @@
use federation_client::{new_ap_client_service, new_federation_client_service};
use gen_id::GenIdService; use gen_id::GenIdService;
use magnetar_common::config::MagnetarConfig; use magnetar_common::config::MagnetarConfig;
use magnetar_federation::ap_client::ApClientError;
use magnetar_federation::client::federation_client::FederationClient;
use magnetar_federation::ApClientService;
use magnetar_model::{ck, CalckeyCache, CalckeyModel}; use magnetar_model::{ck, CalckeyCache, CalckeyModel};
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::time::Duration; use std::time::Duration;
use thiserror::Error;
pub mod emoji_cache; pub mod emoji_cache;
pub mod federation_client;
pub mod gen_id; pub mod gen_id;
pub mod generic_id_cache; pub mod generic_id_cache;
pub mod instance_cache; pub mod instance_cache;
pub mod instance_meta_cache; pub mod instance_meta_cache;
pub mod local_user_cache; pub mod local_user_cache;
pub type ApClient = dyn ApClientService<Error = ApClientError> + Send + Sync;
#[non_exhaustive] #[non_exhaustive]
pub struct MagnetarService { pub struct MagnetarService {
pub db: CalckeyModel, pub db: CalckeyModel,
@ -23,6 +29,8 @@ pub struct MagnetarService {
pub remote_instance_cache: instance_cache::RemoteInstanceCacheService, pub remote_instance_cache: instance_cache::RemoteInstanceCacheService,
pub emoji_cache: emoji_cache::EmojiCacheService, pub emoji_cache: emoji_cache::EmojiCacheService,
pub drive_file_cache: generic_id_cache::GenericIdCacheService<ck::drive_file::Entity>, pub drive_file_cache: generic_id_cache::GenericIdCacheService<ck::drive_file::Entity>,
pub federation_client: FederationClient,
pub ap_client: Box<ApClient>,
} }
impl Debug for MagnetarService { impl Debug for MagnetarService {
@ -35,18 +43,12 @@ impl Debug for MagnetarService {
} }
} }
#[derive(Debug, Error)]
pub enum ServiceInitError {
#[error("Authentication cache initialization error: {0}")]
AuthCacheError(#[from] local_user_cache::UserCacheError),
}
impl MagnetarService { impl MagnetarService {
pub async fn new( pub async fn new(
config: &'static MagnetarConfig, config: &'static MagnetarConfig,
db: CalckeyModel, db: CalckeyModel,
cache: CalckeyCache, cache: CalckeyCache,
) -> Result<Self, ServiceInitError> { ) -> miette::Result<Self> {
let local_user_cache = let local_user_cache =
local_user_cache::LocalUserCacheService::new(config, db.clone(), cache.clone()).await?; local_user_cache::LocalUserCacheService::new(config, db.clone(), cache.clone()).await?;
let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone()); let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone());
@ -60,6 +62,9 @@ impl MagnetarService {
let drive_file_cache = let drive_file_cache =
generic_id_cache::GenericIdCacheService::new(db.clone(), 128, Duration::from_secs(10)); generic_id_cache::GenericIdCacheService::new(db.clone(), 128, Duration::from_secs(10));
let federation_client = new_federation_client_service(config)?;
let ap_client = Box::new(new_ap_client_service(Box::new(federation_client.clone())));
Ok(Self { Ok(Self {
db, db,
cache, cache,
@ -70,6 +75,8 @@ impl MagnetarService {
emoji_cache, emoji_cache,
drive_file_cache, drive_file_cache,
gen_id: GenIdService, gen_id: GenIdService,
federation_client,
ap_client,
}) })
} }

18
src/vars.rs Normal file
View File

@ -0,0 +1,18 @@
//! Dynamic configuration variables that would be very messy to configure from the environment
//!
//! Most of these values were arbitrarily chosen, so your mileage may vary when adjusting these.
//!
//! Larger instances may benefit from higher cache sizes and increased parallelism, while smaller
//! ones may want to opt for memory savings.
use miette::IntoDiagnostic;
#[derive(Debug)]
pub struct MagVars {}
fn parse_vars() -> miette::Result<MagVars> {
let default_cfg = std::fs::read_to_string("config/default-vars.kdl").into_diagnostic()?;
let doc = default_cfg.parse::<kdl::KdlDocument>().into_diagnostic()?;
Ok(MagVars {})
}

View File

@ -3,9 +3,13 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use magnetar_common::util::FediverseTagParseError; use magnetar_common::util::FediverseTagParseError;
use magnetar_federation::crypto::{ApHttpPrivateKeyParseError, ApHttpPublicKeyParseError}; use magnetar_federation::ap_client::ApClientError;
use magnetar_federation::crypto::{
ApHttpPrivateKeyParseError, ApHttpPublicKeyParseError, ApSigningError,
};
use magnetar_model::{CalckeyCacheError, CalckeyDbError}; use magnetar_model::{CalckeyCacheError, CalckeyDbError};
use miette::{diagnostic, miette, Diagnostic, Report}; use miette::{diagnostic, miette, Diagnostic, Report};
use serde::Serialize;
use serde_json::json; use serde_json::json;
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt::Display; use std::fmt::Display;
@ -17,7 +21,6 @@ pub mod auth;
pub mod extractors; pub mod extractors;
pub mod pagination; pub mod pagination;
#[derive(Debug, Error)] #[derive(Debug, Error)]
#[error("API Error")] #[error("API Error")]
pub struct ApiError { pub struct ApiError {
@ -27,10 +30,33 @@ pub struct ApiError {
pub cause: miette::Report, pub cause: miette::Report,
} }
#[derive(Debug, Serialize)]
pub struct ApiErrorBare<'a> {
pub status: u16,
pub nonce: &'a str,
pub message: &'a str,
}
impl Serialize for ApiError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
ApiErrorBare {
status: self.status.as_u16(),
nonce: &self.nonce.to_string(),
message: &self.message,
}
.serialize(serializer)
}
}
impl ApiError { impl ApiError {
pub fn new(status: StatusCode, pub fn new(
status: StatusCode,
message: impl Into<Cow<'static, str>>, message: impl Into<Cow<'static, str>>,
cause: impl Into<Report>) -> Self { cause: impl Into<Report>,
) -> Self {
Self { Self {
status, status,
nonce: Ulid::new(), nonce: Ulid::new(),
@ -39,8 +65,7 @@ impl ApiError {
} }
} }
pub fn internal(message: impl Into<Cow<'static, str>>, pub fn internal(message: impl Into<Cow<'static, str>>, cause: impl Into<Report>) -> Self {
cause: impl Into<Report>) -> Self {
Self::new(StatusCode::INTERNAL_SERVER_ERROR, message, cause) Self::new(StatusCode::INTERNAL_SERVER_ERROR, message, cause)
} }
} }
@ -50,10 +75,16 @@ impl IntoResponse for ApiError {
let mut buf = [0; ulid::ULID_LEN]; let mut buf = [0; ulid::ULID_LEN];
let nonce = self.nonce.array_to_str(&mut buf); let nonce = self.nonce.array_to_str(&mut buf);
warn!("[status={},nonce={}] {}", self.status.as_str(), nonce, self.cause); warn!(
"[status={},nonce={}] {}",
self.status.as_str(),
nonce,
self.cause
);
let code = self
let code = self.cause.code() .cause
.code()
.as_deref() .as_deref()
.map(<dyn Display as ToString>::to_string); .map(<dyn Display as ToString>::to_string);
@ -83,9 +114,11 @@ impl From<AccessForbidden> for ApiError {
impl From<FediverseTagParseError> for ApiError { impl From<FediverseTagParseError> for ApiError {
fn from(err: FediverseTagParseError) -> Self { fn from(err: FediverseTagParseError) -> Self {
Self::new(StatusCode::BAD_REQUEST, Self::new(
StatusCode::BAD_REQUEST,
"Fediverse tag parse error", "Fediverse tag parse error",
miette!(code = "mag::access_forbidden", "{}", err)) miette!(code = "mag::access_forbidden", "{}", err),
)
} }
} }
@ -103,7 +136,10 @@ impl From<CalckeyCacheError> for ApiError {
impl From<PackError> for ApiError { impl From<PackError> for ApiError {
fn from(err: PackError) -> Self { fn from(err: PackError) -> Self {
Self::internal("Data transformation error", miette!(code = "mag::pack_error", "{}", err)) Self::internal(
"Data transformation error",
miette!(code = "mag::pack_error", "{}", err),
)
} }
} }
@ -125,20 +161,46 @@ pub struct ArgumentOutOfRange(pub String);
impl From<ArgumentOutOfRange> for ApiError { impl From<ArgumentOutOfRange> for ApiError {
fn from(err: ArgumentOutOfRange) -> Self { fn from(err: ArgumentOutOfRange) -> Self {
Self::new(StatusCode::BAD_REQUEST, format!("Argument out of range: {}", err.0), err) Self::new(
StatusCode::BAD_REQUEST,
format!("Argument out of range: {}", err.0),
err,
)
} }
} }
impl From<ApHttpPublicKeyParseError> for ApiError { impl From<ApHttpPublicKeyParseError> for ApiError {
fn from(err: ApHttpPublicKeyParseError) -> Self { fn from(err: ApHttpPublicKeyParseError) -> Self {
Self::internal("User public key parse error", Self::internal(
miette!(code = "mag::ap_http_public_key_parse_error", "{}", err)) "User public key parse error",
miette!(code = "mag::ap_http_public_key_parse_error", "{}", err),
)
} }
} }
impl From<ApHttpPrivateKeyParseError> for ApiError { impl From<ApHttpPrivateKeyParseError> for ApiError {
fn from(err: ApHttpPrivateKeyParseError) -> Self { fn from(err: ApHttpPrivateKeyParseError) -> Self {
Self::internal("User private key parse error", Self::internal(
miette!(code = "mag::ap_http_private_key_parse_error", "{}", err)) "User private key parse error",
miette!(code = "mag::ap_http_private_key_parse_error", "{}", err),
)
}
}
impl From<ApSigningError> for ApiError {
fn from(err: ApSigningError) -> Self {
Self::internal(
"ActivityPub HTTP signing error",
miette!(code = "mag::ap_signing_error", "{}", err),
)
}
}
impl From<ApClientError> for ApiError {
fn from(err: ApClientError) -> Self {
Self::internal(
"ActivityPub client error",
miette!(code = "mag::ap_client_error", "{}", err),
)
} }
} }