Better timeout handling
ci/woodpecker/push/ociImagePush Pipeline was successful Details

This commit is contained in:
Natty 2024-12-24 13:46:49 +01:00
parent 277dcb5e3e
commit 221aa69a18
3 changed files with 21 additions and 28 deletions

View File

@ -1,12 +1,11 @@
use async_stream::stream; use async_stream::stream;
use futures_util::{select, stream::StreamExt, FutureExt, Stream, TryStreamExt}; use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use headers::UserAgent; use headers::UserAgent;
use hyper::body::Bytes; use hyper::body::Bytes;
use reqwest::{redirect::Policy, Client, RequestBuilder}; use reqwest::{redirect::Policy, Client, RequestBuilder};
use serde_json::Value; use serde_json::Value;
use std::time::Duration; use std::time::Duration;
use thiserror::Error; use thiserror::Error;
use tokio::pin;
use url::Url; use url::Url;
use magnetar_core::web_model::ContentType; use magnetar_core::web_model::ContentType;
@ -59,7 +58,7 @@ impl FederationClient {
let client = Client::builder() let client = Client::builder()
.https_only(force_https) .https_only(force_https)
.redirect(Policy::limited(5)) .redirect(Policy::limited(5))
.connect_timeout(Duration::from_secs(timeout_seconds)) .timeout(Duration::from_secs(timeout_seconds))
.build()?; .build()?;
Ok(FederationClient { Ok(FederationClient {
@ -146,13 +145,6 @@ impl FederationRequestBuilder<'_> {
} }
pub async fn send(self) -> Result<Vec<u8>, FederationClientError> { pub async fn send(self) -> Result<Vec<u8>, FederationClientError> {
let sleep = tokio::time::sleep(tokio::time::Duration::from_secs(
self.client.timeout_seconds,
))
.fuse();
tokio::pin!(sleep);
let body = async move {
self.send_stream() self.send_stream()
.await? .await?
.try_fold(Vec::new(), |mut acc, b| async move { .try_fold(Vec::new(), |mut acc, b| async move {
@ -161,15 +153,6 @@ impl FederationRequestBuilder<'_> {
}) })
.await .await
} }
.fuse();
pin!(body);
select! {
b = body => b,
_ = sleep => Err(FederationClientError::TimeoutError)
}
}
pub async fn json(self) -> Result<Value, FederationClientError> { pub async fn json(self) -> Result<Value, FederationClientError> {
let data = self.send().await?; let data = self.send().await?;

View File

@ -19,6 +19,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, B
use tokio::net::{TcpListener, UnixSocket}; use tokio::net::{TcpListener, UnixSocket};
use tokio::select; use tokio::select;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::{debug, error, info, warn, Instrument}; use tracing::{debug, error, info, warn, Instrument};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -377,7 +378,16 @@ fn process(
) -> BoxFuture<'static, miette::Result<(u64, Option<RpcResponse>)>> { ) -> BoxFuture<'static, miette::Result<(u64, Option<RpcResponse>)>> {
move |(serial, payload, listener)| { move |(serial, payload, listener)| {
let ctx = context.clone(); let ctx = context.clone();
tokio::task::spawn(async move { (serial, listener(ctx, payload).await) }) tokio::task::spawn(async move {
let start = Instant::now();
let res = listener(ctx, payload).await;
let took = start.elapsed();
if took.as_secs_f64() > 50.0 {
warn!("Handler took long: {} sec", took.as_secs_f64());
}
(serial, res)
}.instrument(tracing::info_span!("Request", ?serial)))
.map_err(|e| miette!(e)) .map_err(|e| miette!(e))
.boxed() .boxed()
} }

View File

@ -15,7 +15,7 @@ pub(super) fn new_federation_client_service(
FederationClient::new( FederationClient::new(
true, true,
256000, 256000,
25, 35,
UserAgent::from_str(&format!( UserAgent::from_str(&format!(
"magnetar/{} (https://{})", "magnetar/{} (https://{})",
config.branding.version, config.networking.host config.branding.version, config.networking.host