pub use ck; use ck::*; use chrono::Utc; use ext_calckey_model_migration::{Migrator, MigratorTrait}; use futures_util::StreamExt; use redis::IntoConnectionInfo; use sea_orm::ActiveValue::Set; use sea_orm::{ ColumnTrait, ConnectOptions, DatabaseConnection, DbErr, EntityTrait, QueryFilter, QueryOrder, TransactionTrait, }; use serde::{Deserialize, Serialize}; use std::future::Future; use strum::IntoStaticStr; use thiserror::Error; use tokio::select; use tokio_util::sync::CancellationToken; use tracing::log::LevelFilter; use tracing::{error, info}; #[derive(Debug)] pub struct ConnectorConfig { pub url: String, } #[derive(Clone, Debug)] pub struct CalckeyModel(DatabaseConnection); #[derive(Debug, Error, IntoStaticStr)] pub enum CalckeyDbError { #[error("Database error: {0}")] DbError(#[from] DbErr), } impl CalckeyModel { pub async fn new(config: ConnectorConfig) -> Result { let opt = ConnectOptions::new(config.url) .max_connections(64) .min_connections(8) .sqlx_logging(true) .sqlx_logging_level(LevelFilter::Debug) .to_owned(); Ok(CalckeyModel(sea_orm::Database::connect(opt).await?)) } pub async fn migrate(&self) -> Result<(), CalckeyDbError> { Migrator::up(&self.0, None).await?; Ok(()) } pub fn inner(&self) -> &DatabaseConnection { &self.0 } pub async fn get_user_by_tag( &self, name: &str, instance: Option<&str>, ) -> Result, CalckeyDbError> { let name = name.to_lowercase(); let instance = instance.map(str::to_lowercase); let user = if let Some(instance) = instance { user::Entity::find() .filter(user::Column::UsernameLower.eq(name)) .filter(user::Column::Host.eq(instance)) } else { user::Entity::find().filter( user::Column::UsernameLower .eq(name) .and(user::Column::Host.is_null()), ) } .one(&self.0) .await?; Ok(user) } pub async fn get_note_by_id(&self, id: &str) -> Result, CalckeyDbError> { Ok(note::Entity::find() .filter(note::Column::Id.eq(id)) .one(&self.0) .await?) } pub async fn get_user_by_id(&self, id: &str) -> Result, CalckeyDbError> { Ok(user::Entity::find() .filter(user::Column::Id.eq(id)) .one(&self.0) .await?) } pub async fn get_user_by_token( &self, token: &str, ) -> Result, CalckeyDbError> { Ok(user::Entity::find() .filter( user::Column::Token .eq(token) .and(user::Column::Host.is_null()), ) .one(&self.0) .await?) } pub async fn get_user_by_uri(&self, uri: &str) -> Result, CalckeyDbError> { Ok(user::Entity::find() .filter(user::Column::Uri.eq(uri)) .one(&self.0) .await?) } pub async fn get_local_emoji(&self) -> Result, CalckeyDbError> { Ok(emoji::Entity::find() .filter(emoji::Column::Host.is_null()) .order_by_asc(emoji::Column::Category) .order_by_asc(emoji::Column::Name) .all(&self.0) .await?) } pub async fn get_access_token( &self, token: &str, ) -> Result, CalckeyDbError> { let token = access_token::Entity::update(access_token::ActiveModel { last_used_at: Set(Some(Utc::now().into())), ..Default::default() }) .filter( access_token::Column::Token .eq(token) .or(access_token::Column::Hash.eq(token.to_lowercase())), ) .exec(&self.0) .await; if let Err(DbErr::RecordNotUpdated) = token { return Ok(None); } Ok(Some(token?)) } pub async fn get_app_by_id(&self, id: &str) -> Result, CalckeyDbError> { Ok(app::Entity::find() .filter(app::Column::Id.eq(id)) .one(&self.0) .await?) } pub async fn get_instance_meta(&self) -> Result { let txn = self.0.begin().await?; let meta = meta::Entity::find().one(&txn).await?; if let Some(meta) = meta { txn.commit().await?; return Ok(meta); } let model = meta::ActiveModel { id: Set("x".to_string()), ..Default::default() }; let meta = meta::Entity::insert(model) .exec_with_returning(&txn) .await?; txn.commit().await?; Ok(meta) } } #[derive(Debug)] pub struct CacheConnectorConfig { pub url: String, } #[derive(Clone, Debug)] pub struct CalckeyCache(redis::Client); #[derive(Debug, Error, IntoStaticStr)] pub enum CalckeyCacheError { #[error("Redis error: {0}")] RedisError(#[from] redis::RedisError), } impl CalckeyCache { pub fn new(config: CacheConnectorConfig) -> Result { let conn_info = config.url.into_connection_info()?; // TODO: Allow overriding redis config with individual options (maybe) let redis_config = redis::ConnectionInfo { addr: conn_info.addr, redis: conn_info.redis, }; Ok(CalckeyCache(redis::Client::open(redis_config)?)) } pub fn inner(&self) -> &redis::Client { &self.0 } pub async fn conn(&self) -> Result { Ok(CalckeyCacheClient(self.0.get_async_connection().await?)) } } pub struct CalckeyCacheClient(redis::aio::Connection); #[derive(Clone, Debug, Deserialize)] #[serde(tag = "channel", content = "message")] pub enum SubMessage { Internal(InternalStreamMessage), #[serde(other)] Other, } #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type", content = "body")] #[serde(rename_all = "camelCase")] pub enum InternalStreamMessage { #[serde(rename_all = "camelCase")] UserChangeSuspendedState { id: String, is_suspended: bool, }, #[serde(rename_all = "camelCase")] UserChangeSilencedState { id: String, is_silenced: bool, }, #[serde(rename_all = "camelCase")] UserChangeModeratorState { id: String, is_moderator: bool, }, #[serde(rename_all = "camelCase")] UserTokenRegenerated { id: String, old_token: String, new_token: String, }, LocalUserUpdated { id: String, }, RemoteUserUpdated { id: String, }, WebhookCreated(webhook::Model), WebhookDeleted(webhook::Model), WebhookUpdated(webhook::Model), AntennaCreated(antenna::Model), AntennaDeleted(antenna::Model), AntennaUpdated(antenna::Model), } impl CalckeyCacheClient { pub async fn subscribe + Send + 'static>( self, prefix: &str, handler: impl Fn(SubMessage) -> F + Send + Sync + 'static, ) -> Result { CalckeySub::new(self.0, prefix, handler).await } } pub struct CalckeySub(CancellationToken); impl CalckeySub { async fn new + Send + 'static>( conn: redis::aio::Connection, prefix: &str, handler: impl Fn(SubMessage) -> F + Send + Sync + 'static, ) -> Result { let mut pub_sub = conn.into_pubsub(); pub_sub.subscribe(prefix).await?; let token = CancellationToken::new(); let token_rx = token.clone(); let prefix = prefix.to_string(); tokio::spawn(async move { let mut on_message = pub_sub.on_message(); while let Some(msg) = select! { msg = on_message.next() => msg, _ = token_rx.cancelled() => { drop(on_message); if let Err(e) = pub_sub.unsubscribe(prefix).await { info!("Redis error: {:?}", e); } return; } } { let data = &match msg.get_payload::() { Ok(val) => val, Err(e) => { info!("Redis error: {:?}", e); continue; } }; let parsed = match serde_json::from_str::(data) { Ok(val) => val, Err(e) => { info!("Message parse error: {:?}", e); continue; } }; println!("Got message: {:#?}", parsed); handler(parsed).await; } }); Ok(CalckeySub(token)) } } impl Drop for CalckeySub { fn drop(&mut self) { self.0.cancel(); } }