2024-04-30 14:02:35 +00:00
|
|
|
use std::future::Future;
|
2023-09-22 17:57:53 +00:00
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
use chrono::Utc;
|
2024-04-30 14:02:35 +00:00
|
|
|
use futures_util::{SinkExt, StreamExt};
|
2023-08-02 01:10:53 +00:00
|
|
|
use redis::IntoConnectionInfo;
|
2024-04-30 14:02:35 +00:00
|
|
|
pub use sea_orm;
|
2024-04-15 02:45:44 +00:00
|
|
|
use sea_orm::{ActiveValue::Set, ConnectionTrait};
|
2023-08-02 01:10:53 +00:00
|
|
|
use sea_orm::{
|
2024-01-08 19:15:51 +00:00
|
|
|
ColumnTrait, ConnectOptions, DatabaseConnection, DbErr, EntityTrait, QueryFilter,
|
|
|
|
TransactionTrait,
|
2023-08-02 01:10:53 +00:00
|
|
|
};
|
2024-01-16 23:41:32 +00:00
|
|
|
use serde::{Deserialize, Deserializer, Serialize};
|
2024-04-30 14:02:35 +00:00
|
|
|
use serde::de::Error;
|
2024-01-16 23:41:32 +00:00
|
|
|
use serde_json::Value;
|
2023-08-02 01:10:53 +00:00
|
|
|
use strum::IntoStaticStr;
|
2023-07-07 19:22:30 +00:00
|
|
|
use thiserror::Error;
|
2023-08-02 01:10:53 +00:00
|
|
|
use tokio::select;
|
|
|
|
use tokio_util::sync::CancellationToken;
|
2024-01-16 23:41:32 +00:00
|
|
|
use tracing::{error, info, trace, warn};
|
2024-04-30 14:02:35 +00:00
|
|
|
use tracing::log::LevelFilter;
|
|
|
|
use url::Host;
|
|
|
|
|
|
|
|
pub use ck;
|
|
|
|
use ck::*;
|
|
|
|
use ext_model_migration::{Migrator, MigratorTrait};
|
|
|
|
use user_model::UserResolver;
|
|
|
|
|
|
|
|
use crate::model_ext::IdShape;
|
|
|
|
use crate::note_model::NoteResolver;
|
|
|
|
use crate::notification_model::NotificationResolver;
|
|
|
|
|
|
|
|
pub mod emoji;
|
|
|
|
pub mod model_ext;
|
|
|
|
pub mod note_model;
|
|
|
|
pub mod notification_model;
|
|
|
|
pub mod poll;
|
|
|
|
pub mod user_model;
|
2023-04-21 23:39:52 +00:00
|
|
|
|
|
|
|
/// Settings consumed by [`CalckeyModel::new`] when opening the database.
#[derive(Debug)]
pub struct ConnectorConfig {
    /// Connection URL handed to `ConnectOptions::new`.
    pub url: String,
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct CalckeyModel(DatabaseConnection);
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
#[derive(Debug, Error, IntoStaticStr)]
|
2023-07-07 19:22:30 +00:00
|
|
|
pub enum CalckeyDbError {
|
|
|
|
#[error("Database error: {0}")]
|
2023-08-02 01:10:53 +00:00
|
|
|
DbError(#[from] DbErr),
|
2023-07-07 19:22:30 +00:00
|
|
|
}
|
|
|
|
|
2023-04-21 23:39:52 +00:00
|
|
|
impl CalckeyModel {
|
2023-07-07 19:22:30 +00:00
|
|
|
pub async fn new(config: ConnectorConfig) -> Result<Self, CalckeyDbError> {
|
2023-04-21 23:39:52 +00:00
|
|
|
let opt = ConnectOptions::new(config.url)
|
|
|
|
.max_connections(64)
|
|
|
|
.min_connections(8)
|
|
|
|
.sqlx_logging(true)
|
|
|
|
.sqlx_logging_level(LevelFilter::Debug)
|
|
|
|
.to_owned();
|
|
|
|
|
2023-11-05 14:23:48 +00:00
|
|
|
info!("Attempting database connection...");
|
2023-04-21 23:39:52 +00:00
|
|
|
Ok(CalckeyModel(sea_orm::Database::connect(opt).await?))
|
|
|
|
}
|
|
|
|
|
2023-07-29 03:20:00 +00:00
|
|
|
pub async fn migrate(&self) -> Result<(), CalckeyDbError> {
|
|
|
|
Migrator::up(&self.0, None).await?;
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
pub fn inner(&self) -> &DatabaseConnection {
|
|
|
|
&self.0
|
|
|
|
}
|
|
|
|
|
2023-04-21 23:39:52 +00:00
|
|
|
pub async fn get_user_by_tag(
|
|
|
|
&self,
|
|
|
|
name: &str,
|
2024-03-31 23:33:58 +00:00
|
|
|
instance: Option<&Host>,
|
2023-07-07 19:22:30 +00:00
|
|
|
) -> Result<Option<user::Model>, CalckeyDbError> {
|
2023-04-21 23:39:52 +00:00
|
|
|
let name = name.to_lowercase();
|
2024-03-31 23:33:58 +00:00
|
|
|
let instance = instance.map(Host::to_string);
|
2023-04-21 23:39:52 +00:00
|
|
|
|
|
|
|
let user = if let Some(instance) = instance {
|
|
|
|
user::Entity::find()
|
|
|
|
.filter(user::Column::UsernameLower.eq(name))
|
|
|
|
.filter(user::Column::Host.eq(instance))
|
|
|
|
} else {
|
2023-04-22 00:37:18 +00:00
|
|
|
user::Entity::find().filter(
|
|
|
|
user::Column::UsernameLower
|
|
|
|
.eq(name)
|
|
|
|
.and(user::Column::Host.is_null()),
|
|
|
|
)
|
2023-04-21 23:39:52 +00:00
|
|
|
}
|
2024-04-30 14:02:35 +00:00
|
|
|
.one(&self.0)
|
|
|
|
.await?;
|
2023-04-21 23:39:52 +00:00
|
|
|
|
|
|
|
Ok(user)
|
|
|
|
}
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
pub async fn get_note_by_id(&self, id: &str) -> Result<Option<note::Model>, CalckeyDbError> {
|
|
|
|
Ok(note::Entity::find()
|
|
|
|
.filter(note::Column::Id.eq(id))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_user_by_id(&self, id: &str) -> Result<Option<user::Model>, CalckeyDbError> {
|
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(user::Column::Id.eq(id))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-11-09 20:35:55 +00:00
|
|
|
pub async fn get_user_profile_by_id(
|
|
|
|
&self,
|
|
|
|
id: &str,
|
|
|
|
) -> Result<Option<user_profile::Model>, CalckeyDbError> {
|
|
|
|
Ok(user_profile::Entity::find()
|
|
|
|
.filter(user_profile::Column::UserId.eq(id))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2024-04-30 14:02:35 +00:00
|
|
|
pub async fn get_user_and_profile_by_id(&self, id: &str) -> Result<Option<(user::Model, user_profile::Model)>, CalckeyDbError> {
|
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(user::Column::Id.eq(id))
|
|
|
|
.find_also_related(user_profile::Entity)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?
|
|
|
|
.and_then(|(u, p)| p.map(|pp| (u, pp))))
|
|
|
|
}
|
|
|
|
|
2023-11-09 20:35:55 +00:00
|
|
|
pub async fn get_user_security_keys_by_id(
|
|
|
|
&self,
|
|
|
|
id: &str,
|
|
|
|
) -> Result<Vec<user_security_key::Model>, CalckeyDbError> {
|
|
|
|
Ok(user_security_key::Entity::find()
|
|
|
|
.filter(user_security_key::Column::UserId.eq(id))
|
|
|
|
.all(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-11-07 20:15:35 +00:00
|
|
|
pub async fn get_many_users_by_id(
|
|
|
|
&self,
|
|
|
|
id: &[String],
|
|
|
|
) -> Result<Vec<user::Model>, CalckeyDbError> {
|
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(user::Column::Id.is_in(id))
|
|
|
|
.all(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
pub async fn get_user_by_token(
|
|
|
|
&self,
|
|
|
|
token: &str,
|
|
|
|
) -> Result<Option<user::Model>, CalckeyDbError> {
|
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(
|
|
|
|
user::Column::Token
|
|
|
|
.eq(token)
|
|
|
|
.and(user::Column::Host.is_null()),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2024-04-30 14:02:35 +00:00
|
|
|
pub async fn get_user_and_profile_by_token(
|
|
|
|
&self,
|
|
|
|
token: &str,
|
|
|
|
) -> Result<Option<(user::Model, user_profile::Model)>, CalckeyDbError> {
|
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(
|
|
|
|
user::Column::Token
|
|
|
|
.eq(token)
|
|
|
|
.and(user::Column::Host.is_null()),
|
|
|
|
)
|
|
|
|
.find_also_related(user_profile::Entity)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?
|
|
|
|
.and_then(|(u, p)| p.map(|pp| (u, pp))))
|
|
|
|
}
|
|
|
|
|
2023-07-07 19:22:30 +00:00
|
|
|
pub async fn get_user_by_uri(&self, uri: &str) -> Result<Option<user::Model>, CalckeyDbError> {
|
2023-04-21 23:39:52 +00:00
|
|
|
Ok(user::Entity::find()
|
|
|
|
.filter(user::Column::Uri.eq(uri))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
2023-08-02 01:10:53 +00:00
|
|
|
|
2023-10-28 23:27:32 +00:00
|
|
|
pub async fn get_follower_status(
|
|
|
|
&self,
|
|
|
|
from: &str,
|
|
|
|
to: &str,
|
|
|
|
) -> Result<Option<following::Model>, CalckeyDbError> {
|
|
|
|
Ok(following::Entity::find()
|
|
|
|
.filter(
|
|
|
|
following::Column::FollowerId
|
|
|
|
.eq(from)
|
|
|
|
.and(following::Column::FolloweeId.eq(to)),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-11-09 20:35:55 +00:00
|
|
|
pub async fn get_follow_request_status(
|
|
|
|
&self,
|
|
|
|
from: &str,
|
|
|
|
to: &str,
|
|
|
|
) -> Result<Option<follow_request::Model>, CalckeyDbError> {
|
|
|
|
Ok(follow_request::Entity::find()
|
|
|
|
.filter(
|
|
|
|
follow_request::Column::FollowerId
|
|
|
|
.eq(from)
|
|
|
|
.and(follow_request::Column::FolloweeId.eq(to)),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-10-28 23:27:32 +00:00
|
|
|
pub async fn get_block_status(
|
|
|
|
&self,
|
|
|
|
from: &str,
|
|
|
|
to: &str,
|
|
|
|
) -> Result<Option<blocking::Model>, CalckeyDbError> {
|
|
|
|
Ok(blocking::Entity::find()
|
|
|
|
.filter(
|
|
|
|
blocking::Column::BlockerId
|
|
|
|
.eq(from)
|
|
|
|
.and(blocking::Column::BlockeeId.eq(to)),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_mute_status(
|
|
|
|
&self,
|
|
|
|
from: &str,
|
|
|
|
to: &str,
|
|
|
|
) -> Result<Option<muting::Model>, CalckeyDbError> {
|
|
|
|
Ok(muting::Entity::find()
|
|
|
|
.filter(
|
|
|
|
muting::Column::MuterId
|
|
|
|
.eq(from)
|
|
|
|
.and(muting::Column::MuteeId.eq(to)),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_renote_mute_status(
|
|
|
|
&self,
|
|
|
|
from: &str,
|
|
|
|
to: &str,
|
|
|
|
) -> Result<Option<renote_muting::Model>, CalckeyDbError> {
|
|
|
|
Ok(renote_muting::Entity::find()
|
|
|
|
.filter(
|
|
|
|
renote_muting::Column::MuterId
|
|
|
|
.eq(from)
|
|
|
|
.and(renote_muting::Column::MuteeId.eq(to)),
|
|
|
|
)
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
pub async fn get_access_token(
|
|
|
|
&self,
|
|
|
|
token: &str,
|
|
|
|
) -> Result<Option<access_token::Model>, CalckeyDbError> {
|
|
|
|
let token = access_token::Entity::update(access_token::ActiveModel {
|
|
|
|
last_used_at: Set(Some(Utc::now().into())),
|
|
|
|
..Default::default()
|
|
|
|
})
|
2024-04-30 14:02:35 +00:00
|
|
|
.filter(
|
|
|
|
access_token::Column::Token
|
|
|
|
.eq(token)
|
|
|
|
.or(access_token::Column::Hash.eq(token.to_lowercase())),
|
|
|
|
)
|
|
|
|
.exec(&self.0)
|
|
|
|
.await;
|
2023-08-02 01:10:53 +00:00
|
|
|
|
|
|
|
if let Err(DbErr::RecordNotUpdated) = token {
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(Some(token?))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_app_by_id(&self, id: &str) -> Result<Option<app::Model>, CalckeyDbError> {
|
|
|
|
Ok(app::Entity::find()
|
|
|
|
.filter(app::Column::Id.eq(id))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?)
|
|
|
|
}
|
|
|
|
|
2023-11-05 14:23:48 +00:00
|
|
|
pub async fn get_instance(
|
|
|
|
&self,
|
|
|
|
host: &str,
|
|
|
|
) -> Result<Option<instance::Model>, CalckeyDbError> {
|
|
|
|
let instance = instance::Entity::find()
|
|
|
|
.filter(instance::Column::Host.eq(host))
|
|
|
|
.one(&self.0)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
Ok(instance)
|
|
|
|
}
|
|
|
|
|
2023-08-02 01:10:53 +00:00
|
|
|
pub async fn get_instance_meta(&self) -> Result<meta::Model, CalckeyDbError> {
|
|
|
|
let txn = self.0.begin().await?;
|
|
|
|
|
|
|
|
let meta = meta::Entity::find().one(&txn).await?;
|
|
|
|
|
|
|
|
if let Some(meta) = meta {
|
|
|
|
txn.commit().await?;
|
|
|
|
return Ok(meta);
|
|
|
|
}
|
|
|
|
|
|
|
|
let model = meta::ActiveModel {
|
|
|
|
id: Set("x".to_string()),
|
|
|
|
..Default::default()
|
|
|
|
};
|
|
|
|
|
|
|
|
let meta = meta::Entity::insert(model)
|
|
|
|
.exec_with_returning(&txn)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
txn.commit().await?;
|
|
|
|
|
|
|
|
Ok(meta)
|
|
|
|
}
|
2023-10-29 01:10:48 +00:00
|
|
|
|
2024-01-15 00:46:08 +00:00
|
|
|
pub fn get_notification_resolver(&self) -> NotificationResolver {
|
|
|
|
NotificationResolver::new(
|
|
|
|
self.clone(),
|
|
|
|
self.get_user_resolver(),
|
|
|
|
self.get_note_resolver(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2023-10-29 01:10:48 +00:00
|
|
|
pub fn get_note_resolver(&self) -> NoteResolver {
|
2024-01-07 22:28:53 +00:00
|
|
|
NoteResolver::new(self.clone(), self.get_user_resolver())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get_user_resolver(&self) -> UserResolver {
|
|
|
|
UserResolver::new(self.clone())
|
2023-10-29 01:10:48 +00:00
|
|
|
}
|
2023-08-02 01:10:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Settings consumed by [`CalckeyCache::new`] when building the Redis client.
#[derive(Debug)]
pub struct CacheConnectorConfig {
    /// Redis URL, parsed with `IntoConnectionInfo::into_connection_info`.
    pub url: String,
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
|
|
|
pub struct CalckeyCache(redis::Client);
|
|
|
|
|
|
|
|
#[derive(Debug, Error, IntoStaticStr)]
|
|
|
|
pub enum CalckeyCacheError {
|
|
|
|
#[error("Redis error: {0}")]
|
|
|
|
RedisError(#[from] redis::RedisError),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl CalckeyCache {
|
|
|
|
pub fn new(config: CacheConnectorConfig) -> Result<Self, CalckeyCacheError> {
|
|
|
|
let conn_info = config.url.into_connection_info()?;
|
|
|
|
|
|
|
|
// TODO: Allow overriding redis config with individual options (maybe)
|
|
|
|
let redis_config = redis::ConnectionInfo {
|
|
|
|
addr: conn_info.addr,
|
|
|
|
redis: conn_info.redis,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(CalckeyCache(redis::Client::open(redis_config)?))
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn inner(&self) -> &redis::Client {
|
|
|
|
&self.0
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn conn(&self) -> Result<CalckeyCacheClient, CalckeyCacheError> {
|
|
|
|
Ok(CalckeyCacheClient(self.0.get_async_connection().await?))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Newtype around a single async Redis connection, as produced by
/// [`CalckeyCache::conn`].
pub struct CalckeyCacheClient(redis::aio::Connection);
|
|
|
|
|
2024-01-16 23:41:32 +00:00
|
|
|
#[derive(Clone, Debug)]
|
2023-08-02 01:10:53 +00:00
|
|
|
pub enum SubMessage {
|
|
|
|
Internal(InternalStreamMessage),
|
2024-01-16 23:41:32 +00:00
|
|
|
MainStream(String, MainStreamMessage),
|
|
|
|
Other(String, Value),
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct RawMessage<'a> {
|
|
|
|
channel: &'a str,
|
|
|
|
message: Value,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<'de> Deserialize<'de> for SubMessage {
|
|
|
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
2024-04-30 14:02:35 +00:00
|
|
|
where
|
|
|
|
D: Deserializer<'de>,
|
2024-01-16 23:41:32 +00:00
|
|
|
{
|
|
|
|
let raw = RawMessage::deserialize(deserializer)?;
|
|
|
|
|
|
|
|
Ok(match raw.channel {
|
|
|
|
"internal" => SubMessage::Internal(
|
|
|
|
InternalStreamMessage::deserialize(raw.message).map_err(Error::custom)?,
|
|
|
|
),
|
|
|
|
c if c.starts_with("mainStream") => SubMessage::MainStream(
|
|
|
|
c.strip_prefix("mainStream:")
|
|
|
|
.ok_or_else(|| Error::custom("Invalid mainStream prefix"))?
|
|
|
|
.to_string(),
|
|
|
|
MainStreamMessage::deserialize(raw.message).map_err(Error::custom)?,
|
|
|
|
),
|
|
|
|
_ => SubMessage::Other(raw.channel.to_string(), raw.message),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type", content = "body")]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
pub enum MainStreamMessage {
|
|
|
|
Notification(IdShape),
|
2023-08-02 01:10:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
|
|
#[serde(tag = "type", content = "body")]
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
pub enum InternalStreamMessage {
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
UserChangeSuspendedState {
|
|
|
|
id: String,
|
|
|
|
is_suspended: bool,
|
|
|
|
},
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
UserChangeSilencedState {
|
|
|
|
id: String,
|
|
|
|
is_silenced: bool,
|
|
|
|
},
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
UserChangeModeratorState {
|
|
|
|
id: String,
|
|
|
|
is_moderator: bool,
|
|
|
|
},
|
|
|
|
#[serde(rename_all = "camelCase")]
|
|
|
|
UserTokenRegenerated {
|
|
|
|
id: String,
|
|
|
|
old_token: String,
|
|
|
|
new_token: String,
|
|
|
|
},
|
|
|
|
LocalUserUpdated {
|
|
|
|
id: String,
|
|
|
|
},
|
|
|
|
RemoteUserUpdated {
|
|
|
|
id: String,
|
|
|
|
},
|
|
|
|
WebhookCreated(webhook::Model),
|
|
|
|
WebhookDeleted(webhook::Model),
|
|
|
|
WebhookUpdated(webhook::Model),
|
|
|
|
AntennaCreated(antenna::Model),
|
|
|
|
AntennaDeleted(antenna::Model),
|
|
|
|
AntennaUpdated(antenna::Model),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl CalckeyCacheClient {
|
2024-04-30 14:02:35 +00:00
|
|
|
pub async fn subscribe<F: Future<Output=()> + Send + 'static>(
|
2023-08-02 01:10:53 +00:00
|
|
|
self,
|
|
|
|
prefix: &str,
|
|
|
|
handler: impl Fn(SubMessage) -> F + Send + Sync + 'static,
|
|
|
|
) -> Result<CalckeySub, CalckeyCacheError> {
|
|
|
|
CalckeySub::new(self.0, prefix, handler).await
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Handle to a running subscription; holds the [`CancellationToken`] that is
/// cancelled when the handle is dropped.
pub struct CalckeySub(CancellationToken);
|
|
|
|
|
|
|
|
impl CalckeySub {
|
2024-04-30 14:02:35 +00:00
|
|
|
async fn new<F: Future<Output=()> + Send + 'static>(
|
2023-08-02 01:10:53 +00:00
|
|
|
conn: redis::aio::Connection,
|
|
|
|
prefix: &str,
|
|
|
|
handler: impl Fn(SubMessage) -> F + Send + Sync + 'static,
|
|
|
|
) -> Result<Self, CalckeyCacheError> {
|
|
|
|
let mut pub_sub = conn.into_pubsub();
|
|
|
|
pub_sub.subscribe(prefix).await?;
|
|
|
|
|
|
|
|
let token = CancellationToken::new();
|
|
|
|
let token_rx = token.clone();
|
|
|
|
let prefix = prefix.to_string();
|
|
|
|
|
|
|
|
tokio::spawn(async move {
|
2024-01-16 23:41:32 +00:00
|
|
|
trace!("Redis subscriber spawned");
|
2023-08-02 01:10:53 +00:00
|
|
|
let mut on_message = pub_sub.on_message();
|
|
|
|
|
|
|
|
while let Some(msg) = select! {
|
|
|
|
msg = on_message.next() => msg,
|
|
|
|
_ = token_rx.cancelled() => {
|
|
|
|
drop(on_message);
|
|
|
|
if let Err(e) = pub_sub.unsubscribe(prefix).await {
|
2024-01-16 23:41:32 +00:00
|
|
|
warn!("Redis error: {:?}", e);
|
2023-08-02 01:10:53 +00:00
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
} {
|
|
|
|
let data = &match msg.get_payload::<String>() {
|
|
|
|
Ok(val) => val,
|
|
|
|
Err(e) => {
|
2024-01-16 23:41:32 +00:00
|
|
|
warn!("Redis error: {:?}", e);
|
2023-08-02 01:10:53 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let parsed = match serde_json::from_str::<SubMessage>(data) {
|
|
|
|
Ok(val) => val,
|
|
|
|
Err(e) => {
|
2024-01-16 23:41:32 +00:00
|
|
|
warn!("Message parse error: {:?}", e);
|
2023-08-02 01:10:53 +00:00
|
|
|
continue;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2024-01-09 21:29:06 +00:00
|
|
|
trace!("Got message: {:#?}", parsed);
|
2023-08-02 01:10:53 +00:00
|
|
|
|
|
|
|
handler(parsed).await;
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(CalckeySub(token))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for CalckeySub {
|
|
|
|
fn drop(&mut self) {
|
2024-01-16 23:41:32 +00:00
|
|
|
trace!("Redis subscriber dropped");
|
2023-08-02 01:10:53 +00:00
|
|
|
self.0.cancel();
|
|
|
|
}
|
2023-04-21 23:39:52 +00:00
|
|
|
}
|