diff --git a/Cargo.lock b/Cargo.lock index 6016bdc..343d5ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1923,6 +1923,7 @@ dependencies = [ "miette 7.2.0", "percent-encoding", "quick-xml", + "reqwest", "rmp-serde", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index f14cfd6..50145eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ axum-extra = { workspace = true, features = ["typed-header"] } async-stream = { workspace = true } headers = { workspace = true } hyper = { workspace = true, features = ["full"] } +reqwest = { workspace = true, features = ["hickory-dns"] } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tower = { workspace = true } diff --git a/src/model/activity/delivery.rs b/src/model/activity/delivery.rs new file mode 100644 index 0000000..abb4710 --- /dev/null +++ b/src/model/activity/delivery.rs @@ -0,0 +1,247 @@ +use hyper::StatusCode; +use magnetar_federation::{ + ap_client::ApClientError, client::federation_client::FederationClientError, + crypto::ApSigningError, +}; +use serde::Serialize; +use thiserror::Error; + +use crate::{service::local_user_cache::UserCacheError, web::ObjectNotFound}; + +#[derive(Debug, Clone, Copy, Serialize)] +pub enum DeliveryErrorClass { + Retriable, + RetriableLater, + Unrecovarable, +} + +#[derive(Debug, Error, Serialize)] +#[error("Invalid delivery task definition: {}")] +#[serde(tag = "type")] +pub enum InvalidDeliveryTaskError { + #[error("No signer with ID: {0}")] + NoSuchSigner(String), + #[error("Invalid URL: {0}")] + UrlParse(String), + #[error("JSON serialization error: {0}")] + JsonSerialization(String), + #[error("Invalid header value: {0}")] + InvalidHeaderValue(String), + #[error("Reqwest builder error: {0}")] + Reqwest(String), +} + +#[derive(Debug, Error, Serialize)] +#[error("Retriable task delivery failure: {}")] +#[serde(tag = "type")] +pub enum RetriableLocalDeliveryTaskError { + #[error("Signing error: {0}")] + Signing(String), + #[error("JSON serialization I/O error: {0}")] + JsonSerialization(String), + #[error("User cache error: {0}")] + UserCache(String), +} + +#[derive(Debug, Error, Serialize)] +#[error("Retriable remote delivery error: {}")] +#[serde(tag = "type")] +pub enum RetriableRemoteDeliveryError { + #[error("Timeout")] + Timeout, + #[error("Response body too large")] + BodyLimitExceeded, + #[error("Rate limited")] + RateLimited, + #[error("Bad Gateway")] + BadGateway, + #[error("Service Unavailable")] + ServiceUnavailable, + #[error("Internal Server Error")] + InternalServerError, + #[error("UTF-8 decode error: {0}")] + Utf8(String), + #[error("JSON deserialization error: {0}")] + Json(String), + #[error("Reqwest connect error: {0}")] + Connect(String), + #[error("Reqwest decode error: {0}")] + ResponseParse(String), + #[error("Reqwest error: {0}")] + GenericReqwest(String), + #[error("Other 5xx error: {0}")] + StatusOther(u16), +} + +#[derive(Debug, Error, Serialize)] +#[error("Hard remote delivery error: {}")] +#[serde(tag = "type")] +pub enum HardRemoteDeliveryError { + #[error("Gone")] + Gone, + #[error("Not found")] + NotFound, + #[error("BadRequest")] + BadRequest, + #[error("Forbidden")] + Forbidden, + #[error("Other 4xx error: {0}")] + StatusOther(u16), +} + +#[derive(Debug, Error, Serialize)] +#[error("Delivery error: {kind}")] +pub struct DeliveryError { + kind: DeliveryErrorKind, + message: String, + retry_class: DeliveryErrorClass, +} + +#[derive(Debug, Error, Serialize)] +#[serde(tag = "type")] +pub enum DeliveryErrorKind { + #[error(transparent)] + InvalidTask(#[from] InvalidDeliveryTaskError), + #[error(transparent)] + RetriableLocalDeliveryTask(#[from] RetriableLocalDeliveryTaskError), + #[error(transparent)] + RetriableRemoteDelivery(#[from] RetriableRemoteDeliveryError), + #[error(transparent)] + HardRemoteDeliveryError(#[from] HardRemoteDeliveryError), + #[error("Other: {0}")] + Other(String), +} + +impl From for DeliveryError +where + D: Into, +{ + fn from(value: D) -> Self { + let err: DeliveryErrorKind = value.into(); + DeliveryError { + retry_class: err.get_class(), + message: err.to_string(), + kind: err, + } + } +} + +impl DeliveryErrorKind { + pub fn get_class(&self) -> DeliveryErrorClass { + match self { + DeliveryErrorKind::InvalidTask(_) => DeliveryErrorClass::Unrecovarable, + DeliveryErrorKind::RetriableLocalDeliveryTask( + RetriableLocalDeliveryTaskError::UserCache(_), + ) => DeliveryErrorClass::RetriableLater, + DeliveryErrorKind::RetriableLocalDeliveryTask(_) => DeliveryErrorClass::Retriable, + DeliveryErrorKind::RetriableRemoteDelivery(_) => DeliveryErrorClass::RetriableLater, + DeliveryErrorKind::HardRemoteDeliveryError(HardRemoteDeliveryError::NotFound) => { + DeliveryErrorClass::RetriableLater + } + DeliveryErrorKind::HardRemoteDeliveryError(_) => DeliveryErrorClass::Unrecovarable, + DeliveryErrorKind::Other(_) => DeliveryErrorClass::RetriableLater, + } + } +} + +impl From for DeliveryErrorKind { + fn from(ObjectNotFound(id): ObjectNotFound) -> Self { + InvalidDeliveryTaskError::NoSuchSigner(id).into() + } +} + +impl From for DeliveryErrorKind { + fn from(err: ApSigningError) -> Self { + RetriableLocalDeliveryTaskError::Signing(err.to_string()).into() + } +} + +impl From for DeliveryErrorKind { + fn from(value: ApClientError) -> Self { + match value { + ApClientError::FederationClientError(client_error) => client_error.into(), + ApClientError::SigningError(e) => e.into(), + ApClientError::UrlParseError(err) => { + InvalidDeliveryTaskError::UrlParse(err.to_string()).into() + } + ApClientError::SerializerError(err) if err.is_io() => { + RetriableLocalDeliveryTaskError::JsonSerialization(err.to_string()).into() + } + ApClientError::SerializerError(err) => { + InvalidDeliveryTaskError::JsonSerialization(err.to_string()).into() + } + ApClientError::InvalidHeaderValue(err) => { + InvalidDeliveryTaskError::InvalidHeaderValue(err.to_string()).into() + } + ApClientError::Utf8ParseError(e) => { + RetriableRemoteDeliveryError::Utf8(e.to_string()).into() + } + } + } +} + +impl From for DeliveryErrorKind { + fn from(value: FederationClientError) -> Self { + match value { + FederationClientError::TimeoutError => RetriableRemoteDeliveryError::Timeout.into(), + FederationClientError::ReqwestError(e) => e.into(), + FederationClientError::JsonError(e) => { + RetriableRemoteDeliveryError::Json(e.to_string()).into() + } + FederationClientError::XmlError(_) => { + unreachable!("No XML parsing occurs during ActivityPub fetches") + } + FederationClientError::BodyLimitExceededError => { + RetriableRemoteDeliveryError::BodyLimitExceeded.into() + } + FederationClientError::InvalidUrl(err) => { + InvalidDeliveryTaskError::UrlParse(err.to_string()).into() + } + FederationClientError::Other(msg) => DeliveryErrorKind::Other(msg), + } + } +} + +impl From for DeliveryErrorKind { + fn from(value: reqwest::Error) -> Self { + match value.status() { + Some(StatusCode::GONE) => return HardRemoteDeliveryError::Gone.into(), + Some(StatusCode::NOT_FOUND) => return HardRemoteDeliveryError::NotFound.into(), + Some(StatusCode::FORBIDDEN) => return HardRemoteDeliveryError::Forbidden.into(), + Some(StatusCode::BAD_REQUEST) => return HardRemoteDeliveryError::BadRequest.into(), + Some(StatusCode::SERVICE_UNAVAILABLE) => { + return RetriableRemoteDeliveryError::ServiceUnavailable.into() + } + Some(StatusCode::BAD_GATEWAY) => { + return RetriableRemoteDeliveryError::BadGateway.into() + } + Some(StatusCode::INTERNAL_SERVER_ERROR) => { + return RetriableRemoteDeliveryError::InternalServerError.into() + } + Some(StatusCode::TOO_MANY_REQUESTS) => { + return RetriableRemoteDeliveryError::RateLimited.into() + } + Some(s) if s.is_client_error() => { + return HardRemoteDeliveryError::StatusOther(s.as_u16()).into(); + } + Some(s) if s.is_server_error() => { + return RetriableRemoteDeliveryError::StatusOther(s.as_u16()).into(); + } + _ => {} + } + + match value { + e if e.is_connect() => RetriableRemoteDeliveryError::Connect(e.to_string()).into(), + e if e.is_timeout() => RetriableRemoteDeliveryError::Timeout.into(), + e if e.is_decode() => RetriableRemoteDeliveryError::ResponseParse(e.to_string()).into(), + e if e.is_builder() => InvalidDeliveryTaskError::Reqwest(e.to_string()).into(), + e => RetriableRemoteDeliveryError::GenericReqwest(e.to_string()).into(), + } + } +} + +impl From for DeliveryErrorKind { + fn from(err: UserCacheError) -> Self { + RetriableLocalDeliveryTaskError::UserCache(err.to_string()).into() + } +} diff --git a/src/model/activity/mod.rs b/src/model/activity/mod.rs new file mode 100644 index 0000000..4be6e14 --- /dev/null +++ b/src/model/activity/mod.rs @@ -0,0 +1 @@ +pub mod delivery; diff --git a/src/model/mod.rs b/src/model/mod.rs index f31a618..f979a40 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; +pub mod activity; pub mod data; pub mod processing; diff --git a/src/rpc_v1/mod.rs b/src/rpc_v1/mod.rs index 783e308..6ddb1ce 100644 --- a/src/rpc_v1/mod.rs +++ b/src/rpc_v1/mod.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use tracing::debug; use crate::{ - model::{processing::note::NoteModel, PackingContext}, + model::{activity::delivery::DeliveryError, processing::note::NoteModel, PackingContext}, service::MagnetarService, web::{ApiError, ObjectNotFound}, }; @@ -70,7 +70,7 @@ pub fn create_rpc_router() -> MagRpc { .signed_get(signing_key, SigningAlgorithm::RsaSha256, None, &url) .await?; - Result::<_, ApiError>::Ok(result) + Result::<_, DeliveryError>::Ok(result) }, ) .handle( @@ -92,7 +92,7 @@ pub fn create_rpc_router() -> MagRpc { .ap_client .signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body) .await?; - Result::<_, ApiError>::Ok(result) + Result::<_, DeliveryError>::Ok(result) }, ) } diff --git a/src/rpc_v1/proto.rs b/src/rpc_v1/proto.rs index 1ca166b..9e9cb59 100644 --- a/src/rpc_v1/proto.rs +++ b/src/rpc_v1/proto.rs @@ -1,6 +1,7 @@ use crate::service::MagnetarService; +use either::Either; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use miette::{miette, Error, IntoDiagnostic}; +use miette::{miette, IntoDiagnostic}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::any::Any; @@ -14,7 +15,7 @@ use std::sync::Arc; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, UnixSocket}; use tokio::select; -use tokio::task::{JoinError, JoinSet}; +use tokio::task::JoinSet; use tracing::{debug, error, info, warn, Instrument}; #[derive(Debug, Clone)] @@ -77,6 +78,17 @@ impl IntoR } } +impl IntoRpcResponse + for Either +{ + fn into_rpc_response(self) -> Option { + match self { + Either::Left(data) => data.into_rpc_response(), + Either::Right(data) => data.into_rpc_response(), + } + } +} + pub trait RpcHandler: Send + Sync + 'static where T: Send + 'static,