212 lines
6.1 KiB
Rust
212 lines
6.1 KiB
Rust
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
|
|
use cached::{Cached, TimedCache};
|
|
use strum::EnumVariantNames;
|
|
use thiserror::Error;
|
|
use tokio::sync::Mutex;
|
|
use tracing::error;
|
|
|
|
use magnetar_common::config::MagnetarConfig;
|
|
use magnetar_model::{
|
|
CalckeyCache, CalckeyCacheError, CalckeyDbError, CalckeyModel, CalckeySub, ck,
|
|
InternalStreamMessage, SubMessage,
|
|
};
|
|
|
|
use crate::web::ApiError;
|
|
|
|
/// Errors that can surface while reading or refreshing the local user cache.
#[derive(Debug, Error, EnumVariantNames)]
pub enum UserCacheError {
    /// A failure propagated from the Calckey database layer.
    #[error("Database error: {0}")]
    DbError(#[from] CalckeyDbError),
    /// A failure propagated from the Redis-backed cache layer.
    #[error("Redis error: {0}")]
    RedisError(#[from] CalckeyCacheError),
}
|
|
|
|
impl From<UserCacheError> for ApiError {
|
|
fn from(err: UserCacheError) -> Self {
|
|
let mut api_error: ApiError = match err {
|
|
UserCacheError::DbError(err) => err.into(),
|
|
UserCacheError::RedisError(err) => err.into(),
|
|
};
|
|
|
|
api_error.message = format!("Local user cache error: {}", api_error.message);
|
|
|
|
api_error
|
|
}
|
|
}
|
|
|
|
/// In-process cache of local users, indexed both by user id and by API token.
struct LocalUserCache {
    // Per-user TTL tracker; a user whose entry has expired here is considered
    // stale and is evicted lazily on the next lookup.
    lifetime: TimedCache<String, ()>,
    // User id -> cached user record.
    id_to_user: HashMap<String, Arc<ck::user::Model>>,
    // API token -> cached user record (only populated for users with a token).
    token_to_user: HashMap<String, Arc<ck::user::Model>>,
}
|
|
|
|
impl LocalUserCache {
|
|
fn purge(&mut self, user: impl AsRef<ck::user::Model>) {
|
|
let user = user.as_ref();
|
|
|
|
self.lifetime.cache_remove(&user.id);
|
|
|
|
if let Some(user) = self.id_to_user.remove(&user.id) {
|
|
if let Some(token) = user.token.clone() {
|
|
self.token_to_user.remove(&token);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn refresh(&mut self, user: Arc<ck::user::Model>) {
|
|
self.purge(&user);
|
|
|
|
self.lifetime.cache_set(user.id.clone(), ());
|
|
|
|
self.id_to_user.insert(user.id.clone(), user.clone());
|
|
|
|
if let Some(token) = user.token.clone() {
|
|
self.token_to_user.insert(token, user.clone());
|
|
}
|
|
}
|
|
|
|
/// Low-priority refresh. Only refreshes the cache if the user is not there.
|
|
/// Used mostly for getters that would otherwise data race with more important refreshes.
|
|
fn maybe_refresh(&mut self, user: &Arc<ck::user::Model>) {
|
|
if self.lifetime.cache_get(&user.id).is_none() {
|
|
self.refresh(user.clone());
|
|
}
|
|
}
|
|
|
|
fn get_by_id(&mut self, id: &str) -> Option<Arc<ck::user::Model>> {
|
|
if let Some(user) = self.id_to_user.get(id).cloned() {
|
|
if self.lifetime.cache_get(id).is_none() {
|
|
self.purge(&user);
|
|
return None;
|
|
}
|
|
|
|
return Some(user);
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
fn get_by_token(&mut self, token: &str) -> Option<Arc<ck::user::Model>> {
|
|
if let Some(user) = self.token_to_user.get(token).cloned() {
|
|
if self.lifetime.cache_get(&user.id).is_none() {
|
|
self.purge(&user);
|
|
return None;
|
|
}
|
|
|
|
return Some(user);
|
|
}
|
|
|
|
None
|
|
}
|
|
}
|
|
|
|
/// Service wrapper around the local user cache that backfills misses from the
/// database and invalidates entries based on Redis stream events.
pub struct LocalUserCacheService {
    // Database handle used to resolve cache misses.
    db: CalckeyModel,
    // Never read directly; held only so the Redis subscription stays alive
    // for the lifetime of the service.
    #[allow(dead_code)]
    token_watch: CalckeySub,
    // Shared cache state; also mutated by the Redis subscription callback.
    cache: Arc<Mutex<LocalUserCache>>,
}
|
|
|
|
impl LocalUserCacheService {
    /// Creates the cache service and subscribes to the instance's Redis
    /// channel so that user-related stream events refresh the cache.
    ///
    /// # Errors
    /// Returns a [`UserCacheError::RedisError`] if opening the connection or
    /// registering the subscription fails.
    pub(super) async fn new(
        config: &MagnetarConfig,
        db: CalckeyModel,
        redis: CalckeyCache,
    ) -> Result<Self, UserCacheError> {
        let cache = Arc::new(Mutex::new(LocalUserCache {
            // Entries live for 5 minutes before lazy eviction.
            lifetime: TimedCache::with_lifespan(60 * 5),
            id_to_user: HashMap::new(),
            token_to_user: HashMap::new(),
        }));

        // Clones moved into the subscription closure below; the originals are
        // stored on `Self`.
        let cache_clone = cache.clone();
        let db_clone = db.clone();

        let token_watch = redis
            .conn()
            .await?
            .subscribe(&config.networking.host, move |message| {
                // The closure may be invoked many times, so it re-clones the
                // shared handles for each spawned future.
                let cache = cache_clone.clone();
                let db = db_clone.clone();

                async move {
                    // Only internal stream messages are relevant here.
                    let SubMessage::Internal(internal) = message else {
                        return;
                    };

                    match internal {
                        // Any event that can change a user's record or token
                        // triggers a re-fetch from the database.
                        InternalStreamMessage::LocalUserUpdated { id }
                        | InternalStreamMessage::UserChangeModeratorState { id, .. }
                        | InternalStreamMessage::UserChangeSilencedState { id, .. }
                        | InternalStreamMessage::UserChangeSuspendedState { id, .. }
                        | InternalStreamMessage::RemoteUserUpdated { id }
                        | InternalStreamMessage::UserTokenRegenerated { id, .. } => {
                            let user = match db.get_user_by_id(&id).await {
                                Ok(Some(user)) => user,
                                // User vanished; nothing to cache.
                                Ok(None) => return,
                                // Best-effort refresh: log and drop the event
                                // rather than poisoning the subscription.
                                Err(e) => {
                                    error!("Error fetching user from database: {}", e);
                                    return;
                                }
                            };

                            cache.lock().await.refresh(Arc::new(user));
                        }
                        _ => {}
                    };
                }
            })
            .await?;

        Ok(Self {
            cache,
            db,
            token_watch,
        })
    }

    /// Wraps a freshly fetched DB row into an [`Arc`] and inserts it into the
    /// cache if (and only if) it is not already present, then returns it.
    async fn map_cache_user(
        &self,
        user: Option<ck::user::Model>,
    ) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
        if let Some(user) = user {
            let user = Arc::new(user);
            // maybe_refresh avoids clobbering a concurrent, higher-priority
            // refresh from the Redis subscription.
            self.cache.lock().await.maybe_refresh(&user);
            return Ok(Some(user));
        }

        Ok(None)
    }

    /// Resolves a user by API token: cache first, database on miss.
    ///
    /// # Errors
    /// Propagates database failures as [`UserCacheError::DbError`].
    pub async fn get_by_token(
        &self,
        token: &str,
    ) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
        // Lock is released before the (potentially slow) DB fallback below.
        let result = self.cache.lock().await.get_by_token(token);

        if let Some(user) = result {
            return Ok(Some(user));
        }

        self.map_cache_user(self.db.get_user_by_token(token).await?)
            .await
    }

    /// Resolves a user by id: cache first, database on miss.
    ///
    /// # Errors
    /// Propagates database failures as [`UserCacheError::DbError`].
    pub async fn get_by_id(
        &self,
        id: &str,
    ) -> Result<Option<Arc<ck::user::Model>>, UserCacheError> {
        // Lock is released before the (potentially slow) DB fallback below.
        let result = self.cache.lock().await.get_by_id(id);

        if let Some(user) = result {
            return Ok(Some(user));
        }

        let user = self.db.get_user_by_id(id).await?;

        self.map_cache_user(user).await
    }
}
|