Report proper RPC errors
ci/woodpecker/push/ociImagePush Pipeline was successful Details

This commit is contained in:
Natty 2024-11-17 03:56:46 +01:00
parent 2acc41587a
commit 0a23953ae9
Signed by: natty
GPG Key ID: BF6CB659ADEE60EC
7 changed files with 268 additions and 5 deletions

1
Cargo.lock generated
View File

@ -1923,6 +1923,7 @@ dependencies = [
"miette 7.2.0", "miette 7.2.0",
"percent-encoding", "percent-encoding",
"quick-xml", "quick-xml",
"reqwest",
"rmp-serde", "rmp-serde",
"serde", "serde",
"serde_json", "serde_json",

View File

@ -110,6 +110,7 @@ axum-extra = { workspace = true, features = ["typed-header"] }
async-stream = { workspace = true } async-stream = { workspace = true }
headers = { workspace = true } headers = { workspace = true }
hyper = { workspace = true, features = ["full"] } hyper = { workspace = true, features = ["full"] }
reqwest = { workspace = true, features = ["hickory-dns"] }
tokio = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true } tokio-stream = { workspace = true }
tower = { workspace = true } tower = { workspace = true }

View File

@ -0,0 +1,247 @@
use hyper::StatusCode;
use magnetar_federation::{
ap_client::ApClientError, client::federation_client::FederationClientError,
crypto::ApSigningError,
};
use serde::Serialize;
use thiserror::Error;
use crate::{service::local_user_cache::UserCacheError, web::ObjectNotFound};
#[derive(Debug, Clone, Copy, Serialize)]
pub enum DeliveryErrorClass {
Retriable,
RetriableLater,
Unrecovarable,
}
#[derive(Debug, Error, Serialize)]
#[error("Invalid delivery task definition: {}")]
#[serde(tag = "type")]
pub enum InvalidDeliveryTaskError {
#[error("No signer with ID: {0}")]
NoSuchSigner(String),
#[error("Invalid URL: {0}")]
UrlParse(String),
#[error("JSON serialization error: {0}")]
JsonSerialization(String),
#[error("Invalid header value: {0}")]
InvalidHeaderValue(String),
#[error("Reqwest builder error: {0}")]
Reqwest(String),
}
#[derive(Debug, Error, Serialize)]
#[error("Retriable task delivery failure: {}")]
#[serde(tag = "type")]
pub enum RetriableLocalDeliveryTaskError {
#[error("Signing error: {0}")]
Signing(String),
#[error("JSON serialization I/O error: {0}")]
JsonSerialization(String),
#[error("User cache error: {0}")]
UserCache(String),
}
#[derive(Debug, Error, Serialize)]
#[error("Retriable remote delivery error: {}")]
#[serde(tag = "type")]
pub enum RetriableRemoteDeliveryError {
#[error("Timeout")]
Timeout,
#[error("Response body too large")]
BodyLimitExceeded,
#[error("Rate limited")]
RateLimited,
#[error("Bad Gateway")]
BadGateway,
#[error("Service Unavailable")]
ServiceUnavailable,
#[error("Internal Server Error")]
InternalServerError,
#[error("UTF-8 decode error: {0}")]
Utf8(String),
#[error("JSON deserialization error: {0}")]
Json(String),
#[error("Reqwest connect error: {0}")]
Connect(String),
#[error("Reqwest decode error: {0}")]
ResponseParse(String),
#[error("Reqwest error: {0}")]
GenericReqwest(String),
#[error("Other 5xx error: {0}")]
StatusOther(u16),
}
#[derive(Debug, Error, Serialize)]
#[error("Hard remote delivery error: {}")]
#[serde(tag = "type")]
pub enum HardRemoteDeliveryError {
#[error("Gone")]
Gone,
#[error("Not found")]
NotFound,
#[error("BadRequest")]
BadRequest,
#[error("Forbidden")]
Forbidden,
#[error("Other 4xx error: {0}")]
StatusOther(u16),
}
#[derive(Debug, Error, Serialize)]
#[error("Delivery error: {kind}")]
pub struct DeliveryError {
kind: DeliveryErrorKind,
message: String,
retry_class: DeliveryErrorClass,
}
#[derive(Debug, Error, Serialize)]
#[serde(tag = "type")]
pub enum DeliveryErrorKind {
#[error(transparent)]
InvalidTask(#[from] InvalidDeliveryTaskError),
#[error(transparent)]
RetriableLocalDeliveryTask(#[from] RetriableLocalDeliveryTaskError),
#[error(transparent)]
RetriableRemoteDelivery(#[from] RetriableRemoteDeliveryError),
#[error(transparent)]
HardRemoteDeliveryError(#[from] HardRemoteDeliveryError),
#[error("Other: {0}")]
Other(String),
}
impl<D> From<D> for DeliveryError
where
D: Into<DeliveryErrorKind>,
{
fn from(value: D) -> Self {
let err: DeliveryErrorKind = value.into();
DeliveryError {
retry_class: err.get_class(),
message: err.to_string(),
kind: err,
}
}
}
impl DeliveryErrorKind {
pub fn get_class(&self) -> DeliveryErrorClass {
match self {
DeliveryErrorKind::InvalidTask(_) => DeliveryErrorClass::Unrecovarable,
DeliveryErrorKind::RetriableLocalDeliveryTask(
RetriableLocalDeliveryTaskError::UserCache(_),
) => DeliveryErrorClass::RetriableLater,
DeliveryErrorKind::RetriableLocalDeliveryTask(_) => DeliveryErrorClass::Retriable,
DeliveryErrorKind::RetriableRemoteDelivery(_) => DeliveryErrorClass::RetriableLater,
DeliveryErrorKind::HardRemoteDeliveryError(HardRemoteDeliveryError::NotFound) => {
DeliveryErrorClass::RetriableLater
}
DeliveryErrorKind::HardRemoteDeliveryError(_) => DeliveryErrorClass::Unrecovarable,
DeliveryErrorKind::Other(_) => DeliveryErrorClass::RetriableLater,
}
}
}
impl From<ObjectNotFound> for DeliveryErrorKind {
fn from(ObjectNotFound(id): ObjectNotFound) -> Self {
InvalidDeliveryTaskError::NoSuchSigner(id).into()
}
}
impl From<ApSigningError> for DeliveryErrorKind {
fn from(err: ApSigningError) -> Self {
RetriableLocalDeliveryTaskError::Signing(err.to_string()).into()
}
}
impl From<ApClientError> for DeliveryErrorKind {
fn from(value: ApClientError) -> Self {
match value {
ApClientError::FederationClientError(client_error) => client_error.into(),
ApClientError::SigningError(e) => e.into(),
ApClientError::UrlParseError(err) => {
InvalidDeliveryTaskError::UrlParse(err.to_string()).into()
}
ApClientError::SerializerError(err) if err.is_io() => {
RetriableLocalDeliveryTaskError::JsonSerialization(err.to_string()).into()
}
ApClientError::SerializerError(err) => {
InvalidDeliveryTaskError::JsonSerialization(err.to_string()).into()
}
ApClientError::InvalidHeaderValue(err) => {
InvalidDeliveryTaskError::InvalidHeaderValue(err.to_string()).into()
}
ApClientError::Utf8ParseError(e) => {
RetriableRemoteDeliveryError::Utf8(e.to_string()).into()
}
}
}
}
impl From<FederationClientError> for DeliveryErrorKind {
fn from(value: FederationClientError) -> Self {
match value {
FederationClientError::TimeoutError => RetriableRemoteDeliveryError::Timeout.into(),
FederationClientError::ReqwestError(e) => e.into(),
FederationClientError::JsonError(e) => {
RetriableRemoteDeliveryError::Json(e.to_string()).into()
}
FederationClientError::XmlError(_) => {
unreachable!("No XML parsing occurs during ActivityPub fetches")
}
FederationClientError::BodyLimitExceededError => {
RetriableRemoteDeliveryError::BodyLimitExceeded.into()
}
FederationClientError::InvalidUrl(err) => {
InvalidDeliveryTaskError::UrlParse(err.to_string()).into()
}
FederationClientError::Other(msg) => DeliveryErrorKind::Other(msg),
}
}
}
impl From<reqwest::Error> for DeliveryErrorKind {
fn from(value: reqwest::Error) -> Self {
match value.status() {
Some(StatusCode::GONE) => return HardRemoteDeliveryError::Gone.into(),
Some(StatusCode::NOT_FOUND) => return HardRemoteDeliveryError::NotFound.into(),
Some(StatusCode::FORBIDDEN) => return HardRemoteDeliveryError::Forbidden.into(),
Some(StatusCode::BAD_REQUEST) => return HardRemoteDeliveryError::BadRequest.into(),
Some(StatusCode::SERVICE_UNAVAILABLE) => {
return RetriableRemoteDeliveryError::ServiceUnavailable.into()
}
Some(StatusCode::BAD_GATEWAY) => {
return RetriableRemoteDeliveryError::BadGateway.into()
}
Some(StatusCode::INTERNAL_SERVER_ERROR) => {
return RetriableRemoteDeliveryError::InternalServerError.into()
}
Some(StatusCode::TOO_MANY_REQUESTS) => {
return RetriableRemoteDeliveryError::RateLimited.into()
}
Some(s) if s.is_client_error() => {
return HardRemoteDeliveryError::StatusOther(s.as_u16()).into();
}
Some(s) if s.is_server_error() => {
return RetriableRemoteDeliveryError::StatusOther(s.as_u16()).into();
}
_ => {}
}
match value {
e if e.is_connect() => RetriableRemoteDeliveryError::Connect(e.to_string()).into(),
e if e.is_timeout() => RetriableRemoteDeliveryError::Timeout.into(),
e if e.is_decode() => RetriableRemoteDeliveryError::ResponseParse(e.to_string()).into(),
e if e.is_builder() => InvalidDeliveryTaskError::Reqwest(e.to_string()).into(),
e => RetriableRemoteDeliveryError::GenericReqwest(e.to_string()).into(),
}
}
}
impl From<UserCacheError> for DeliveryErrorKind {
fn from(err: UserCacheError) -> Self {
RetriableLocalDeliveryTaskError::UserCache(err.to_string()).into()
}
}

View File

@ -0,0 +1 @@
pub mod delivery;

View File

@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex; use tokio::sync::Mutex;
pub mod activity;
pub mod data; pub mod data;
pub mod processing; pub mod processing;

View File

@ -6,7 +6,7 @@ use serde::Deserialize;
use tracing::debug; use tracing::debug;
use crate::{ use crate::{
model::{processing::note::NoteModel, PackingContext}, model::{activity::delivery::DeliveryError, processing::note::NoteModel, PackingContext},
service::MagnetarService, service::MagnetarService,
web::{ApiError, ObjectNotFound}, web::{ApiError, ObjectNotFound},
}; };
@ -70,7 +70,7 @@ pub fn create_rpc_router() -> MagRpc {
.signed_get(signing_key, SigningAlgorithm::RsaSha256, None, &url) .signed_get(signing_key, SigningAlgorithm::RsaSha256, None, &url)
.await?; .await?;
Result::<_, ApiError>::Ok(result) Result::<_, DeliveryError>::Ok(result)
}, },
) )
.handle( .handle(
@ -92,7 +92,7 @@ pub fn create_rpc_router() -> MagRpc {
.ap_client .ap_client
.signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body) .signed_post(signing_key, SigningAlgorithm::RsaSha256, None, &url, &body)
.await?; .await?;
Result::<_, ApiError>::Ok(result) Result::<_, DeliveryError>::Ok(result)
}, },
) )
} }

