diff --git a/ext_federation/src/client/federation_client.rs b/ext_federation/src/client/federation_client.rs index e4e5d4e..c1c2084 100644 --- a/ext_federation/src/client/federation_client.rs +++ b/ext_federation/src/client/federation_client.rs @@ -1,12 +1,11 @@ use async_stream::stream; -use futures_util::{select, stream::StreamExt, FutureExt, Stream, TryStreamExt}; +use futures_util::{stream::StreamExt, Stream, TryStreamExt}; use headers::UserAgent; use hyper::body::Bytes; use reqwest::{redirect::Policy, Client, RequestBuilder}; use serde_json::Value; use std::time::Duration; use thiserror::Error; -use tokio::pin; use url::Url; use magnetar_core::web_model::ContentType; @@ -59,7 +58,7 @@ impl FederationClient { let client = Client::builder() .https_only(force_https) .redirect(Policy::limited(5)) - .connect_timeout(Duration::from_secs(timeout_seconds)) + .timeout(Duration::from_secs(timeout_seconds)) .build()?; Ok(FederationClient { @@ -146,29 +145,13 @@ impl FederationRequestBuilder<'_> { } pub async fn send(self) -> Result, FederationClientError> { - let sleep = tokio::time::sleep(tokio::time::Duration::from_secs( - self.client.timeout_seconds, - )) - .fuse(); - tokio::pin!(sleep); - - let body = async move { - self.send_stream() - .await? - .try_fold(Vec::new(), |mut acc, b| async move { - acc.extend_from_slice(&b); - Ok(acc) - }) - .await - } - .fuse(); - - pin!(body); - - select! { - b = body => b, - _ = sleep => Err(FederationClientError::TimeoutError) - } + self.send_stream() + .await? + .try_fold(Vec::new(), |mut acc, b| async move { + acc.extend_from_slice(&b); + Ok(acc) + }) + .await } pub async fn json(self) -> Result { diff --git a/src/rpc_v1/proto.rs b/src/rpc_v1/proto.rs index 45b33d1..7a7d0b9 100644 --- a/src/rpc_v1/proto.rs +++ b/src/rpc_v1/proto.rs @@ -19,6 +19,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, B use tokio::net::{TcpListener, UnixSocket}; use tokio::select; use tokio::task::JoinSet; +use tokio::time::Instant; use tracing::{debug, error, info, warn, Instrument}; #[derive(Debug, Clone)] @@ -377,7 +378,16 @@ fn process( ) -> BoxFuture<'static, miette::Result<(u64, Option)>> { move |(serial, payload, listener)| { let ctx = context.clone(); - tokio::task::spawn(async move { (serial, listener(ctx, payload).await) }) + tokio::task::spawn(async move { + let start = Instant::now(); + let res = listener(ctx, payload).await; + let took = start.elapsed(); + if took.as_secs_f64() > 50.0 { + warn!("Handler took long: {} sec", took.as_secs_f64()); + } + + (serial, res) + }.instrument(tracing::info_span!("Request", ?serial))) .map_err(|e| miette!(e)) .boxed() } diff --git a/src/service/federation_client.rs b/src/service/federation_client.rs index d702548..0c353fc 100644 --- a/src/service/federation_client.rs +++ b/src/service/federation_client.rs @@ -15,7 +15,7 @@ pub(super) fn new_federation_client_service( FederationClient::new( true, 256000, - 25, + 35, UserAgent::from_str(&format!( "magnetar/{} (https://{})", config.branding.version, config.networking.host