Instance meta cache and initial pack processing for users
This commit is contained in:
parent
fc86f0e29c
commit
18d526cf8c
|
@ -1,5 +1,6 @@
|
|||
pub use ck;
|
||||
use ck::*;
|
||||
pub use sea_orm;
|
||||
|
||||
use chrono::Utc;
|
||||
use ext_calckey_model_migration::{Migrator, MigratorTrait};
|
||||
|
@ -124,6 +125,23 @@ impl CalckeyModel {
|
|||
.await?)
|
||||
}
|
||||
|
||||
pub async fn fetch_many_emojis(
|
||||
&self,
|
||||
shortcodes: &[String],
|
||||
host: Option<&str>,
|
||||
) -> Result<Vec<emoji::Model>, CalckeyDbError> {
|
||||
let host_filter = if let Some(host) = host {
|
||||
emoji::Column::Host.eq(host)
|
||||
} else {
|
||||
emoji::Column::Host.is_null()
|
||||
};
|
||||
|
||||
let name_filter = emoji::Column::Name.is_in(shortcodes);
|
||||
let filter = host_filter.and(name_filter);
|
||||
|
||||
Ok(emoji::Entity::find().filter(filter).all(&self.0).await?)
|
||||
}
|
||||
|
||||
pub async fn get_access_token(
|
||||
&self,
|
||||
token: &str,
|
||||
|
|
|
@ -233,6 +233,30 @@ impl Token {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn walk_map_collect<T>(&self, func: impl Fn(&Token) -> Option<T>, out: &mut Vec<T>) {
|
||||
if let Some(v) = func(self) {
|
||||
out.push(v)
|
||||
}
|
||||
|
||||
match self {
|
||||
Token::Sequence(items) => {
|
||||
items
|
||||
.iter()
|
||||
.for_each(|tok| tok.walk_map_collect(&func, out));
|
||||
}
|
||||
Token::Quote(inner)
|
||||
| Token::Small(inner)
|
||||
| Token::BoldItalic(inner)
|
||||
| Token::Bold(inner)
|
||||
| Token::Italic(inner)
|
||||
| Token::Center(inner)
|
||||
| Token::Function { inner, .. }
|
||||
| Token::Link { label: inner, .. }
|
||||
| Token::Strikethrough(inner) => inner.walk_map_collect(func, out),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn write<T: Write>(&self, writer: &mut quick_xml::Writer<T>) -> quick_xml::Result<()> {
|
||||
match self {
|
||||
Token::PlainText(plain) => {
|
||||
|
|
|
@ -23,16 +23,18 @@ pub struct Id {
|
|||
pub id: String,
|
||||
}
|
||||
|
||||
impl From<&str> for Id {
|
||||
fn from(id: &str) -> Self {
|
||||
Self { id: id.to_string() }
|
||||
impl<T: AsRef<str>> From<T> for Id {
|
||||
fn from(id: T) -> Self {
|
||||
Self {
|
||||
id: id.as_ref().to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
|
||||
#[ts(export)]
|
||||
#[repr(transparent)]
|
||||
pub struct MmXml(String);
|
||||
pub struct MmXml(pub String);
|
||||
|
||||
#[derive(Copy, Clone, Debug, Deserialize, Serialize, TS)]
|
||||
#[ts(export)]
|
||||
|
|
|
@ -39,7 +39,7 @@ pub struct UserBase {
|
|||
pub acct: String,
|
||||
pub username: String,
|
||||
pub display_name: String,
|
||||
pub display_name_mm: MmXml,
|
||||
pub display_name_mm: Option<MmXml>,
|
||||
pub host: Option<String>,
|
||||
pub speech_transform: SpeechTransform,
|
||||
pub created_at: DateTime<Utc>,
|
||||
|
|
|
@ -19,7 +19,7 @@ impl PackType<&ck::note_reaction::Model> for ReactionBase {
|
|||
user_id: reaction.user_id.clone(),
|
||||
reaction: Reaction::guess_from(
|
||||
&reaction.reaction,
|
||||
&context.instance_info.default_reaction,
|
||||
&context.instance_meta.default_reaction,
|
||||
)
|
||||
.unwrap_or_else(|| /* Shouldn't happen */ Reaction::Unicode("👍".to_string())),
|
||||
}
|
||||
|
|
|
@ -16,11 +16,11 @@ impl PackType<&[PackEmojiBase]> for EmojiContext {
|
|||
}
|
||||
}
|
||||
|
||||
struct UserBaseSource<'a> {
|
||||
user: &'a ck::user::Model,
|
||||
username_mm: &'a MmXml,
|
||||
avatar: &'a Option<ck::drive_file::Model>,
|
||||
emoji_context: &'a EmojiContext,
|
||||
pub struct UserBaseSource<'a> {
|
||||
pub user: &'a ck::user::Model,
|
||||
pub username_mm: Option<&'a MmXml>,
|
||||
pub avatar: &'a Option<ck::drive_file::Model>,
|
||||
pub emoji_context: &'a EmojiContext,
|
||||
}
|
||||
|
||||
impl PackType<UserBaseSource<'_>> for UserBase {
|
||||
|
@ -41,7 +41,7 @@ impl PackType<UserBaseSource<'_>> for UserBase {
|
|||
.unwrap_or_else(|| format!("@{}", user.username)),
|
||||
username: user.username.clone(),
|
||||
display_name: user.name.clone().unwrap_or_else(|| user.username.clone()),
|
||||
display_name_mm: username_mm.clone(),
|
||||
display_name_mm: username_mm.cloned(),
|
||||
host: user.host.clone(),
|
||||
speech_transform: if user.is_cat && user.speak_as_cat {
|
||||
SpeechTransform::Cat
|
||||
|
@ -65,13 +65,13 @@ impl PackType<UserBaseSource<'_>> for UserBase {
|
|||
}
|
||||
}
|
||||
|
||||
struct UserProfileExtSource<'a> {
|
||||
user: &'a ck::user::Model,
|
||||
profile: &'a ck::user_profile::Model,
|
||||
profile_fields: &'a Vec<ProfileField>,
|
||||
description_mm: Option<&'a MmXml>,
|
||||
relation: Option<&'a UserRelationExt>,
|
||||
emoji_context: &'a EmojiContext,
|
||||
pub struct UserProfileExtSource<'a> {
|
||||
pub user: &'a ck::user::Model,
|
||||
pub profile: &'a ck::user_profile::Model,
|
||||
pub profile_fields: &'a Vec<ProfileField>,
|
||||
pub description_mm: Option<&'a MmXml>,
|
||||
pub relation: Option<&'a UserRelationExt>,
|
||||
pub emoji_context: &'a EmojiContext,
|
||||
}
|
||||
|
||||
impl PackType<UserProfileExtSource<'_>> for UserProfileExt {
|
||||
|
@ -130,12 +130,12 @@ impl PackType<&ck::user::Model> for UserDetailExt {
|
|||
}
|
||||
|
||||
struct UserRelationExtSource<'a> {
|
||||
follow_out: Option<&'a ck::following::Model>,
|
||||
follow_in: Option<&'a ck::following::Model>,
|
||||
block_out: Option<&'a ck::blocking::Model>,
|
||||
block_in: Option<&'a ck::blocking::Model>,
|
||||
mute: Option<&'a ck::muting::Model>,
|
||||
renote_mute: Option<&'a ck::renote_muting::Model>,
|
||||
pub follow_out: Option<&'a ck::following::Model>,
|
||||
pub follow_in: Option<&'a ck::following::Model>,
|
||||
pub block_out: Option<&'a ck::blocking::Model>,
|
||||
pub block_in: Option<&'a ck::blocking::Model>,
|
||||
pub mute: Option<&'a ck::muting::Model>,
|
||||
pub renote_mute: Option<&'a ck::renote_muting::Model>,
|
||||
}
|
||||
|
||||
impl PackType<UserRelationExtSource<'_>> for UserRelationExt {
|
||||
|
|
|
@ -2,10 +2,11 @@ use magnetar_calckey_model::ck;
|
|||
use std::sync::Arc;
|
||||
|
||||
pub mod data;
|
||||
pub mod processing;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PackingContext {
|
||||
instance_info: Arc<ck::meta::Model>,
|
||||
instance_meta: Arc<ck::meta::Model>,
|
||||
self_user: Option<Arc<ck::user::Model>>,
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
use crate::model::processing::PackResult;
|
||||
use crate::model::{PackType, PackingContext};
|
||||
use magnetar_calckey_model::{ck, CalckeyModel};
|
||||
use magnetar_sdk::types::emoji::{EmojiBase, PackEmojiBase};
|
||||
use magnetar_sdk::types::Id;
|
||||
use magnetar_sdk::{Packed, Required};
|
||||
|
||||
pub struct EmojiModel(CalckeyModel);
|
||||
|
||||
impl EmojiModel {
|
||||
pub fn new(model: CalckeyModel) -> Self {
|
||||
EmojiModel(model)
|
||||
}
|
||||
|
||||
pub fn pack_existing(&self, ctx: &PackingContext, emoji: &ck::emoji::Model) -> PackEmojiBase {
|
||||
PackEmojiBase::pack_from((
|
||||
Required(Id::from(&emoji.id)),
|
||||
Required(EmojiBase::extract(ctx, &emoji)),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn fetch_many_emojis(
|
||||
&self,
|
||||
ctx: &PackingContext,
|
||||
shortcodes: &[String],
|
||||
host: Option<&str>,
|
||||
) -> PackResult<Vec<PackEmojiBase>> {
|
||||
let emojis = self.0.fetch_many_emojis(shortcodes, host).await?;
|
||||
let packed_emojis = emojis.iter().map(|e| self.pack_existing(ctx, &e)).collect();
|
||||
|
||||
Ok(packed_emojis)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
use magnetar_calckey_model::sea_orm::DbErr;
|
||||
use magnetar_calckey_model::CalckeyDbError;
|
||||
use magnetar_sdk::mmm::Token;
|
||||
use thiserror::Error;
|
||||
|
||||
pub mod emoji;
|
||||
pub mod user;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum PackError {
|
||||
#[error("Database error: {0}")]
|
||||
DbError(#[from] DbErr),
|
||||
#[error("Calckey database wrapper error: {0}")]
|
||||
CalckeyDbError(#[from] CalckeyDbError),
|
||||
}
|
||||
|
||||
pub type PackResult<T> = Result<T, PackError>;
|
||||
|
||||
fn get_mm_token_emoji(token: &Token) -> Vec<String> {
|
||||
let mut v = Vec::new();
|
||||
token.walk_map_collect(
|
||||
|t| {
|
||||
if let Token::ShortcodeEmoji(e) = t {
|
||||
Some(e.to_owned())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
},
|
||||
&mut v,
|
||||
);
|
||||
v
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
use crate::model::data::user::UserBaseSource;
|
||||
use crate::model::processing::emoji::EmojiModel;
|
||||
use crate::model::processing::{get_mm_token_emoji, PackResult};
|
||||
use crate::model::{PackType, PackingContext};
|
||||
use magnetar_calckey_model::sea_orm::EntityTrait;
|
||||
use magnetar_calckey_model::{ck, CalckeyModel};
|
||||
use magnetar_sdk::types::emoji::EmojiContext;
|
||||
use magnetar_sdk::types::user::{PackUserBase, UserBase};
|
||||
use magnetar_sdk::types::{Id, MmXml};
|
||||
use magnetar_sdk::{mmm, Packed, Required};
|
||||
|
||||
pub struct UserModel(CalckeyModel);
|
||||
|
||||
impl UserModel {
|
||||
pub async fn base_from_existing(
|
||||
&self,
|
||||
ctx: &PackingContext,
|
||||
user: &ck::user::Model,
|
||||
) -> PackResult<PackUserBase> {
|
||||
let avatar = if let Some(avatar_id) = user.avatar_id.as_ref() {
|
||||
ck::drive_file::Entity::find_by_id(avatar_id)
|
||||
.one(self.0.inner())
|
||||
.await?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let username_mm =
|
||||
mmm::Context::default().parse_ui(user.name.as_deref().unwrap_or(&user.username));
|
||||
let emoji_model = EmojiModel::new(self.0.clone());
|
||||
let emojis = emoji_model
|
||||
.fetch_many_emojis(ctx, &get_mm_token_emoji(&username_mm), user.host.as_deref())
|
||||
.await?;
|
||||
let emoji_context = EmojiContext(emojis);
|
||||
|
||||
let base = UserBase::extract(
|
||||
ctx,
|
||||
UserBaseSource {
|
||||
user,
|
||||
username_mm: mmm::to_xml_string(&username_mm).map(MmXml).as_ref().ok(),
|
||||
avatar: &avatar,
|
||||
emoji_context: &emoji_context,
|
||||
},
|
||||
);
|
||||
|
||||
Ok(PackUserBase::pack_from((
|
||||
Required(Id::from(&user.id)),
|
||||
Required(base),
|
||||
)))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
use crate::web::ApiError;
|
||||
use magnetar_calckey_model::{ck, CalckeyDbError, CalckeyModel};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use strum::EnumVariantNames;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Debug, Error, EnumVariantNames)]
|
||||
pub enum InstanceMetaCacheError {
|
||||
#[error("Database error: {0}")]
|
||||
DbError(#[from] CalckeyDbError),
|
||||
#[error("Cache channel closed")]
|
||||
ChannelClosed,
|
||||
}
|
||||
|
||||
impl From<InstanceMetaCacheError> for ApiError {
|
||||
fn from(err: InstanceMetaCacheError) -> Self {
|
||||
let mut api_error: ApiError = match err {
|
||||
InstanceMetaCacheError::DbError(err) => err.into(),
|
||||
InstanceMetaCacheError::ChannelClosed => err.into(),
|
||||
};
|
||||
|
||||
api_error.message = format!("Instance meta cache error: {}", api_error.message);
|
||||
|
||||
api_error
|
||||
}
|
||||
}
|
||||
|
||||
struct Entry {
|
||||
value: Option<Arc<ck::meta::Model>>,
|
||||
last_fetched: Option<Instant>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
enum CacheRequest {
|
||||
Get,
|
||||
Fetch,
|
||||
}
|
||||
|
||||
type Callback = oneshot::Sender<Result<Arc<ck::meta::Model>, InstanceMetaCacheError>>;
|
||||
|
||||
struct InstanceMetaCache {
|
||||
sender: mpsc::UnboundedSender<(CacheRequest, Callback)>,
|
||||
}
|
||||
|
||||
impl InstanceMetaCache {
|
||||
fn new(db: CalckeyModel) -> Self {
|
||||
const REFRESH_INTERVAL_SEC: u64 = 10;
|
||||
let stale_threshold = Duration::from_secs(REFRESH_INTERVAL_SEC);
|
||||
|
||||
let mut state = Entry {
|
||||
value: None,
|
||||
last_fetched: None,
|
||||
};
|
||||
|
||||
let (req_tx, mut req_rx) = mpsc::unbounded_channel::<(CacheRequest, Callback)>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some((req, res_tx)) = req_rx.recv().await {
|
||||
if let Some(val) = &state.value {
|
||||
if state
|
||||
.last_fetched
|
||||
.is_some_and(|i| Instant::now() - i < stale_threshold)
|
||||
&& !matches!(req, CacheRequest::Fetch)
|
||||
{
|
||||
res_tx.send(Ok(val.clone())).ok();
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let res = db.get_instance_meta().await.map(Arc::new);
|
||||
|
||||
if let Ok(ref data) = res {
|
||||
state.value = Some(data.clone());
|
||||
state.last_fetched = Some(Instant::now());
|
||||
}
|
||||
|
||||
res_tx.send(res.map_err(CalckeyDbError::into)).ok();
|
||||
}
|
||||
});
|
||||
|
||||
Self { sender: req_tx }
|
||||
}
|
||||
|
||||
async fn get(&self, req: CacheRequest) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.sender
|
||||
.send((req, tx))
|
||||
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?;
|
||||
rx.await
|
||||
.map_err(|_| InstanceMetaCacheError::ChannelClosed)?
|
||||
}
|
||||
}
|
||||
|
||||
pub struct InstanceMetaCacheService {
|
||||
cache: InstanceMetaCache,
|
||||
}
|
||||
|
||||
impl InstanceMetaCacheService {
|
||||
pub(super) fn new(db: CalckeyModel) -> Self {
|
||||
Self {
|
||||
cache: InstanceMetaCache::new(db),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn fetch(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
||||
self.cache.get(CacheRequest::Fetch).await
|
||||
}
|
||||
|
||||
pub async fn get(&self) -> Result<Arc<ck::meta::Model>, InstanceMetaCacheError> {
|
||||
self.cache.get(CacheRequest::Get).await
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ use magnetar_calckey_model::{CalckeyCache, CalckeyModel};
|
|||
use magnetar_common::config::MagnetarConfig;
|
||||
use thiserror::Error;
|
||||
|
||||
pub mod instance_meta_cache;
|
||||
pub mod user_cache;
|
||||
|
||||
pub struct MagnetarService {
|
||||
|
@ -9,6 +10,7 @@ pub struct MagnetarService {
|
|||
pub cache: CalckeyCache,
|
||||
pub config: &'static MagnetarConfig,
|
||||
pub auth_cache: user_cache::UserCacheService,
|
||||
pub instance_meta_cache: instance_meta_cache::InstanceMetaCacheService,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -25,12 +27,14 @@ impl MagnetarService {
|
|||
) -> Result<Self, ServiceInitError> {
|
||||
let auth_cache =
|
||||
user_cache::UserCacheService::new(config, db.clone(), cache.clone()).await?;
|
||||
let instance_meta_cache = instance_meta_cache::InstanceMetaCacheService::new(db.clone());
|
||||
|
||||
Ok(Self {
|
||||
db,
|
||||
cache,
|
||||
config,
|
||||
auth_cache,
|
||||
instance_meta_cache,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue