magnetar/src/service/user_cache.rs

240 lines
7.0 KiB
Rust

use crate::web::ApiError;
use cached::{Cached, TimedCache};
use magnetar_calckey_model::{
user, CalckeyCache, CalckeyCacheError, CalckeyDbError, CalckeyModel, CalckeySub,
InternalStreamMessage, SubMessage,
};
use magnetar_common::config::MagnetarConfig;
use std::collections::HashMap;
use std::sync::Arc;
use strum::EnumVariantNames;
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::error;
#[derive(Debug, Error, EnumVariantNames)]
pub enum UserCacheError {
#[error("Database error: {0}")]
DbError(#[from] CalckeyDbError),
#[error("Redis error: {0}")]
RedisError(#[from] CalckeyCacheError),
}
impl From<UserCacheError> for ApiError {
fn from(err: UserCacheError) -> Self {
let mut api_error: ApiError = match err {
UserCacheError::DbError(err) => err.into(),
UserCacheError::RedisError(err) => err.into(),
};
api_error.message = format!("User cache error: {}", api_error.message);
api_error
}
}
struct UserCache {
lifetime: TimedCache<String, ()>,
id_to_user: HashMap<String, Arc<user::Model>>,
token_to_user: HashMap<String, Arc<user::Model>>,
uri_to_user: HashMap<String, Arc<user::Model>>,
}
impl UserCache {
fn purge(&mut self, user: impl AsRef<user::Model>) {
let user = user.as_ref();
self.lifetime.cache_remove(&user.id);
if let Some(user) = self.id_to_user.remove(&user.id) {
if let Some(token) = user.token.clone() {
self.token_to_user.remove(&token);
}
if let Some(uri) = user.uri.clone() {
self.uri_to_user.remove(&uri);
}
}
}
fn refresh(&mut self, user: Arc<user::Model>) {
self.purge(&user);
self.lifetime.cache_set(user.id.clone(), ());
self.id_to_user.insert(user.id.clone(), user.clone());
if let Some(token) = user.token.clone() {
self.token_to_user.insert(token, user.clone());
}
if let Some(uri) = user.uri.clone() {
self.uri_to_user.insert(uri, user);
}
}
/// Low-priority refresh. Only refreshes the cache if the user is not there.
/// Used mostly for getters that would otherwise data race with more important refreshes.
fn maybe_refresh(&mut self, user: &Arc<user::Model>) {
if self.lifetime.cache_get(&user.id).is_none() {
self.refresh(user.clone());
}
}
fn get_by_id(&mut self, id: &str) -> Option<Arc<user::Model>> {
if let Some(user) = self.id_to_user.get(id).cloned() {
if self.lifetime.cache_get(id).is_none() {
self.purge(&user);
return None;
}
return Some(user);
}
None
}
fn get_by_token(&mut self, token: &str) -> Option<Arc<user::Model>> {
if let Some(user) = self.token_to_user.get(token).cloned() {
if self.lifetime.cache_get(&user.id).is_none() {
self.purge(&user);
return None;
}
return Some(user);
}
None
}
fn get_by_uri(&mut self, uri: &str) -> Option<Arc<user::Model>> {
if let Some(user) = self.uri_to_user.get(uri).cloned() {
if self.lifetime.cache_get(&user.id).is_none() {
self.purge(&user);
return None;
}
return Some(user);
}
None
}
}
pub struct UserCacheService {
db: CalckeyModel,
#[allow(dead_code)]
token_watch: CalckeySub,
cache: Arc<Mutex<UserCache>>,
}
impl UserCacheService {
pub(super) async fn new(
config: &MagnetarConfig,
db: CalckeyModel,
redis: CalckeyCache,
) -> Result<Self, UserCacheError> {
let cache = Arc::new(Mutex::new(UserCache {
lifetime: TimedCache::with_lifespan(60 * 5),
id_to_user: HashMap::new(),
token_to_user: HashMap::new(),
uri_to_user: HashMap::new(),
}));
let cache_clone = cache.clone();
let db_clone = db.clone();
let token_watch = redis
.conn()
.await?
.subscribe(&config.networking.host, move |message| {
let cache = cache_clone.clone();
let db = db_clone.clone();
async move {
let SubMessage::Internal(internal) = message else {
return;
};
match internal {
InternalStreamMessage::LocalUserUpdated { id }
| InternalStreamMessage::UserChangeModeratorState { id, .. }
| InternalStreamMessage::UserChangeSilencedState { id, .. }
| InternalStreamMessage::UserChangeSuspendedState { id, .. }
| InternalStreamMessage::RemoteUserUpdated { id }
| InternalStreamMessage::UserTokenRegenerated { id, .. } => {
let user = match db.get_user_by_id(&id).await {
Ok(Some(user)) => user,
Ok(None) => return,
Err(e) => {
error!("Error fetching user from database: {}", e);
return;
}
};
cache.lock().await.refresh(Arc::new(user));
}
_ => {}
};
}
})
.await?;
Ok(Self {
cache,
db,
token_watch,
})
}
async fn map_cache_user(
&self,
user: Option<user::Model>,
) -> Result<Option<Arc<user::Model>>, UserCacheError> {
if let Some(user) = user {
let user = Arc::new(user);
self.cache.lock().await.maybe_refresh(&user);
return Ok(Some(user));
}
Ok(None)
}
pub async fn get_by_token(
&self,
token: &str,
) -> Result<Option<Arc<user::Model>>, UserCacheError> {
let result = self.cache.lock().await.get_by_token(token);
if let Some(user) = result {
return Ok(Some(user));
}
self.map_cache_user(self.db.get_user_by_token(token).await?)
.await
}
pub async fn get_by_uri(&self, uri: &str) -> Result<Option<Arc<user::Model>>, UserCacheError> {
let result = self.cache.lock().await.get_by_uri(uri);
if let Some(user) = result {
return Ok(Some(user));
}
let user = self.db.get_user_by_uri(uri).await?;
self.map_cache_user(user).await
}
pub async fn get_by_id(&self, id: &str) -> Result<Option<Arc<user::Model>>, UserCacheError> {
let result = self.cache.lock().await.get_by_id(id);
if let Some(user) = result {
return Ok(Some(user));
}
let user = self.db.get_user_by_id(id).await?;
self.map_cache_user(user).await
}
}