From 18d526cf8c85fef15da17d532f04fbfc5863e13a Mon Sep 17 00:00:00 2001 From: Natty Date: Fri, 27 Oct 2023 01:41:48 +0200 Subject: [PATCH] Instance meta cache and initial pack processing for users --- ext_calckey_model/src/lib.rs | 18 +++++ magnetar_mmm_parser/src/lib.rs | 24 ++++++ magnetar_sdk/src/types/mod.rs | 10 ++- magnetar_sdk/src/types/user.rs | 2 +- src/model/data/note.rs | 2 +- src/model/data/user.rs | 38 +++++----- src/model/mod.rs | 3 +- src/model/processing/emoji.rs | 33 +++++++++ src/model/processing/mod.rs | 32 ++++++++ src/model/processing/user.rs | 51 +++++++++++++ src/service/instance_meta_cache.rs | 115 +++++++++++++++++++++++++++++ src/service/mod.rs | 4 + 12 files changed, 306 insertions(+), 26 deletions(-) create mode 100644 src/model/processing/emoji.rs create mode 100644 src/model/processing/mod.rs create mode 100644 src/model/processing/user.rs create mode 100644 src/service/instance_meta_cache.rs diff --git a/ext_calckey_model/src/lib.rs b/ext_calckey_model/src/lib.rs index 1fb0704..40fdb1e 100644 --- a/ext_calckey_model/src/lib.rs +++ b/ext_calckey_model/src/lib.rs @@ -1,5 +1,6 @@ pub use ck; use ck::*; +pub use sea_orm; use chrono::Utc; use ext_calckey_model_migration::{Migrator, MigratorTrait}; @@ -124,6 +125,23 @@ impl CalckeyModel { .await?) } + pub async fn fetch_many_emojis( + &self, + shortcodes: &[String], + host: Option<&str>, + ) -> Result, CalckeyDbError> { + let host_filter = if let Some(host) = host { + emoji::Column::Host.eq(host) + } else { + emoji::Column::Host.is_null() + }; + + let name_filter = emoji::Column::Name.is_in(shortcodes); + let filter = host_filter.and(name_filter); + + Ok(emoji::Entity::find().filter(filter).all(&self.0).await?) + } + pub async fn get_access_token( &self, token: &str, diff --git a/magnetar_mmm_parser/src/lib.rs b/magnetar_mmm_parser/src/lib.rs index fc9b06d..3cf6da8 100644 --- a/magnetar_mmm_parser/src/lib.rs +++ b/magnetar_mmm_parser/src/lib.rs @@ -233,6 +233,30 @@ impl Token { } } + pub fn walk_map_collect(&self, func: impl Fn(&Token) -> Option, out: &mut Vec) { + if let Some(v) = func(self) { + out.push(v) + } + + match self { + Token::Sequence(items) => { + items + .iter() + .for_each(|tok| tok.walk_map_collect(&func, out)); + } + Token::Quote(inner) + | Token::Small(inner) + | Token::BoldItalic(inner) + | Token::Bold(inner) + | Token::Italic(inner) + | Token::Center(inner) + | Token::Function { inner, .. } + | Token::Link { label: inner, .. } + | Token::Strikethrough(inner) => inner.walk_map_collect(func, out), + _ => {} + } + } + fn write(&self, writer: &mut quick_xml::Writer) -> quick_xml::Result<()> { match self { Token::PlainText(plain) => { diff --git a/magnetar_sdk/src/types/mod.rs b/magnetar_sdk/src/types/mod.rs index 6920a5b..215c045 100644 --- a/magnetar_sdk/src/types/mod.rs +++ b/magnetar_sdk/src/types/mod.rs @@ -23,16 +23,18 @@ pub struct Id { pub id: String, } -impl From<&str> for Id { - fn from(id: &str) -> Self { - Self { id: id.to_string() } +impl> From for Id { + fn from(id: T) -> Self { + Self { + id: id.as_ref().to_string(), + } } } #[derive(Clone, Debug, Deserialize, Serialize, TS)] #[ts(export)] #[repr(transparent)] -pub struct MmXml(String); +pub struct MmXml(pub String); #[derive(Copy, Clone, Debug, Deserialize, Serialize, TS)] #[ts(export)] diff --git a/magnetar_sdk/src/types/user.rs b/magnetar_sdk/src/types/user.rs index 851a04a..5a4f8d4 100644 --- a/magnetar_sdk/src/types/user.rs +++ b/magnetar_sdk/src/types/user.rs @@ -39,7 +39,7 @@ pub struct UserBase { pub acct: String, pub username: String, pub display_name: String, - pub display_name_mm: MmXml, + pub display_name_mm: Option, pub host: Option, pub speech_transform: SpeechTransform, pub created_at: DateTime, diff --git a/src/model/data/note.rs b/src/model/data/note.rs index 987566b..147bb78 100644 --- a/src/model/data/note.rs +++ b/src/model/data/note.rs @@ -19,7 +19,7 @@ impl PackType<&ck::note_reaction::Model> for ReactionBase { user_id: reaction.user_id.clone(), reaction: Reaction::guess_from( &reaction.reaction, - &context.instance_info.default_reaction, + &context.instance_meta.default_reaction, ) .unwrap_or_else(|| /* Shouldn't happen */ Reaction::Unicode("👍".to_string())), } diff --git a/src/model/data/user.rs b/src/model/data/user.rs index 03f033c..2a9edcf 100644 --- a/src/model/data/user.rs +++ b/src/model/data/user.rs @@ -16,11 +16,11 @@ impl PackType<&[PackEmojiBase]> for EmojiContext { } } -struct UserBaseSource<'a> { - user: &'a ck::user::Model, - username_mm: &'a MmXml, - avatar: &'a Option, - emoji_context: &'a EmojiContext, +pub struct UserBaseSource<'a> { + pub user: &'a ck::user::Model, + pub username_mm: Option<&'a MmXml>, + pub avatar: &'a Option, + pub emoji_context: &'a EmojiContext, } impl PackType> for UserBase { @@ -41,7 +41,7 @@ impl PackType> for UserBase { .unwrap_or_else(|| format!("@{}", user.username)), username: user.username.clone(), display_name: user.name.clone().unwrap_or_else(|| user.username.clone()), - display_name_mm: username_mm.clone(), + display_name_mm: username_mm.cloned(), host: user.host.clone(), speech_transform: if user.is_cat && user.speak_as_cat { SpeechTransform::Cat @@ -65,13 +65,13 @@ impl PackType> for UserBase { } } -struct UserProfileExtSource<'a> { - user: &'a ck::user::Model, - profile: &'a ck::user_profile::Model, - profile_fields: &'a Vec, - description_mm: Option<&'a MmXml>, - relation: Option<&'a UserRelationExt>, - emoji_context: &'a EmojiContext, +pub struct UserProfileExtSource<'a> { + pub user: &'a ck::user::Model, + pub profile: &'a ck::user_profile::Model, + pub profile_fields: &'a Vec, + pub description_mm: Option<&'a MmXml>, + pub relation: Option<&'a UserRelationExt>, + pub emoji_context: &'a EmojiContext, } impl PackType> for UserProfileExt { @@ -130,12 +130,12 @@ impl PackType<&ck::user::Model> for UserDetailExt { } struct UserRelationExtSource<'a> { - follow_out: Option<&'a ck::following::Model>, - follow_in: Option<&'a ck::following::Model>, - block_out: Option<&'a ck::blocking::Model>, - block_in: Option<&'a ck::blocking::Model>, - mute: Option<&'a ck::muting::Model>, - renote_mute: Option<&'a ck::renote_muting::Model>, + pub follow_out: Option<&'a ck::following::Model>, + pub follow_in: Option<&'a ck::following::Model>, + pub block_out: Option<&'a ck::blocking::Model>, + pub block_in: Option<&'a ck::blocking::Model>, + pub mute: Option<&'a ck::muting::Model>, + pub renote_mute: Option<&'a ck::renote_muting::Model>, } impl PackType> for UserRelationExt { diff --git a/src/model/mod.rs b/src/model/mod.rs index 031dbaf..63f9486 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -2,10 +2,11 @@ use magnetar_calckey_model::ck; use std::sync::Arc; pub mod data; +pub mod processing; #[derive(Clone, Debug)] pub struct PackingContext { - instance_info: Arc, + instance_meta: Arc, self_user: Option>, } diff --git a/src/model/processing/emoji.rs b/src/model/processing/emoji.rs new file mode 100644 index 0000000..e99ad69 --- /dev/null +++ b/src/model/processing/emoji.rs @@ -0,0 +1,33 @@ +use crate::model::processing::PackResult; +use crate::model::{PackType, PackingContext}; +use magnetar_calckey_model::{ck, CalckeyModel}; +use magnetar_sdk::types::emoji::{EmojiBase, PackEmojiBase}; +use magnetar_sdk::types::Id; +use magnetar_sdk::{Packed, Required}; + +pub struct EmojiModel(CalckeyModel); + +impl EmojiModel { + pub fn new(model: CalckeyModel) -> Self { + EmojiModel(model) + } + + pub fn pack_existing(&self, ctx: &PackingContext, emoji: &ck::emoji::Model) -> PackEmojiBase { + PackEmojiBase::pack_from(( + Required(Id::from(&emoji.id)), + Required(EmojiBase::extract(ctx, &emoji)), + )) + } + + pub async fn fetch_many_emojis( + &self, + ctx: &PackingContext, + shortcodes: &[String], + host: Option<&str>, + ) -> PackResult> { + let emojis = self.0.fetch_many_emojis(shortcodes, host).await?; + let packed_emojis = emojis.iter().map(|e| self.pack_existing(ctx, &e)).collect(); + + Ok(packed_emojis) + } +} diff --git a/src/model/processing/mod.rs b/src/model/processing/mod.rs new file mode 100644 index 0000000..f43b528 --- /dev/null +++ b/src/model/processing/mod.rs @@ -0,0 +1,32 @@ +use magnetar_calckey_model::sea_orm::DbErr; +use magnetar_calckey_model::CalckeyDbError; +use magnetar_sdk::mmm::Token; +use thiserror::Error; + +pub mod emoji; +pub mod user; + +#[derive(Debug, Error)] +pub enum PackError { + #[error("Database error: {0}")] + DbError(#[from] DbErr), + #[error("Calckey database wrapper error: {0}")] + CalckeyDbError(#[from] CalckeyDbError), +} + +pub type PackResult = Result; + +fn get_mm_token_emoji(token: &Token) -> Vec { + let mut v = Vec::new(); + token.walk_map_collect( + |t| { + if let Token::ShortcodeEmoji(e) = t { + Some(e.to_owned()) + } else { + None + } + }, + &mut v, + ); + v +} diff --git a/src/model/processing/user.rs b/src/model/processing/user.rs new file mode 100644 index 0000000..f258005 --- /dev/null +++ b/src/model/processing/user.rs @@ -0,0 +1,51 @@ +use crate::model::data::user::UserBaseSource; +use crate::model::processing::emoji::EmojiModel; +use crate::model::processing::{get_mm_token_emoji, PackResult}; +use crate::model::{PackType, PackingContext}; +use magnetar_calckey_model::sea_orm::EntityTrait; +use magnetar_calckey_model::{ck, CalckeyModel}; +use magnetar_sdk::types::emoji::EmojiContext; +use magnetar_sdk::types::user::{PackUserBase, UserBase}; +use magnetar_sdk::types::{Id, MmXml}; +use magnetar_sdk::{mmm, Packed, Required}; + +pub struct UserModel(CalckeyModel); + +impl UserModel { + pub async fn base_from_existing( + &self, + ctx: &PackingContext, + user: &ck::user::Model, + ) -> PackResult { + let avatar = if let Some(avatar_id) = user.avatar_id.as_ref() { + ck::drive_file::Entity::find_by_id(avatar_id) + .one(self.0.inner()) + .await? + } else { + None + }; + + let username_mm = + mmm::Context::default().parse_ui(user.name.as_deref().unwrap_or(&user.username)); + let emoji_model = EmojiModel::new(self.0.clone()); + let emojis = emoji_model + .fetch_many_emojis(ctx, &get_mm_token_emoji(&username_mm), user.host.as_deref()) + .await?; + let emoji_context = EmojiContext(emojis); + + let base = UserBase::extract( + ctx, + UserBaseSource { + user, + username_mm: mmm::to_xml_string(&username_mm).map(MmXml).as_ref().ok(), + avatar: &avatar, + emoji_context: &emoji_context, + }, + ); + + Ok(PackUserBase::pack_from(( + Required(Id::from(&user.id)), + Required(base), + ))) + } +} diff --git a/src/service/instance_meta_cache.rs b/src/service/instance_meta_cache.rs new file mode 100644 index 0000000..437b4dd --- /dev/null +++ b/src/service/instance_meta_cache.rs @@ -0,0 +1,115 @@ +use crate::web::ApiError; +use magnetar_calckey_model::{ck, CalckeyDbError, CalckeyModel}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use strum::EnumVariantNames; +use thiserror::Error; +use tokio::sync::{mpsc, oneshot}; +use tracing::error; + +#[derive(Debug, Error, EnumVariantNames)] +pub enum InstanceMetaCacheError { + #[error("Database error: {0}")] + DbError(#[from] CalckeyDbError), + #[error("Cache channel closed")] + ChannelClosed, +} + +impl From for ApiError { + fn from(err: InstanceMetaCacheError) -> Self { + let mut api_error: ApiError = match err { + InstanceMetaCacheError::DbError(err) => err.into(), + InstanceMetaCacheError::ChannelClosed => err.into(), + }; + + api_error.message = format!("Instance meta cache error: {}", api_error.message); + + api_error + } +} + +struct Entry { + value: Option>, + last_fetched: Option, +} + +#[derive(Copy, Clone)] +enum CacheRequest { + Get, + Fetch, +} + +type Callback = oneshot::Sender, InstanceMetaCacheError>>; + +struct InstanceMetaCache { + sender: mpsc::UnboundedSender<(CacheRequest, Callback)>, +} + +impl InstanceMetaCache { + fn new(db: CalckeyModel) -> Self { + const REFRESH_INTERVAL_SEC: u64 = 10; + let stale_threshold = Duration::from_secs(REFRESH_INTERVAL_SEC); + + let mut state = Entry { + value: None, + last_fetched: None, + }; + + let (req_tx, mut req_rx) = mpsc::unbounded_channel::<(CacheRequest, Callback)>(); + + tokio::spawn(async move { + while let Some((req, res_tx)) = req_rx.recv().await { + if let Some(val) = &state.value { + if state + .last_fetched + .is_some_and(|i| Instant::now() - i < stale_threshold) + && !matches!(req, CacheRequest::Fetch) + { + res_tx.send(Ok(val.clone())).ok(); + continue; + } + } + + let res = db.get_instance_meta().await.map(Arc::new); + + if let Ok(ref data) = res { + state.value = Some(data.clone()); + state.last_fetched = Some(Instant::now()); + } + + res_tx.send(res.map_err(CalckeyDbError::into)).ok(); + } + }); + + Self { sender: req_tx } + } + + async fn get(&self, req: CacheRequest) -> Result, InstanceMetaCacheError> { + let (tx, rx) = oneshot::channel(); + self.sender + .send((req, tx)) + .map_err(|_| InstanceMetaCacheError::ChannelClosed)?; + rx.await + .map_err(|_| InstanceMetaCacheError::ChannelClosed)? + } +} + +pub struct InstanceMetaCacheService { + cache: InstanceMetaCache, +} + +impl InstanceMetaCacheService { + pub(super) fn new(db: CalckeyModel) -> Self { + Self { + cache: InstanceMetaCache::new(db), + } + } + + pub async fn fetch(&self) -> Result, InstanceMetaCacheError> { + self.cache.get(CacheRequest::Fetch).await + } + + pub async fn get(&self) -> Result, InstanceMetaCacheError> { + self.cache.get(CacheRequest::Get).await + } +} diff --git a/src/service/mod.rs b/src/service/mod.rs index 7a88653..027cae4 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -2,6 +2,7 @@ use magnetar_calckey_model::{CalckeyCache, CalckeyModel}; use magnetar_common::config::MagnetarConfig; use thiserror::Error; +pub mod instance_meta_cache; pub mod user_cache; pub struct MagnetarService { @@ -9,6 +10,7 @@ pub struct MagnetarService { pub cache: CalckeyCache, pub config: &'static MagnetarConfig, pub auth_cache: user_cache::UserCacheService, + pub instance_meta_cache: instance_meta_cache::InstanceMetaCacheService, } #[derive(Debug, Error)] @@ -25,12 +27,14 @@ impl MagnetarService { ) -> Result { let auth_cache = user_cache::UserCacheService::new(config, db.clone(), cache.clone()).await?; + let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone()); Ok(Self { db, cache, config, auth_cache, + instance_meta_cache, }) } }