Added a federation HTTP client and basic host-meta implementation
This commit is contained in:
parent
5430fc81f4
commit
c8d658f807
File diff suppressed because it is too large
Load Diff
|
@ -57,6 +57,7 @@ pub mod content_type {
|
|||
content_type!(pub ContentActivityStreams, "application/activity+json");
|
||||
content_type!(pub ContentHtml, "text/html");
|
||||
content_type!(pub ContentJson, "application/json");
|
||||
content_type!(pub ContentXrdXml, "application/xrd+xml");
|
||||
content_type!(pub ContentJrdJson, "application/jrd+json");
|
||||
content_type!(pub ContentMultipartFormData, "multipart/form-data");
|
||||
content_type!(pub ContentUrlEncoded, "application/x-www-form-urlencoded");
|
||||
|
@ -114,6 +115,7 @@ pub mod rel {
|
|||
link_rel!(pub RelSelf, "self");
|
||||
link_rel!(pub RelNext, "next");
|
||||
link_rel!(pub RelPrev, "prev");
|
||||
link_rel!(pub RelLrdd, "lrdd");
|
||||
link_rel!(pub RelOStatusSubscribe, "http://ostatus.org/schema/1.0/subscribe");
|
||||
link_rel!(pub RelNodeInfo20, "http://nodeinfo.diaspora.software/ns/schema/2.0");
|
||||
link_rel!(pub RelNodeInfo21, "http://nodeinfo.diaspora.software/ns/schema/2.1");
|
||||
|
@ -141,8 +143,7 @@ where
|
|||
let dt = data
|
||||
.iter()
|
||||
.filter_map(Value::as_str)
|
||||
.filter_map(|val| T::from_str(val).ok())
|
||||
.next();
|
||||
.find_map(|val| T::from_str(val).ok());
|
||||
|
||||
if let Some(value) = dt {
|
||||
Ok(ListContaining(value))
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
[package]
|
||||
name = "magnetar_host_meta"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
|
||||
[lib]
|
||||
crate-type = ["rlib"]
|
||||
|
||||
[dependencies]
|
||||
magnetar_core = { path = "../core" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
|
|
@ -0,0 +1,87 @@
|
|||
use magnetar_core::web_model::{content_type::ContentXrdXml, rel::RelLrdd, Rel};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum XrdXml {
|
||||
#[serde(rename = "XRD")]
|
||||
Xrd(Xrd),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct Xrd {
|
||||
#[serde(rename = "@xmlns")]
|
||||
xmlns: String,
|
||||
#[serde(rename = "Link")]
|
||||
links: Vec<Link>,
|
||||
}
|
||||
|
||||
impl Xrd {
|
||||
pub fn default_host_meta(protocol: &str, domain: &str) -> Xrd {
|
||||
Xrd {
|
||||
xmlns: r#"http://docs.oasis-open.org/ns/xri/xrd-1.0"#.to_string(),
|
||||
links: vec![Link {
|
||||
rel: Some(RelLrdd.rel().to_string()),
|
||||
r#type: Some(ContentXrdXml.as_ref().to_string()),
|
||||
href: None,
|
||||
template: Some(format!(
|
||||
"{}://{}/.well-known/webfinger?resource={{uri}}",
|
||||
protocol, domain
|
||||
)),
|
||||
}],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename = "Link")]
|
||||
pub struct Link {
|
||||
#[serde(rename = "@rel")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
rel: Option<String>,
|
||||
#[serde(rename = "@type")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
r#type: Option<String>,
|
||||
#[serde(rename = "@href")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
href: Option<String>,
|
||||
#[serde(rename = "@template")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
template: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::error::Error;
|
||||
|
||||
use crate::{Xrd, XrdXml};
|
||||
|
||||
#[test]
|
||||
fn should_parse_host_meta() -> Result<(), Box<dyn Error>> {
|
||||
let xml = r#"
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<XRD xmlns="http://docs.oasis-open.org/ns/xri/xrd-1.0">
|
||||
<Link rel="lrdd" type="application/xrd+xml" template="https://example.org/.well-known/webfinger?resource={uri}"/>
|
||||
</XRD>
|
||||
"#;
|
||||
|
||||
let XrdXml::Xrd(xrd) = quick_xml::de::from_str::<XrdXml>(xml)?;
|
||||
|
||||
assert_eq!(xrd, Xrd::default_host_meta("https", "example.org"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_fail_on_invalid_root_tag() {
|
||||
let xml = r#"
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<ABC xmlns="http://docs.oasis-open.org/ns/xri/xrd-1.0">
|
||||
<Link rel="lrdd" type="application/xrd+xml" template="https://example.org/.well-known/webfinger?resource={uri}"/>
|
||||
</ABC>
|
||||
"#;
|
||||
|
||||
let xrd = quick_xml::de::from_str::<XrdXml>(xml);
|
||||
|
||||
assert!(xrd.is_err());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,192 @@
|
|||
use std::io::Cursor;
|
||||
|
||||
use async_stream::stream;
|
||||
use futures_util::{select, stream::StreamExt, FutureExt, Stream, TryStreamExt};
|
||||
use hyper::body::Bytes;
|
||||
use magnetar_common::config::MagnetarNetworkingProtocol;
|
||||
use magnetar_core::web_model::content_type::ContentActivityStreams;
|
||||
use magnetar_host_meta::Xrd;
|
||||
use reqwest::{Client, RequestBuilder};
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
use tokio::pin;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FederationClient {
|
||||
pub client: Client,
|
||||
pub body_limit: usize,
|
||||
pub timeout_seconds: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum FederationClientBuilderError {
|
||||
#[error("Reqwest error: {0}")]
|
||||
ReqwestError(#[from] reqwest::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum FederationClientError {
|
||||
#[error("Fetch timed out")]
|
||||
TimeoutError,
|
||||
#[error("Reqwest error: {0}")]
|
||||
ReqwestError(#[from] reqwest::Error),
|
||||
#[error("JSON error: {0}")]
|
||||
JsonError(#[from] serde_json::Error),
|
||||
#[error("XML error: {0}")]
|
||||
XmlError(#[from] quick_xml::de::DeError),
|
||||
#[error("Body limit exceeded error")]
|
||||
BodyLimitExceededError,
|
||||
#[error("Invalid URL: {0}")]
|
||||
InvalidUrl(#[from] url::ParseError),
|
||||
#[error("Client error: {0}")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FederationRequestBuilder<'a> {
|
||||
client: &'a FederationClient,
|
||||
builder: RequestBuilder,
|
||||
}
|
||||
|
||||
impl FederationClient {
|
||||
pub fn new(
|
||||
force_https: bool,
|
||||
body_limit: usize,
|
||||
timeout_seconds: u64,
|
||||
) -> Result<FederationClient, FederationClientBuilderError> {
|
||||
let client = Client::builder().https_only(force_https).build()?;
|
||||
|
||||
Ok(FederationClient {
|
||||
client,
|
||||
body_limit,
|
||||
timeout_seconds,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn builder(&self, method: reqwest::Method, url: Url) -> FederationRequestBuilder<'_> {
|
||||
FederationRequestBuilder {
|
||||
client: &self,
|
||||
builder: self.client.request(method, url),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, url: Url) -> FederationRequestBuilder<'_> {
|
||||
self.builder(reqwest::Method::GET, url)
|
||||
}
|
||||
|
||||
pub async fn host_meta(
|
||||
&self,
|
||||
protocol: MagnetarNetworkingProtocol,
|
||||
host: &str,
|
||||
) -> Result<Xrd, FederationClientError> {
|
||||
let host_meta_xml = self
|
||||
.get(Url::parse(&format!(
|
||||
"{}://{}/.well-known/host-meta",
|
||||
protocol.as_ref(),
|
||||
host
|
||||
))?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let reader = quick_xml::de::from_reader(Cursor::new(host_meta_xml))?;
|
||||
|
||||
Ok(reader)
|
||||
}
|
||||
}
|
||||
|
||||
impl FederationRequestBuilder<'_> {
|
||||
pub fn headers(self, headers: reqwest::header::HeaderMap) -> Self {
|
||||
Self {
|
||||
client: self.client,
|
||||
builder: self.builder.headers(headers),
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_stream(
|
||||
self,
|
||||
) -> Result<impl Stream<Item = Result<Bytes, FederationClientError>>, FederationClientError>
|
||||
{
|
||||
let mut body = self
|
||||
.builder
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?
|
||||
.bytes_stream()
|
||||
.map(|b| b.map_err(FederationClientError::ReqwestError));
|
||||
|
||||
let body_limit = self.client.body_limit;
|
||||
let mut partial_length: usize = 0;
|
||||
Ok(stream! {
|
||||
while let Some(chunk) = body.next().await.transpose()? {
|
||||
if partial_length + chunk.len() > body_limit {
|
||||
yield Err(FederationClientError::BodyLimitExceededError);
|
||||
}
|
||||
|
||||
partial_length += chunk.len();
|
||||
yield Ok(chunk);
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn send(self) -> Result<Vec<u8>, FederationClientError> {
|
||||
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(
|
||||
self.client.timeout_seconds,
|
||||
))
|
||||
.fuse();
|
||||
tokio::pin!(sleep);
|
||||
|
||||
let body = async move {
|
||||
Ok(self
|
||||
.send_stream()
|
||||
.await?
|
||||
.try_fold(Vec::new(), |mut acc, b| async move {
|
||||
acc.extend_from_slice(&b);
|
||||
Ok(acc)
|
||||
})
|
||||
.await?)
|
||||
}
|
||||
.fuse();
|
||||
|
||||
pin!(body);
|
||||
|
||||
select! {
|
||||
b = body => b,
|
||||
_ = sleep => Err(FederationClientError::TimeoutError)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn dereference(self) -> Result<Value, FederationClientError> {
|
||||
let mut headers = reqwest::header::HeaderMap::new();
|
||||
|
||||
headers.insert(
|
||||
reqwest::header::ACCEPT,
|
||||
reqwest::header::HeaderValue::from_static(ContentActivityStreams.as_ref()),
|
||||
);
|
||||
|
||||
let data = self.send().await?;
|
||||
let json =
|
||||
serde_json::from_slice::<Value>(&data).map_err(FederationClientError::JsonError)?;
|
||||
|
||||
Ok(json)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use magnetar_common::config::MagnetarNetworkingProtocol;
|
||||
use miette::IntoDiagnostic;
|
||||
|
||||
use super::FederationClient;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test() -> miette::Result<()> {
|
||||
let client = FederationClient::new(true, 1024 * 1024, 30).into_diagnostic()?;
|
||||
let host_meta = client
|
||||
.host_meta(MagnetarNetworkingProtocol::Https, "astolfo.social")
|
||||
.await
|
||||
.into_diagnostic()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
pub mod federation_client;
|
Loading…
Reference in New Issue