magnetar/src/service/instance_meta_cache.rs

116 lines
3.3 KiB
Rust

use crate::web::ApiError;
use magnetar_model::{ck, CalckeyDbError, CalckeyModel};
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::EnumVariantNames;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use tracing::error;
#[derive(Debug, Error, EnumVariantNames)]
pub enum InstanceMetaCacheError {
#[error("Database error: {0}")]
DbError(#[from] CalckeyDbError),
#[error("Cache channel closed")]
ChannelClosed,
}
impl From<InstanceMetaCacheError> for ApiError {
fn from(err: InstanceMetaCacheError) -> Self {
let mut api_error: ApiError = match err {
InstanceMetaCacheError::DbError(err) => err.into(),
InstanceMetaCacheError::ChannelClosed => err.into(),
};
api_error.message = format!("Instance meta cache error: {}", api_error.message);
api_error
}
}
struct Entry {
value: Option<Arc<ck::meta::Model>>,
last_fetched: Option<Instant>,
}
#[derive(Copy, Clone)]
enum CacheRequest {
Get,
Fetch,
}
type Callback = oneshot::Sender<Result<Arc<ck::meta::Model>, InstanceMetaCacheError>>;
struct InstanceMetaCache {
sender: mpsc::UnboundedSender<(CacheRequest, Callback)>,
}
impl InstanceMetaCache {
fn new(db: CalckeyModel) -> Self {
const REFRESH_INTERVAL_SEC: u64 = 10;
let stale_threshold = Duration::from_secs(REFRESH_INTERVAL_SEC);
let mut state = Entry {
value: None,
last_fetched: None,
};
let (req_tx, mut req_rx) = mpsc::unbounded_channel::<(CacheRequest, Callback)>();
tokio::spawn(async move {
while let Some((req, res_tx)) = req_rx.recv().await {
if let Some(val) = &state.value {
if state
.last_fetched
.is_some_and(|i| Instant::now() - i < stale_threshold)
&& !matches!(req, CacheRequest::Fetch)
{
res_tx.send(Ok(val.clone())).ok();
continue;
}
}
let res = db.get_instance_meta().await.map(Arc::new);
if let Ok(ref data) = res {
state.value = Some(data.clone());
state.last_fetched = Some(Instant::now());
}
res_tx.send(res.map_err(CalckeyDbError::into)).ok();
}
});
Self { sender: req_tx }
}
async fn get(&self, req: CacheRequest) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
let (tx, rx) = oneshot::channel();
self.sender
.send((req, tx))
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?;
rx.await
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?
}
}
pub struct InstanceMetaCacheService {
cache: InstanceMetaCache,
}
impl InstanceMetaCacheService {
pub(super) fn new(db: CalckeyModel) -> Self {
Self {
cache: InstanceMetaCache::new(db),
}
}
pub async fn fetch(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
self.cache.get(CacheRequest::Fetch).await
}
pub async fn get(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
self.cache.get(CacheRequest::Get).await
}
}