116 lines
3.3 KiB
Rust
116 lines
3.3 KiB
Rust
use crate::web::ApiError;
|
|
use magnetar_model::{ck, CalckeyDbError, CalckeyModel};
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
use strum::EnumVariantNames;
|
|
use thiserror::Error;
|
|
use tokio::sync::{mpsc, oneshot};
|
|
use tracing::error;
|
|
|
|
/// Errors that can be returned when reading instance metadata through the cache.
#[derive(Debug, Error, EnumVariantNames)]
|
|
pub enum InstanceMetaCacheError {
|
|
/// The database query failed. `#[from]` generates the conversion from
/// `CalckeyDbError`, so `?` and `.into()` can produce this variant.
#[error("Database error: {0}")]
|
|
DbError(#[from] CalckeyDbError),
|
|
/// Sending the request to the cache task, or receiving its reply, failed
/// because the other end of the channel was gone.
#[error("Cache channel closed")]
|
|
ChannelClosed,
|
|
}
|
|
|
|
/// Converts a cache error into an `ApiError` and prefixes the resulting
/// message with "Instance meta cache error: ".
impl From<InstanceMetaCacheError> for ApiError {
|
|
fn from(err: InstanceMetaCacheError) -> Self {
|
|
let mut api_error: ApiError = match err {
|
|
// Here `err` is shadowed by the inner `CalckeyDbError`, so this uses the
// database error's own conversion into `ApiError`.
InstanceMetaCacheError::DbError(err) => err.into(),
|
|
// NOTE(review): this arm binds nothing, so `err` is still the outer
// `InstanceMetaCacheError`. `err.into()` with target type `ApiError`
// therefore resolves to this very `From` impl, which looks like unbounded
// recursion (stack overflow) whenever the channel is closed. The variant
// should build an `ApiError` directly — TODO confirm the intended
// status/code, since `ApiError`'s constructor is not visible from here.
InstanceMetaCacheError::ChannelClosed => err.into(),
|
|
};
|
|
|
|
// Prefix the message so the origin of the error is visible to the caller.
api_error.message = format!("Instance meta cache error: {}", api_error.message);
|
|
|
|
api_error
|
|
}
|
|
}
|
|
|
|
/// State owned by the cache task: the cached model and the time it was stored.
struct Entry {
|
|
/// Most recently fetched instance metadata; starts as `None` and is set
/// after a successful database fetch.
value: Option<Arc<ck::meta::Model>>,
|
|
/// When `value` was last stored; starts as `None` and is updated together
/// with `value`.
last_fetched: Option<Instant>,
|
|
}
|
|
|
|
/// Kind of request sent to the cache task.
#[derive(Copy, Clone)]
|
|
enum CacheRequest {
|
|
/// Answered from the cached value when it is younger than the stale
/// threshold; otherwise the database is queried.
Get,
|
|
/// Always queries the database, even when a fresh cached value exists.
Fetch,
|
|
}
|
|
|
|
/// One-shot reply channel on which the cache task sends the result of a single request.
type Callback = oneshot::Sender<Result<Arc<ck::meta::Model>, InstanceMetaCacheError>>;
|
|
|
|
/// Handle to the background cache task; all access goes through `sender`.
struct InstanceMetaCache {
|
|
/// Sending half of the request queue; each message pairs a request kind
/// with the reply channel for that request.
sender: mpsc::UnboundedSender<(CacheRequest, Callback)>,
|
|
}
|
|
|
|
impl InstanceMetaCache {
|
|
fn new(db: CalckeyModel) -> Self {
|
|
const REFRESH_INTERVAL_SEC: u64 = 10;
|
|
let stale_threshold = Duration::from_secs(REFRESH_INTERVAL_SEC);
|
|
|
|
let mut state = Entry {
|
|
value: None,
|
|
last_fetched: None,
|
|
};
|
|
|
|
let (req_tx, mut req_rx) = mpsc::unbounded_channel::<(CacheRequest, Callback)>();
|
|
|
|
tokio::spawn(async move {
|
|
while let Some((req, res_tx)) = req_rx.recv().await {
|
|
if let Some(val) = &state.value {
|
|
if state
|
|
.last_fetched
|
|
.is_some_and(|i| Instant::now() - i < stale_threshold)
|
|
&& !matches!(req, CacheRequest::Fetch)
|
|
{
|
|
res_tx.send(Ok(val.clone())).ok();
|
|
continue;
|
|
}
|
|
}
|
|
|
|
let res = db.get_instance_meta().await.map(Arc::new);
|
|
|
|
if let Ok(ref data) = res {
|
|
state.value = Some(data.clone());
|
|
state.last_fetched = Some(Instant::now());
|
|
}
|
|
|
|
res_tx.send(res.map_err(CalckeyDbError::into)).ok();
|
|
}
|
|
});
|
|
|
|
Self { sender: req_tx }
|
|
}
|
|
|
|
async fn get(&self, req: CacheRequest) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
|
let (tx, rx) = oneshot::channel();
|
|
self.sender
|
|
.send((req, tx))
|
|
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?;
|
|
rx.await
|
|
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?
|
|
}
|
|
}
|
|
|
|
/// Public wrapper around the instance metadata cache.
pub struct InstanceMetaCacheService {
|
|
/// Handle to the underlying cache that serves the requests.
cache: InstanceMetaCache,
|
|
}
|
|
|
|
impl InstanceMetaCacheService {
|
|
pub(super) fn new(db: CalckeyModel) -> Self {
|
|
Self {
|
|
cache: InstanceMetaCache::new(db),
|
|
}
|
|
}
|
|
|
|
pub async fn fetch(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
|
self.cache.get(CacheRequest::Fetch).await
|
|
}
|
|
|
|
pub async fn get(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
|
self.cache.get(CacheRequest::Get).await
|
|
}
|
|
}
|