magnetar/src/service/local_user_cache.rs

212 lines
6.1 KiB
Rust

use std::collections::HashMap;
use std::sync::Arc;
use cached::{Cached, TimedCache};
use strum::EnumVariantNames;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::error;
use magnetar_common::config::MagnetarConfig;
use magnetar_model::{
CalckeyCache, CalckeyCacheError, CalckeyDbError, CalckeyModel, CalckeySub, ck,
InternalStreamMessage, SubMessage,
};
use crate::web::ApiError;
/// Failures that can surface while the user cache talks to its backing
/// stores.
///
/// Each variant wraps an error type from `magnetar_model`; the `#[from]`
/// attributes let `?` convert those errors into this enum automatically.
#[derive(Debug, Error, EnumVariantNames)]
pub enum UserCacheError {
    /// A database operation failed.
    #[error("Database error: {0}")]
    DbError(#[from] CalckeyDbError),
    /// A cache-backend operation failed (rendered as "Redis error").
    #[error("Redis error: {0}")]
    RedisError(#[from] CalckeyCacheError),
}
/// Turns a [`UserCacheError`] into an [`ApiError`].
///
/// The wrapped error is converted with its own `Into<ApiError>`
/// implementation, and the resulting message is prefixed with
/// `"Local user cache error: "` so the origin of the failure is visible.
impl From<UserCacheError> for ApiError {
    fn from(err: UserCacheError) -> Self {
        // Delegate to the inner error's conversion first; every other field
        // of the resulting `ApiError` is kept as that conversion produced it.
        let mut converted: ApiError = match err {
            UserCacheError::DbError(db_err) => db_err.into(),
            UserCacheError::RedisError(cache_err) => cache_err.into(),
        };

        // Prepend the context in place rather than rebuilding the string.
        converted.message.insert_str(0, "Local user cache error: ");
        converted
    }
}
/// In-memory store of user models, indexed both by user ID and by token.
///
/// `lifetime` carries one timed entry per cached user ID, while the two maps
/// hold the models themselves. Lookups treat a model as valid only while its
/// ID is still present in `lifetime`.
struct LocalUserCache {
    /// Expiry tracker keyed by user ID; the unit value carries no data.
    lifetime: TimedCache<String, ()>,
    /// Cached models keyed by user ID.
    id_to_user: HashMap<String, Arc<ck::user::Model>>,
    /// Cached models keyed by token, for users that have one.
    token_to_user: HashMap<String, Arc<ck::user::Model>>,
}
impl LocalUserCache {
    /// Removes everything the cache holds for `user`'s ID.
    ///
    /// The token that gets unregistered is the one stored on the *cached*
    /// model, not the one on the argument, so an entry filed under an older
    /// token is cleaned up even when `user` already carries a new one.
    fn purge(&mut self, user: impl AsRef<ck::user::Model>) {
        let user = user.as_ref();
        self.lifetime.cache_remove(&user.id);
        if let Some(stale) = self.id_to_user.remove(&user.id) {
            // Borrow the token instead of cloning it; the map only needs a
            // `&str` to find the key.
            if let Some(token) = stale.token.as_deref() {
                self.token_to_user.remove(token);
            }
        }
    }

    /// Replaces whatever is cached for `user` with `user` itself and
    /// restarts its lifetime entry.
    fn refresh(&mut self, user: Arc<ck::user::Model>) {
        self.purge(&user);
        self.lifetime.cache_set(user.id.clone(), ());
        if let Some(token) = user.token.clone() {
            self.token_to_user.insert(token, Arc::clone(&user));
        }
        // Last use of `user`: move it into the map rather than cloning again.
        self.id_to_user.insert(user.id.clone(), user);
    }

    /// Low-priority refresh: stores `user` only when its ID has no live
    /// lifetime entry.
    ///
    /// Used mostly by getters, which would otherwise race with (and could
    /// overwrite) more important refreshes.
    fn maybe_refresh(&mut self, user: &Arc<ck::user::Model>) {
        if self.lifetime.cache_get(&user.id).is_none() {
            self.refresh(Arc::clone(user));
        }
    }

    /// Returns `user` while its lifetime entry is still live; otherwise
    /// purges it from the cache and returns `None`.
    // NOTE(review): expired entries are only dropped here, i.e. when they are
    // looked up again, or when the same user is refreshed. Nothing in this
    // type sweeps entries that are never touched again.
    fn keep_if_live(&mut self, user: Arc<ck::user::Model>) -> Option<Arc<ck::user::Model>> {
        if self.lifetime.cache_get(&user.id).is_some() {
            Some(user)
        } else {
            self.purge(&user);
            None
        }
    }

    /// Looks up a cached user by ID, evicting it if its lifetime has run out.
    fn get_by_id(&mut self, id: &str) -> Option<Arc<ck::user::Model>> {
        let user = self.id_to_user.get(id).cloned()?;
        self.keep_if_live(user)
    }

    /// Looks up a cached user by token, evicting it if its lifetime has run
    /// out.
    fn get_by_token(&mut self, token: &str) -> Option<Arc<ck::user::Model>> {
        let user = self.token_to_user.get(token).cloned()?;
        self.keep_if_live(user)
    }
}
/// Service that hands out user models, answering from an in-memory cache
/// when it can and from the database otherwise.
pub struct LocalUserCacheService {
    /// Database handle used when the cache has no live entry.
    db: CalckeyModel,
    /// Handle returned by `subscribe`; never read after construction.
    // NOTE(review): presumably stored only so the subscription stays alive as
    // long as the service does — confirm against `CalckeySub`'s drop
    // behaviour. That would also be the reason for the `dead_code` allowance.
    #[allow(dead_code)]
    token_watch: CalckeySub,
    /// Cache state, shared with the subscription callback.
    cache: Arc<Mutex<LocalUserCache>>,
}
impl LocalUserCacheService {
pub(super) async fn new(
config: &MagnetarConfig,
db: CalckeyModel,
redis: CalckeyCache,
) -> Result<Self, UserCacheError> {
let cache = Arc::new(Mutex::new(LocalUserCache {
lifetime: TimedCache::with_lifespan(60 * 5),
id_to_user: HashMap::new(),
token_to_user: HashMap::new(),
}));
let cache_clone = cache.clone();
let db_clone = db.clone();
let token_watch = redis
.conn()
.await?
.subscribe(&config.networking.host, move |message| {
let cache = cache_clone.clone();
let db = db_clone.clone();
async move {
let SubMessage::Internal(internal) = message else {
return;
};
match internal {
InternalStreamMessage::LocalUserUpdated { id }
| InternalStreamMessage::UserChangeModeratorState { id, .. }
| InternalStreamMessage::UserChangeSilencedState { id, .. }
| InternalStreamMessage::UserChangeSuspendedState { id, .. }
| InternalStreamMessage::RemoteUserUpdated { id }
| InternalStreamMessage::UserTokenRegenerated { id, .. } => {
let user = match db.get_user_by_id(&id).await {
Ok(Some(user)) => user,
Ok(None) => return,
Err(e) => {
error!("Error fetching user from database: {}", e);
return;
}
};
cache.lock().await.refresh(Arc::new(user));
}
_ => {}
};
}
})
.await?;
Ok(Self {
cache,
db,
token_watch,
})
}
async fn map_cache_user(
&self,
user: Option<ck::user::Model>,
) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
if let Some(user) = user {
let user = Arc::new(user);
self.cache.lock().await.maybe_refresh(&user);
return Ok(Some(user));
}
Ok(None)
}
pub async fn get_by_token(
&self,
token: &str,
) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
let result = self.cache.lock().await.get_by_token(token);
if let Some(user) = result {
return Ok(Some(user));
}
self.map_cache_user(self.db.get_user_by_token(token).await?)
.await
}
pub async fn get_by_id(
&self,
id: &str,
) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
let result = self.cache.lock().await.get_by_id(id);
if let Some(user) = result {
return Ok(Some(user));
}
let user = self.db.get_user_by_id(id).await?;
self.map_cache_user(user).await
}
}