View File

@ -1,6 +1,7 @@
use crate::service::MagnetarService; use crate::service::MagnetarService;
use either::Either;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use miette::{miette, Error, IntoDiagnostic}; use miette::{miette, IntoDiagnostic};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::any::Any; use std::any::Any;
@ -14,7 +15,7 @@ use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{TcpListener, UnixSocket}; use tokio::net::{TcpListener, UnixSocket};
use tokio::select; use tokio::select;
use tokio::task::{JoinError, JoinSet}; use tokio::task::JoinSet;
use tracing::{debug, error, info, warn, Instrument}; use tracing::{debug, error, info, warn, Instrument};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -77,6 +78,17 @@ impl<T: Serialize + Send + 'static, E: Serialize + Debug + Send + 'static> IntoR
} }
} }
impl<L: IntoRpcResponse + Send + 'static, R: IntoRpcResponse + Send + 'static> IntoRpcResponse
for Either<L, R>
{
fn into_rpc_response(self) -> Option<RpcResponse> {
match self {
Either::Left(data) => data.into_rpc_response(),
Either::Right(data) => data.into_rpc_response(),
}
}
}
pub trait RpcHandler<T>: Send + Sync + 'static pub trait RpcHandler<T>: Send + Sync + 'static
where where
T: Send + 'static, T: Send + 'static,