Compare commits

...

5 Commits

Author SHA1 Message Date
Natty f441de806f
Frontend: Basic notification receiving via SSE
ci/woodpecker/push/ociImagePush Pipeline is running Details
2024-01-18 03:26:26 +01:00
Natty 7b02f84271
Basic backend SSE notification implemention 2024-01-17 00:41:32 +01:00
Natty ad3528055f
Basic notification fetching via Magnetar 2024-01-16 18:10:56 +01:00
Natty 98fb2ef0d8
Drop map values that are undefined from API requests 2024-01-16 18:09:28 +01:00
Natty 94cff7c2c8
Skip non-cacheable scenarios 2024-01-16 13:04:53 +01:00
59 changed files with 1388 additions and 324 deletions

2
Cargo.lock generated
View File

@ -1472,6 +1472,7 @@ dependencies = [
name = "magnetar"
version = "0.3.0-alpha"
dependencies = [
"async-stream",
"axum",
"axum-extra",
"cached",
@ -1502,6 +1503,7 @@ dependencies = [
"strum",
"thiserror",
"tokio",
"tokio-stream",
"toml",
"tower",
"tower-http",

View File

@ -24,6 +24,7 @@ edition = "2021"
[workspace.dependencies]
async-trait = "0.1"
async-stream = "0.3"
axum = "0.7"
axum-extra = "0.9"
cached = "0.47"
@ -60,6 +61,7 @@ tera = { version = "1", default-features = false }
thiserror = "1"
tokio = "1.24"
tokio-util = "0.7"
tokio-stream = "0.1"
toml = "0.8"
tower = "0.4"
tower-http = "0.5"
@ -86,9 +88,11 @@ dotenvy = { workspace = true }
axum = { workspace = true, features = ["macros"] }
axum-extra = { workspace = true, features = ["typed-header"]}
async-stream = { workspace = true }
headers = { workspace = true }
hyper = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["cors", "trace", "fs"] }
url = { workspace = true }

View File

@ -98,14 +98,10 @@ pub enum NotificationTypeEnum {
Follow,
#[sea_orm(string_value = "followRequestAccepted")]
FollowRequestAccepted,
#[sea_orm(string_value = "groupInvited")]
GroupInvited,
#[sea_orm(string_value = "mention")]
Mention,
#[sea_orm(string_value = "pollEnded")]
PollEnded,
#[sea_orm(string_value = "pollVote")]
PollVote,
#[sea_orm(string_value = "quote")]
Quote,
#[sea_orm(string_value = "reaction")]

View File

@ -9,6 +9,7 @@ mod m20240107_220523_generated_is_quote;
mod m20240107_224446_generated_is_renote;
mod m20240112_215106_remove_pages;
mod m20240112_234759_remove_gallery;
mod m20240115_212109_remove_poll_vote_notification;
pub struct Migrator;
@ -25,6 +26,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240107_224446_generated_is_renote::Migration),
Box::new(m20240112_215106_remove_pages::Migration),
Box::new(m20240112_234759_remove_gallery::Migration),
Box::new(m20240115_212109_remove_poll_vote_notification::Migration),
]
}
}

View File

@ -0,0 +1,45 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::TransactionTrait;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
db.execute_unprepared(
r#"
DELETE FROM "notification" WHERE "type" = 'pollVote' OR "type" = 'groupInvited';
ALTER TYPE "notification_type_enum" RENAME TO "notification_type_enum_old";
CREATE TYPE "notification_type_enum" AS ENUM('follow', 'mention', 'reply', 'renote', 'quote', 'reaction', 'pollEnded', 'receiveFollowRequest', 'followRequestAccepted', 'app');
ALTER TABLE "notification" ALTER COLUMN "type" TYPE "notification_type_enum" USING "type"::text::notification_type_enum;
DROP TYPE "notification_type_enum_old";
"#,
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
let db = manager.get_connection();
let txn = db.begin().await?;
db.execute_unprepared(
r#"
ALTER TYPE "notification_type_enum" RENAME TO "notification_type_enum_old";
CREATE TYPE "notification_type_enum" AS ENUM('follow', 'mention', 'reply', 'renote', 'quote', 'reaction', 'pollVote', 'pollEnded', 'receiveFollowRequest', 'followRequestAccepted', 'groupInvited', 'app');
ALTER TABLE "notification" ALTER COLUMN "type" TYPE "notification_type_enum" USING "type"::text::notification_type_enum;
DROP TYPE "notification_type_enum_old";
"#,
)
.await?;
txn.commit().await?;
Ok(())
}
}

View File

@ -10,6 +10,7 @@ use ck::*;
pub use sea_orm;
use user_model::UserResolver;
use crate::model_ext::IdShape;
use crate::note_model::NoteResolver;
use crate::notification_model::NotificationResolver;
use chrono::Utc;
@ -21,14 +22,16 @@ use sea_orm::{
ColumnTrait, ConnectOptions, DatabaseConnection, DbErr, EntityTrait, QueryFilter,
TransactionTrait,
};
use serde::{Deserialize, Serialize};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use std::future::Future;
use strum::IntoStaticStr;
use thiserror::Error;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::log::LevelFilter;
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};
#[derive(Debug)]
pub struct ConnectorConfig {
@ -353,12 +356,46 @@ impl CalckeyCache {
pub struct CalckeyCacheClient(redis::aio::Connection);
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "channel", content = "message")]
#[derive(Clone, Debug)]
pub enum SubMessage {
Internal(InternalStreamMessage),
#[serde(other)]
Other,
MainStream(String, MainStreamMessage),
Other(String, Value),
}
#[derive(Deserialize)]
struct RawMessage<'a> {
channel: &'a str,
message: Value,
}
impl<'de> Deserialize<'de> for SubMessage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let raw = RawMessage::deserialize(deserializer)?;
Ok(match raw.channel {
"internal" => SubMessage::Internal(
InternalStreamMessage::deserialize(raw.message).map_err(Error::custom)?,
),
c if c.starts_with("mainStream") => SubMessage::MainStream(
c.strip_prefix("mainStream:")
.ok_or_else(|| Error::custom("Invalid mainStream prefix"))?
.to_string(),
MainStreamMessage::deserialize(raw.message).map_err(Error::custom)?,
),
_ => SubMessage::Other(raw.channel.to_string(), raw.message),
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "body")]
#[serde(rename_all = "camelCase")]
pub enum MainStreamMessage {
Notification(IdShape),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -426,6 +463,7 @@ impl CalckeySub {
let prefix = prefix.to_string();
tokio::spawn(async move {
trace!("Redis subscriber spawned");
let mut on_message = pub_sub.on_message();
while let Some(msg) = select! {
@ -433,7 +471,7 @@ impl CalckeySub {
_ = token_rx.cancelled() => {
drop(on_message);
if let Err(e) = pub_sub.unsubscribe(prefix).await {
info!("Redis error: {:?}", e);
warn!("Redis error: {:?}", e);
}
return;
}
@ -441,7 +479,7 @@ impl CalckeySub {
let data = &match msg.get_payload::<String>() {
Ok(val) => val,
Err(e) => {
info!("Redis error: {:?}", e);
warn!("Redis error: {:?}", e);
continue;
}
};
@ -449,7 +487,7 @@ impl CalckeySub {
let parsed = match serde_json::from_str::<SubMessage>(data) {
Ok(val) => val,
Err(e) => {
info!("Message parse error: {:?}", e);
warn!("Message parse error: {:?}", e);
continue;
}
};
@ -466,6 +504,7 @@ impl CalckeySub {
impl Drop for CalckeySub {
fn drop(&mut self) {
trace!("Redis subscriber dropped");
self.0.cancel();
}
}

View File

@ -5,9 +5,10 @@ use ext_calckey_model_migration::{
use magnetar_sdk::types::SpanFilter;
use sea_orm::{
ColumnTrait, Condition, ConnectionTrait, Cursor, DbErr, DynIden, EntityTrait, FromQueryResult,
Iden, IntoIdentity, Iterable, JoinType, RelationDef, RelationTrait, Select, SelectModel,
SelectorTrait,
Iden, IntoIdentity, Iterable, JoinType, QueryTrait, RelationDef, RelationTrait, Select,
SelectModel, SelectorTrait,
};
use serde::{Deserialize, Serialize};
use std::fmt::Write;
#[derive(Clone)]
@ -224,6 +225,7 @@ pub trait CursorPaginationExt<E> {
fn cursor_by_columns_and_span<C>(
self,
cursor_prefix_alias: Option<MagIden>,
order_columns: C,
pagination: &SpanFilter,
limit: Option<u64>,
@ -234,6 +236,7 @@ pub trait CursorPaginationExt<E> {
async fn get_paginated_model<M, C, T>(
self,
db: &T,
cursor_prefix_alias: Option<MagIden>,
columns: C,
curr: &SpanFilter,
prev: &mut Option<SpanFilter>,
@ -255,6 +258,7 @@ where
fn cursor_by_columns_and_span<C>(
self,
cursor_prefix_alias: Option<MagIden>,
order_columns: C,
pagination: &SpanFilter,
limit: Option<u64>,
@ -262,7 +266,11 @@ where
where
C: IntoIdentity,
{
let mut cursor = self.cursor_by(order_columns);
let mut cursor = Cursor::new(
self.into_query(),
cursor_prefix_alias.map_or_else(|| E::default().into_iden(), MagIden::into_iden),
order_columns,
);
if let Some(start) = pagination.start() {
cursor.after(start);
@ -286,6 +294,7 @@ where
async fn get_paginated_model<Q, C, T>(
self,
db: &T,
cursor_prefix_alias: Option<MagIden>,
columns: C,
curr: &SpanFilter,
prev: &mut Option<SpanFilter>,
@ -298,7 +307,7 @@ where
T: ConnectionTrait,
{
let mut result = self
.cursor_by_columns_and_span(columns, curr, Some(limit + 1))
.cursor_by_columns_and_span(cursor_prefix_alias, columns, curr, Some(limit + 1))
.into_model::<Q>()
.all(db)
.await?;
@ -324,3 +333,8 @@ pub trait ModelPagination {
fn id(&self) -> &str;
fn time(&self) -> DateTime<Utc>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IdShape {
pub id: String,
}

View File

@ -3,8 +3,8 @@ pub mod data;
use ext_calckey_model_migration::SelectStatement;
use sea_orm::sea_query::{Asterisk, Expr, IntoIden, Query, SelectExpr, SimpleExpr};
use sea_orm::{
Condition, EntityTrait, Iden, IntoSimpleExpr, JoinType, QueryFilter, QueryOrder, QuerySelect,
QueryTrait, Select,
Condition, EntityTrait, Iden, JoinType, QueryFilter, QueryOrder, QuerySelect, QueryTrait,
Select,
};
use std::sync::Arc;
@ -177,6 +177,7 @@ impl NoteResolver {
notes_select
.get_paginated_model::<NoteData, _, _>(
self.db.inner(),
Some(note::Entity.base_prefix()),
(note::Column::CreatedAt, note::Column::Id),
pagination,
&mut None,

View File

@ -7,10 +7,13 @@ use crate::note_model::{NoteResolveOptions, NoteResolver};
use crate::user_model::{UserData, UserResolveOptions, UserResolver};
use crate::{CalckeyDbError, CalckeyModel};
use chrono::{DateTime, Utc};
use ck::sea_orm_active_enums::NotificationTypeEnum;
use ck::{access_token, notification, user};
use ext_calckey_model_migration::{JoinType, SelectStatement};
use magnetar_sdk::types::SpanFilter;
use sea_orm::Iden;
use sea_orm::prelude::Expr;
use sea_orm::sea_query::{IntoCondition, Query};
use sea_orm::{ActiveEnum, Iden, QueryTrait};
use sea_orm::{DbErr, EntityTrait, FromQueryResult, QueryFilter, QueryResult, QuerySelect};
use serde::{Deserialize, Serialize};
@ -55,12 +58,14 @@ impl ModelPagination for NotificationData {
}
}
const NOTIFICATION: &str = "notification.";
const NOTIFIER: &str = "notifier.";
const NOTIFICATION_NOTE: &str = "note.";
const ACCESS_TOKEN: &str = "access_token.";
pub struct NotificationResolveOptions {}
pub struct NotificationResolveOptions {
pub note_options: NoteResolveOptions,
pub user_options: UserResolveOptions,
}
#[derive(Clone)]
pub struct NotificationResolver {
@ -86,11 +91,12 @@ impl NotificationResolver {
&self,
q: &mut SelectStatement,
notification_tbl: &MagIden,
note_options: &NoteResolveOptions,
user_options: &UserResolveOptions,
resolve_options: &NotificationResolveOptions,
note_resolver: &NoteResolver,
user_resolver: &UserResolver,
) {
q.add_aliased_columns::<notification::Entity>(&notification_tbl);
let notifier_tbl = notification_tbl.join_str(NOTIFIER);
q.add_aliased_columns::<user::Entity>(&notifier_tbl);
q.join_columns(
@ -98,7 +104,7 @@ impl NotificationResolver {
notification::Relation::User2.with_from_alias(notification_tbl),
&notifier_tbl,
);
user_resolver.resolve(q, &notifier_tbl, &user_options);
user_resolver.resolve(q, &notifier_tbl, &resolve_options.user_options);
let token_tbl = notification_tbl.join_str(ACCESS_TOKEN);
q.add_aliased_columns::<access_token::Entity>(&token_tbl);
@ -114,14 +120,56 @@ impl NotificationResolver {
notification::Relation::Note.with_from_alias(notification_tbl),
&note_tbl,
);
note_resolver.attach_note(q, &note_tbl, 1, 1, note_options, &self.user_resolver);
note_resolver.attach_note(
q,
&note_tbl,
1,
1,
&resolve_options.note_options,
&self.user_resolver,
);
}
pub async fn get_single(
&self,
resolve_options: &NotificationResolveOptions,
notification_id: &str,
) -> Result<Option<NotificationData>, CalckeyDbError> {
let notification_tbl = notification::Entity.base_prefix();
let mut query = Query::select();
query.from_as(notification::Entity, notification_tbl.clone());
self.resolve(
&mut query,
&notification_tbl,
&resolve_options,
&self.note_resolver,
&self.user_resolver,
);
let mut select = notification::Entity::find();
*QuerySelect::query(&mut select) = query;
let notifications = select
.filter(
notification_tbl
.col(notification::Column::Id)
.eq(notification_id),
)
.into_model::<NotificationData>()
.one(self.db.inner())
.await?;
Ok(notifications)
}
pub async fn get(
&self,
note_options: &NoteResolveOptions,
user_options: &UserResolveOptions,
resolve_options: &NotificationResolveOptions,
user_id: &str,
notification_types: &[NotificationTypeEnum],
unread_only: bool,
pagination: &SpanFilter,
prev: &mut Option<SpanFilter>,
next: &mut Option<SpanFilter>,
@ -129,26 +177,46 @@ impl NotificationResolver {
) -> Result<Vec<NotificationData>, CalckeyDbError> {
let notification_tbl = notification::Entity.base_prefix();
let mut select = notification::Entity::find();
let mut query = Query::select();
query.from_as(notification::Entity, notification_tbl.clone());
let query = QuerySelect::query(&mut select);
self.resolve(
query,
&mut query,
&notification_tbl,
note_options,
user_options,
&resolve_options,
&self.note_resolver,
&self.user_resolver,
);
let mut select = notification::Entity::find();
*QuerySelect::query(&mut select) = query;
let notifications = select
.filter(
notification_tbl
.col(notification::Column::NotifieeId)
.eq(user_id),
.eq(user_id)
.and(
notification_tbl.col(notification::Column::Type).is_in(
notification_types
.iter()
.copied()
.map(Expr::val)
.map(|e| e.cast_as(NotificationTypeEnum::name())),
),
),
)
.apply_if(unread_only.then_some(()), |s, _| {
s.filter(
notification_tbl
.col(notification::Column::IsRead)
.not()
.into_condition(),
)
})
.get_paginated_model::<NotificationData, _, _>(
&self.db.0,
Some(notification_tbl),
(notification::Column::CreatedAt, notification::Column::Id),
pagination,
prev,

View File

@ -200,6 +200,7 @@ impl UserResolver {
.filter(follow_request::Column::FolloweeId.eq(followee))
.get_paginated_model::<UserFollowRequestData, _, _>(
&self.db.0,
None,
(
follow_request::Column::CreatedAt,
follow_request::Column::Id,
@ -241,6 +242,7 @@ impl UserResolver {
.filter(following::Column::FollowerId.eq(follower))
.get_paginated_model::<UserFollowData, _, _>(
&self.db.0,
None,
(following::Column::CreatedAt, following::Column::Id),
pagination,
prev,
@ -279,6 +281,7 @@ impl UserResolver {
.filter(following::Column::FolloweeId.eq(followee))
.get_paginated_model::<UserFollowData, _, _>(
&self.db.0,
None,
(following::Column::CreatedAt, following::Column::Id),
pagination,
prev,

View File

@ -286,7 +286,7 @@ const reactButton = ref<HTMLElement | null>(null);
let appearNote = $computed(
() => magEffectiveNote(note) as packed.PackNoteMaybeFull
);
const isMyRenote = $i && $i.id === note.user.id;
const isMyRenote = $i && $i.id === appearNote.user.id;
const showContent = ref(false);
const isDeleted = ref(false);
const muted = ref(getWordSoftMute(note, $i, defaultStore.state.mutedWords));

View File

@ -6,13 +6,24 @@
:class="notification.type"
>
<div class="head">
<MagAvatarResolvingProxy
v-if="notification.type === 'pollEnded'"
<MagAvatar
v-if="
notification.type === 'Renote' ||
notification.type === 'Reply' ||
notification.type === 'Mention' ||
notification.type === 'Quote' ||
notification.type === 'PollEnd'
"
class="icon"
:user="notification.note.user"
/>
<MagAvatarResolvingProxy
v-else-if="notification.user"
<MagAvatar
v-else-if="
notification.type === 'Reaction' ||
notification.type === 'FollowRequestAccepted' ||
notification.type === 'Follow' ||
notification.type === 'FollowRequestReceived'
"
class="icon"
:user="notification.user"
/>
@ -24,55 +35,51 @@
/>
<div class="sub-icon" :class="notification.type">
<i
v-if="notification.type === 'follow'"
v-if="notification.type === 'Follow'"
class="ph-hand-waving ph-bold"
></i>
<i
v-else-if="notification.type === 'receiveFollowRequest'"
v-else-if="notification.type === 'FollowRequestReceived'"
class="ph-clock ph-bold"
></i>
<i
v-else-if="notification.type === 'followRequestAccepted'"
v-else-if="notification.type === 'FollowRequestAccepted'"
class="ph-check ph-bold"
></i>
<i
v-else-if="notification.type === 'renote'"
v-else-if="notification.type === 'Renote'"
class="ph-repeat ph-bold"
></i>
<i
v-else-if="notification.type === 'reply'"
v-else-if="notification.type === 'Reply'"
class="ph-arrow-bend-up-left ph-bold"
></i>
<i
v-else-if="notification.type === 'mention'"
v-else-if="notification.type === 'Mention'"
class="ph-at ph-bold"
></i>
<i
v-else-if="notification.type === 'quote'"
v-else-if="notification.type === 'Quote'"
class="ph-quotes ph-bold"
></i>
<i
v-else-if="notification.type === 'pollVote'"
class="ph-microphone-stage ph-bold"
></i>
<i
v-else-if="notification.type === 'pollEnded'"
v-else-if="notification.type === 'PollEnd'"
class="ph-microphone-stage ph-bold"
></i>
<!-- notification.reaction null になることはまずないがここでoptional chaining使うと一部ブラウザで刺さるので念の為 -->
<MagEmoji
v-else-if="
showEmojiReactions && notification.type === 'reaction'
showEmojiReactions && notification.type === 'Reaction'
"
ref="reactionRef"
:emoji="normalizeNotifReaction(notification)"
:emoji="notification.reaction"
:is-reaction="true"
:normal="true"
:no-style="true"
/>
<MagEmoji
v-else-if="
!showEmojiReactions && notification.type === 'reaction'
!showEmojiReactions && notification.type === 'Reaction'
"
:emoji="defaultReaction"
:is-reaction="true"
@ -83,11 +90,29 @@
</div>
<div class="tail">
<header>
<span v-if="notification.type === 'pollEnded'">{{
<span v-if="notification.type === 'PollEnd'">{{
i18n.ts._notification.pollEnded
}}</span>
<MkA
v-else-if="notification.user"
v-if="
notification.type === 'Renote' ||
notification.type === 'Reply' ||
notification.type === 'Mention' ||
notification.type === 'Quote' ||
notification.type === 'PollEnd'
"
v-user-preview="notification.note.user.id"
class="name"
:to="userPage(notification.note.user)"
><MkUserName :user="notification.note.user"
/></MkA>
<MkA
v-else-if="
notification.type === 'Reaction' ||
notification.type === 'FollowRequestAccepted' ||
notification.type === 'Follow' ||
notification.type === 'FollowRequestReceived'
"
v-user-preview="notification.user.id"
class="name"
:to="userPage(notification.user)"
@ -96,12 +121,12 @@
<span v-else>{{ notification.header }}</span>
<MkTime
v-if="withTime"
:time="notification.createdAt"
:time="notification.created_at"
class="time"
/>
</header>
<MkA
v-if="notification.type === 'reaction'"
v-if="notification.type === 'Reaction'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
@ -117,23 +142,23 @@
<i class="ph-quotes ph-fill ph-lg"></i>
</MkA>
<MkA
v-if="notification.type === 'renote'"
v-if="notification.type === 'Renote'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note.renote!)"
:to="notePage(notification.note.renoted_note!)"
:title="getNoteSummary(notification.note.renoted_note!)"
>
<span>{{ i18n.ts._notification.renoted }}</span>
<i class="ph-quotes ph-fill ph-lg"></i>
<Mfm
:text="getNoteSummary(notification.note.renote!)"
:text="getNoteSummary(notification.note.renoted_note!)"
:plain="true"
:nowrap="!full"
:custom-emojis="notification.note.renote.emojis"
:custom-emojis="notification.note.renoted_note!.emojis"
/>
<i class="ph-quotes ph-fill ph-lg"></i>
</MkA>
<MkA
v-if="notification.type === 'reply'"
v-if="notification.type === 'Reply'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
@ -146,7 +171,7 @@
/>
</MkA>
<MkA
v-if="notification.type === 'mention'"
v-if="notification.type === 'Mention'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
@ -159,7 +184,7 @@
/>
</MkA>
<MkA
v-if="notification.type === 'quote'"
v-if="notification.type === 'Quote'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
@ -172,23 +197,7 @@
/>
</MkA>
<MkA
v-if="notification.type === 'pollVote'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
>
<span>{{ i18n.ts._notification.voted }}</span>
<i class="ph-quotes ph-fill ph-lg"></i>
<Mfm
:text="getNoteSummary(notification.note)"
:plain="true"
:nowrap="!full"
:custom-emojis="notification.note?.emojis"
/>
<i class="ph-quotes ph-fill ph-lg"></i>
</MkA>
<MkA
v-if="notification.type === 'pollEnded'"
v-if="notification.type === 'PollEnd'"
class="text"
:to="notePage(notification.note)"
:title="getNoteSummary(notification.note)"
@ -203,7 +212,7 @@
<i class="ph-quotes ph-fill ph-lg"></i>
</MkA>
<span
v-if="notification.type === 'follow'"
v-if="notification.type === 'Follow'"
class="text"
style="opacity: 0.7"
>{{ i18n.ts.youGotNewFollower }}
@ -216,13 +225,13 @@
</div>
</span>
<span
v-if="notification.type === 'followRequestAccepted'"
v-if="notification.type === 'FollowRequestAccepted'"
class="text"
style="opacity: 0.7"
>{{ i18n.ts.followRequestAccepted }}</span
>
<span
v-if="notification.type === 'receiveFollowRequest'"
v-if="notification.type === 'FollowRequestReceived'"
class="text"
style="opacity: 0.7"
>{{ i18n.ts.receiveFollowRequest }}
@ -233,7 +242,7 @@
/>
</div>
</span>
<span v-if="notification.type === 'app'" class="text">
<span v-if="notification.type === 'App'" class="text">
<Mfm :text="notification.body" :nowrap="!full" />
</span>
</div>
@ -242,25 +251,23 @@
<script lang="ts" setup>
import { onMounted, onUnmounted, ref, watch } from "vue";
import * as misskey from "calckey-js";
import MkFollowButton from "@/components/MkFollowButton.vue";
import XReactionTooltip from "@/components/MkReactionTooltip.vue";
import { getNoteSummary } from "@/scripts/get-note-summary";
import { notePage } from "@/filters/note";
import { userPage } from "@/filters/user";
import { i18n } from "@/i18n";
import * as os from "@/os";
import { stream } from "@/stream";
import { useTooltip } from "@/scripts/use-tooltip";
import { defaultStore } from "@/store";
import { instance } from "@/instance";
import MkFollowApproveButton from "@/components/MkFollowApproveButton.vue";
import { magConvertReaction, magIsCustomEmoji } from "@/scripts-mag/mag-util";
import { types } from "magnetar-common";
import { packed } from "magnetar-common";
import { i18n } from "@/i18n";
const props = withDefaults(
defineProps<{
notification: misskey.entities.Notification;
notification: packed.PackNotification;
withTime?: boolean;
full?: boolean;
}>(),
@ -281,32 +288,11 @@ const defaultReaction = ["⭐", "👍", "❤️"].includes(instance.defaultReact
? instance.defaultReaction
: "⭐";
function normalizeNotifReaction(
notification: misskey.entities.Notification & { type: "reaction" }
): types.Reaction {
return notification.reaction
? magConvertReaction(
notification.reaction,
(name, host) =>
notification.note.emojis.find((e) => {
const parsed = magConvertReaction(`:${e.name}:`, e.url);
if (!magIsCustomEmoji(parsed)) return false;
return (
parsed.name === name &&
(parsed.host ?? null) === (host ?? null)
);
})?.url!
)
: notification.reaction;
}
let readObserver: IntersectionObserver | undefined;
let connection;
onMounted(() => {
if (!props.notification.isRead) {
if (!props.notification.is_read) {
readObserver = new IntersectionObserver((entries, observer) => {
if (!entries.some((entry) => entry.isIntersecting)) return;
stream.send("readNotification", {
@ -318,11 +304,14 @@ onMounted(() => {
readObserver.observe(elRef.value);
connection = stream.useChannel("main");
connection.on("readAllNotifications", () => readObserver.disconnect());
connection.on("readAllNotifications", () => readObserver!.disconnect());
watch(props.notification.isRead, () => {
readObserver.disconnect();
});
watch(
() => props.notification.is_read,
() => {
readObserver!.disconnect();
}
);
}
});
@ -334,13 +323,13 @@ onUnmounted(() => {
const followRequestDone = ref(false);
useTooltip(reactionRef, (showing) => {
if (props.notification.type !== "reaction") return;
if (props.notification.type !== "Reaction") return;
os.popup(
XReactionTooltip,
{
showing,
reaction: normalizeNotifReaction(props.notification),
reaction: props.notification.reaction,
targetElement: reactionRef.value.$el,
},
{},
@ -406,46 +395,39 @@ useTooltip(reactionRef, (showing) => {
height: 100%;
}
&.follow,
&.followRequestAccepted,
&.receiveFollowRequest,
&.groupInvited {
&.Follow,
&.FollowRequestAccepted,
&.FollowRequestReceived {
padding: 3px;
background: #31748f;
pointer-events: none;
}
&.renote {
&.Renote {
padding: 3px;
background: #31748f;
pointer-events: none;
}
&.quote {
&.Quote {
padding: 3px;
background: #31748f;
pointer-events: none;
}
&.reply {
&.Reply {
padding: 3px;
background: #c4a7e7;
pointer-events: none;
}
&.mention {
&.Mention {
padding: 3px;
background: #908caa;
pointer-events: none;
}
&.pollVote {
padding: 3px;
background: #908caa;
pointer-events: none;
}
&.pollEnded {
&.PollEnd {
padding: 3px;
background: #908caa;
pointer-events: none;

View File

@ -16,7 +16,7 @@
<script lang="ts" setup>
import { onMounted } from "vue";
import XNotification from "@/components/MkNotification.vue";
import XNotification from "@/components/MagNotification.vue";
import * as os from "@/os";
defineProps<{

View File

@ -1,5 +1,5 @@
<template>
<MkPagination ref="pagingComponent" :pagination="pagination">
<MagPagination ref="pagingComponent" :pagination="pagination">
<template #empty>
<div class="_fullinfo">
<img
@ -11,26 +11,23 @@
</div>
</template>
<template #default="{ items: notifications }">
<template #items="{ items: notifications }">
<XList
v-slot="{ item: notification }"
class="elsfgstc"
:items="notifications"
:no-gap="true"
>
<XNoteResolvingProxy
<XNote
v-if="
['reply', 'quote', 'mention'].includes(
notification.type
)
notification.note &&
(notification.type === 'Quote' ||
notification.type === 'Mention' ||
notification.type === 'Reply')
"
:key="notification.id"
:note="notification.note.id"
:collapsedReply="
notification.type === 'reply' ||
(notification.type === 'mention' &&
notification.note.replyId != null)
"
:note="notification.note"
:collapsedReply="!!notification.note.parent_note"
/>
<XNotification
v-else
@ -42,37 +39,36 @@
/>
</XList>
</template>
</MkPagination>
</MagPagination>
</template>
<script lang="ts" setup>
import { computed, onMounted, onUnmounted, ref } from "vue";
import { notificationTypes } from "calckey-js";
import MkPagination, { Paging } from "@/components/MkPagination.vue";
import XNotification from "@/components/MkNotification.vue";
import { onMounted, onUnmounted, ref } from "vue";
import XNotification from "@/components/MagNotification.vue";
import XList from "@/components/MkDateSeparatedList.vue";
import XNoteResolvingProxy from "@/components/MagNoteResolvingProxy.vue";
import { stream } from "@/stream";
import XNote from "@/components/MagNote.vue";
import { magStream, stream } from "@/stream";
import { $i } from "@/account";
import MagPagination, { Paging } from "@/components/MagPagination.vue";
import { endpoints, types } from "magnetar-common";
import { i18n } from "@/i18n";
const props = defineProps<{
includeTypes?: (typeof notificationTypes)[number][];
includeTypes?: types.NotificationType[];
unreadOnly?: boolean;
}>();
const pagingComponent = ref<InstanceType<typeof MkPagination>>();
const pagingComponent = ref<InstanceType<typeof MagPagination>>();
const pagination: Paging = {
endpoint: "i/notifications" as const,
limit: 10,
params: computed(() => ({
includeTypes: props.includeTypes ?? undefined,
excludeTypes: props.includeTypes
endpoint: endpoints.GetNotifications,
params: {
include_types: props.includeTypes ?? undefined,
exclude_types: props.includeTypes
? undefined
: $i?.mutingNotificationTypes,
unreadOnly: props.unreadOnly,
})),
unread_only: props.unreadOnly,
} as types.NotificationsReq,
};
const onNotification = (notification) => {
@ -93,18 +89,20 @@ const onNotification = (notification) => {
}
};
let notifStream;
let connection;
onMounted(() => {
notifStream = magStream.useFiltered("Notification", onNotification);
connection = stream.useChannel("main");
connection.on("notification", onNotification);
connection.on("readAllNotifications", () => {
if (pagingComponent.value) {
for (const item of pagingComponent.value.queue) {
item.isRead = true;
item.is_read = true;
}
for (const item of pagingComponent.value.items) {
item.isRead = true;
item.is_read = true;
}
}
});
@ -114,7 +112,7 @@ onMounted(() => {
if (
notificationIds.includes(pagingComponent.value.queue[i].id)
) {
pagingComponent.value.queue[i].isRead = true;
pagingComponent.value.queue[i].is_read = true;
}
}
for (
@ -125,7 +123,7 @@ onMounted(() => {
if (
notificationIds.includes(pagingComponent.value.items[i].id)
) {
pagingComponent.value.items[i].isRead = true;
pagingComponent.value.items[i].is_read = true;
}
}
}

View File

@ -7,6 +7,7 @@ export const host = _HOST || address.host;
export const hostname = address.hostname;
export const url = _REMOTE_URL || address.origin;
export const apiUrl = `${url}/api`;
export const magStreamingUrl = `${url}/mag/v1`;
export const feApiUrl = `${url}/fe-api`;
export const wsUrl = `${url
.replace("http://", "ws://")

View File

@ -62,7 +62,7 @@ import { computed, ref, watch } from "vue";
import { Virtual } from "swiper";
import { Swiper, SwiperSlide } from "swiper/vue";
import { notificationTypes } from "calckey-js";
import XNotifications from "@/components/MkNotifications.vue";
import XNotifications from "@/components/MagNotifications.vue";
import XNotes from "@/components/MkNotes.vue";
import * as os from "@/os";
import { i18n } from "@/i18n";

View File

@ -1,7 +1,8 @@
import * as Misskey from "calckey-js";
import { markRaw } from "vue";
import { $i } from "@/account";
import { url } from "@/config";
import { magStreamingUrl, url } from "@/config";
import { MagEventChannel } from "magnetar-common";
export const stream = markRaw(
new Misskey.Stream(
@ -22,3 +23,7 @@ function heartbeat(): void {
}
window.setTimeout(heartbeat, 1000 * 60);
}
export const magStream = markRaw(
new MagEventChannel(magStreamingUrl, $i ? $i.token : null)
);

View File

@ -19,11 +19,12 @@
<script lang="ts" setup>
import { defineAsyncComponent } from "vue";
import { swInject } from "./sw-inject";
import { popup, popups, pendingApiRequestsCount } from "@/os";
import { popup, popups } from "@/os";
import { uploads } from "@/scripts/upload";
import * as sound from "@/scripts/sound";
import { $i } from "@/account";
import { stream } from "@/stream";
import { magStream, stream } from "@/stream";
import { PackNotification } from "magnetar-common/built/types/PackNotification";
const XStreamIndicator = defineAsyncComponent(
() => import("./stream-indicator.vue")
@ -32,7 +33,7 @@ const XUpload = defineAsyncComponent(() => import("./upload.vue"));
const dev = _DEV_;
const onNotification = (notification) => {
const onNotification = (notification: PackNotification) => {
if ($i.mutingNotificationTypes.includes(notification.type)) return;
if (document.visibilityState === "visible") {
@ -42,7 +43,7 @@ const onNotification = (notification) => {
popup(
defineAsyncComponent(
() => import("@/components/MkNotificationToast.vue")
() => import("@/components/MagNotificationToast.vue")
),
{
notification,
@ -56,8 +57,7 @@ const onNotification = (notification) => {
};
if ($i) {
const connection = stream.useChannel("main", null, "UI");
connection.on("notification", onNotification);
const connection = magStream.useFiltered("Notification", onNotification);
//#region Listen message from SW
if ("serviceWorker" in navigator) {

View File

@ -17,9 +17,9 @@
<script lang="ts" setup>
import { defineAsyncComponent } from "vue";
import XColumn from "./column.vue";
import { updateColumn } from "./deck-store";
import type { Column } from "./deck-store";
import XNotifications from "@/components/MkNotifications.vue";
import { updateColumn } from "./deck-store";
import XNotifications from "@/components/MagNotifications.vue";
import * as os from "@/os";
import { i18n } from "@/i18n";

View File

@ -26,16 +26,10 @@
<script lang="ts" setup>
import { defineAsyncComponent } from "vue";
import {
useWidgetPropsManager,
Widget,
WidgetComponentEmits,
WidgetComponentExpose,
WidgetComponentProps,
} from "./widget";
import { useWidgetPropsManager, Widget, WidgetComponentExpose } from "./widget";
import { GetFormResultType } from "@/scripts/form";
import MkContainer from "@/components/MkContainer.vue";
import XNotifications from "@/components/MkNotifications.vue";
import XNotifications from "@/components/MagNotifications.vue";
import * as os from "@/os";
import { i18n } from "@/i18n";

View File

@ -11,5 +11,8 @@
"description": "A library with common utilities for Magnetar application development",
"devDependencies": {
"typescript": "^5.1.6"
},
"dependencies": {
"eventemitter3": "^5.0.1"
}
}

View File

@ -36,10 +36,9 @@ function nestedUrlSearchParams(data: any, topLevel: boolean = true): string {
.map(encodeURIComponent)
.join("&");
const inner = Object.entries(data).map(([k, v]) => [
k,
nestedUrlSearchParams(v, false),
]);
const inner = Object.entries(data)
.filter(([_, v]) => typeof v !== "undefined")
.map(([k, v]) => [k, nestedUrlSearchParams(v, false)]);
return new URLSearchParams(inner).toString();

View File

@ -9,3 +9,4 @@ export { GetFollowersSelf } from "./types/endpoints/GetFollowersSelf";
export { GetFollowingById } from "./types/endpoints/GetFollowingById";
export { GetFollowingSelf } from "./types/endpoints/GetFollowingSelf";
export { GetFollowRequestsSelf } from "./types/endpoints/GetFollowRequestsSelf";
export { GetNotifications } from "./types/endpoints/GetNotifications";

View File

@ -13,9 +13,11 @@ import {
FrontendApiEndpoints,
} from "./fe-api";
export * as types from "./types";
export * as packed from "./packed";
export * as endpoints from "./endpoints";
import { MagEventChannel } from "./sse-listener";
import * as types from "./types";
import * as packed from "./packed";
import * as endpoints from "./endpoints";
export {
Method,
@ -27,4 +29,8 @@ export {
feEndpoints,
FrontendApiEndpoint,
FrontendApiEndpoints,
MagEventChannel,
types,
packed,
endpoints,
};

View File

@ -0,0 +1,142 @@
import { EventEmitter } from "eventemitter3";
import { ChannelEvent } from "./types/ChannelEvent";
export type MagChannelState = "connected" | "exponentialBackoff" | "failed";
export class MagEventChannel extends EventEmitter<{
stateChange: MagChannelState;
message: ChannelEvent;
close: "cancelled";
}> {
private readonly baseUrl: string;
private attempts = 0;
private readonly maxAttempts: number;
private readonly token: string | null;
private readonly backoffFactor: number;
private readonly backoffBase: number;
private readonly closePromise: Promise<"cancelled">;
public constructor(
baseUrl: string,
token: string | null,
maxReconnectAttempts: number = 12,
backoffFactor: number = 1.618,
backoffBase: number = 500.0
) {
super();
this.baseUrl = baseUrl;
this.token = token;
this.maxAttempts = maxReconnectAttempts;
this.backoffFactor = backoffFactor;
this.backoffBase = backoffBase;
this.closePromise = new Promise((resolve) => {
this.on("close", resolve);
});
this.connect().then();
}
public useFiltered<T extends ChannelEvent["type"]>(
messageType: T,
listener: (val: (ChannelEvent & { type: T })["body"]) => void
) {
const cb = (val: ChannelEvent) => {
if (val.type != messageType) return;
listener((val as ChannelEvent & { type: T }).body);
};
this.on("message", cb);
return cb;
}
private async connect() {
if (this.attempts >= this.maxAttempts) {
this.emit("stateChange", "failed");
return;
}
const authorization = this.token ? `Bearer ${this.token}` : undefined;
const baseUrl = this.baseUrl.replace(/\/+$/, "");
const response = await fetch(`${baseUrl}/streaming`, {
method: "GET",
headers: authorization ? { authorization } : {},
credentials: "omit",
cache: "no-cache",
}).catch((e) => {
console.error(e);
return null;
});
if (
response === null ||
response.status >= 500 ||
response.body === null
) {
this.emit("stateChange", "exponentialBackoff");
setTimeout(
() => this.connect(),
this.backoffBase * Math.pow(this.backoffFactor, this.attempts)
);
this.attempts++;
return;
}
if (response.status >= 400 && response.status < 500) {
this.emit("stateChange", "failed");
return;
}
this.attempts = 0;
const decoderStream = new TextDecoderStream();
const reader = response.body.pipeThrough(decoderStream).getReader();
this.emit("stateChange", "connected");
let buf = "";
while (true) {
const res = await Promise.race([reader.read(), this.closePromise]);
if (res === "cancelled") break;
if (res.done) {
this.emit("stateChange", "exponentialBackoff");
setTimeout(
() => this.connect(),
this.backoffBase *
Math.pow(this.backoffFactor, this.attempts)
);
this.attempts++;
break;
}
buf += res.value;
const splitIndex = buf.indexOf("\n\n");
if (splitIndex === -1) {
continue;
}
const rawValue = buf.substring(0, splitIndex);
buf = buf.substring(splitIndex + 2);
if (rawValue.startsWith(":")) continue;
const text = rawValue
.split("\n")
.filter((l) => l.startsWith("data: "))
.map((l) => l.substring("data: ".length))
.join("\n");
if (!text) continue;
const data = JSON.parse(text) as ChannelEvent;
this.emit("message", data);
}
}
public async close() {
this.emit("close", "cancelled");
}
}

View File

@ -54,3 +54,5 @@ export { NotificationAppExt } from "./types/NotificationAppExt";
export { NotificationNoteExt } from "./types/NotificationNoteExt";
export { NotificationReactionExt } from "./types/NotificationReactionExt";
export { NotificationUserExt } from "./types/NotificationUserExt";
export { NotificationsReq } from "./types/NotificationsReq";
export { ChannelEvent } from "./types/ChannelEvent";

View File

@ -0,0 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { PackNotification } from "./PackNotification";
export type ChannelEvent = { "type": "Notification", "body": PackNotification };

View File

@ -0,0 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { NotificationType } from "./NotificationType";
export interface NotificationsReq { include_types?: Array<NotificationType>, exclude_types?: Array<NotificationType>, unread_only?: boolean, }

View File

@ -0,0 +1,13 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { NotificationsReq } from "../NotificationsReq";
import type { PackNotification } from "../PackNotification";
export const GetNotifications = {
endpoint: "/users/@self/notifications",
pathParams: [] as [],
method: "GET" as "GET" | "POST" | "PUT" | "DELETE" | "PATCH",
request: undefined as unknown as NotificationsReq,
response: undefined as unknown as Array<PackNotification>,
paginated: true as true
}

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationAppExt } from "../NotificationAppExt";
import type { NotificationBase } from "../NotificationBase";
export type PackNotificationApp = Id & NotificationAppExt;
export type PackNotificationApp = Id & NotificationBase & NotificationAppExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationUserExt } from "../NotificationUserExt";
export type PackNotificationFollow = Id & NotificationUserExt;
export type PackNotificationFollow = Id & NotificationBase & NotificationUserExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationUserExt } from "../NotificationUserExt";
export type PackNotificationFollowRequestAccepted = Id & NotificationUserExt;
export type PackNotificationFollowRequestAccepted = Id & NotificationBase & NotificationUserExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationUserExt } from "../NotificationUserExt";
export type PackNotificationFollowRequestReceived = Id & NotificationUserExt;
export type PackNotificationFollowRequestReceived = Id & NotificationBase & NotificationUserExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
export type PackNotificationMention = Id & NotificationNoteExt;
export type PackNotificationMention = Id & NotificationBase & NotificationNoteExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
export type PackNotificationPollEnd = Id & NotificationNoteExt;
export type PackNotificationPollEnd = Id & NotificationBase & NotificationNoteExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
export type PackNotificationQuote = Id & NotificationNoteExt;
export type PackNotificationQuote = Id & NotificationBase & NotificationNoteExt;

View File

@ -1,6 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
import type { NotificationReactionExt } from "../NotificationReactionExt";
import type { NotificationUserExt } from "../NotificationUserExt";
export type PackNotificationReaction = Id & NotificationReactionExt & NotificationUserExt;
export type PackNotificationReaction = Id & NotificationBase & NotificationNoteExt & NotificationReactionExt & NotificationUserExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
export type PackNotificationRenote = Id & NotificationNoteExt;
export type PackNotificationRenote = Id & NotificationBase & NotificationNoteExt;

View File

@ -1,5 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { Id } from "../Id";
import type { NotificationBase } from "../NotificationBase";
import type { NotificationNoteExt } from "../NotificationNoteExt";
export type PackNotificationReply = Id & NotificationNoteExt;
export type PackNotificationReply = Id & NotificationBase & NotificationNoteExt;

View File

@ -358,6 +358,10 @@ importers:
version: 4.1.0(vue@3.3.4)
magnetar-common:
dependencies:
eventemitter3:
specifier: ^5.0.1
version: 5.0.1
devDependencies:
typescript:
specifier: ^5.1.6
@ -3269,6 +3273,10 @@ packages:
/eventemitter3@4.0.7:
resolution: {integrity: sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==}
/eventemitter3@5.0.1:
resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==}
dev: false
/events@3.3.0:
resolution: {integrity: sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==}
engines: {node: '>=0.8.x'}

View File

@ -1,5 +1,6 @@
use crate::endpoints::{Empty, Endpoint};
use crate::util_types::deserialize_array_urlenc;
use crate::types::notification::{NotificationType, PackNotification};
use crate::util_types::{deserialize_array_urlenc, deserialize_opt_array_urlenc};
use http::Method;
use magnetar_sdk_macros::Endpoint;
use serde::{Deserialize, Serialize};
@ -83,6 +84,31 @@ pub struct GetManyUsersById;
)]
pub struct GetUserByAcct;
#[derive(Serialize, Deserialize, TS)]
#[ts(export)]
pub struct NotificationsReq {
#[ts(optional)]
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_array_urlenc")]
pub include_types: Option<Vec<NotificationType>>,
#[ts(optional)]
#[serde(default)]
#[serde(deserialize_with = "deserialize_opt_array_urlenc")]
pub exclude_types: Option<Vec<NotificationType>>,
#[ts(optional)]
pub unread_only: Option<bool>,
}
#[derive(Endpoint)]
#[endpoint(
endpoint = "/users/@self/notifications",
method = Method::GET,
request = "NotificationsReq",
response = "Vec<PackNotification>",
paginated = true
)]
pub struct GetNotifications;
#[derive(Endpoint)]
#[endpoint(
endpoint = "/users/:id/followers",

View File

@ -3,6 +3,7 @@ pub mod emoji;
pub mod instance;
pub mod note;
pub mod notification;
pub mod streaming;
pub mod timeline;
pub mod user;
@ -29,7 +30,7 @@ pub(crate) mod packed_time {
where
D: Deserializer<'de>,
{
Ok(DateTime::<Utc>::from_utc(
Ok(DateTime::<Utc>::from_naive_utc_and_offset(
NaiveDateTime::from_timestamp_millis(
String::deserialize(deserializer)?
.parse::<i64>()

View File

@ -1,13 +1,11 @@
use crate::types::note::{
PackNoteMaybeFull, Reaction, ReactionShortcode, ReactionUnicode, ReactionUnknown,
};
use crate::types::note::{PackNoteMaybeFull, Reaction};
use crate::types::user::PackUserBase;
use crate::types::Id;
use crate::{Packed, Required};
use chrono::{DateTime, Utc};
use magnetar_sdk_macros::pack;
use serde::{Deserialize, Serialize};
use strum::EnumDiscriminants;
use strum::{EnumDiscriminants, EnumIter};
use ts_rs::TS;
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
@ -44,21 +42,21 @@ pub struct NotificationAppExt {
pub icon: String,
}
pack!(PackNotificationFollow, Required<Id> as id & Required<NotificationUserExt> as follower);
pack!(PackNotificationFollowRequestReceived, Required<Id> as id & Required<NotificationUserExt> as follower);
pack!(PackNotificationFollowRequestAccepted, Required<Id> as id & Required<NotificationUserExt> as follower);
pack!(PackNotificationMention, Required<Id> as id & Required<NotificationNoteExt> as note);
pack!(PackNotificationReply, Required<Id> as id & Required<NotificationNoteExt> as note);
pack!(PackNotificationRenote, Required<Id> as id & Required<NotificationNoteExt> as note);
pack!(PackNotificationReaction, Required<Id> as id & Required<NotificationReactionExt> as reaction & Required<NotificationUserExt> as user);
pack!(PackNotificationQuote, Required<Id> as id & Required<NotificationNoteExt> as note);
pack!(PackNotificationPollEnd, Required<Id> as id & Required<NotificationNoteExt> as note);
pack!(PackNotificationApp, Required<Id> as id & Required<NotificationAppExt> as custom);
pack!(PackNotificationFollow, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationUserExt> as follower);
pack!(PackNotificationFollowRequestReceived, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationUserExt> as follower);
pack!(PackNotificationFollowRequestAccepted, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationUserExt> as follower);
pack!(PackNotificationMention, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note);
pack!(PackNotificationReply, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note);
pack!(PackNotificationRenote, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note);
pack!(PackNotificationReaction, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note & Required<NotificationReactionExt> as reaction & Required<NotificationUserExt> as user);
pack!(PackNotificationQuote, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note);
pack!(PackNotificationPollEnd, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationNoteExt> as note);
pack!(PackNotificationApp, Required<Id> as id & Required<NotificationBase> as notification & Required<NotificationAppExt> as custom);
#[derive(Clone, Debug, Deserialize, Serialize, TS, EnumDiscriminants)]
#[strum_discriminants(name(NotificationType))]
#[strum_discriminants(ts(export))]
#[strum_discriminants(derive(Deserialize, Serialize, TS))]
#[strum_discriminants(derive(Deserialize, Serialize, Hash, TS, EnumIter))]
#[ts(export)]
#[serde(tag = "type")]
pub enum PackNotification {

View File

@ -0,0 +1,10 @@
use crate::types::notification::PackNotification;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
#[derive(Clone, Debug, Deserialize, Serialize, TS)]
#[serde(tag = "type", content = "body")]
#[ts(export)]
pub enum ChannelEvent {
Notification(PackNotification),
}

View File

@ -95,3 +95,23 @@ where
Ok(parts)
}
pub(crate) fn deserialize_opt_array_urlenc<'de, D, T: Eq + Hash>(
deserializer: D,
) -> Result<Option<Vec<T>>, D::Error>
where
D: serde::Deserializer<'de>,
T: DeserializeOwned,
{
let Some(str_raw) = Option::<String>::deserialize(deserializer)? else {
return Ok(None);
};
let parts = serde_urlencoded::from_str::<Vec<(T, String)>>(&str_raw)
.map_err(serde::de::Error::custom)?
.into_iter()
.map(|(k, _)| k)
.collect::<Vec<_>>();
Ok(Some(parts))
}

View File

@ -1,11 +1,13 @@
mod note;
mod streaming;
mod user;
use crate::api_v1::note::handle_note;
use crate::api_v1::streaming::handle_streaming;
use crate::api_v1::user::{
handle_follow_requests_self, handle_followers, handle_followers_self, handle_following,
handle_following_self, handle_user_by_id_many, handle_user_info, handle_user_info_by_acct,
handle_user_info_self,
handle_following_self, handle_notifications, handle_user_by_id_many, handle_user_info,
handle_user_info_by_acct, handle_user_info_self,
};
use crate::service::MagnetarService;
use crate::web::auth;
@ -21,6 +23,7 @@ pub fn create_api_router(service: Arc<MagnetarService>) -> Router {
.route("/users/by-acct/:id", get(handle_user_info_by_acct))
.route("/users/lookup-many", get(handle_user_by_id_many))
.route("/users/:id", get(handle_user_info))
.route("/users/@self/notifications", get(handle_notifications))
.route(
"/users/@self/follow-requests",
get(handle_follow_requests_self),
@ -30,6 +33,7 @@ pub fn create_api_router(service: Arc<MagnetarService>) -> Router {
.route("/users/@self/followers", get(handle_followers_self))
.route("/users/:id/followers", get(handle_followers))
.route("/notes/:id", get(handle_note))
.route("/streaming", get(handle_streaming))
.layer(from_fn_with_state(
AuthState::new(service.clone()),
auth::auth,

117
src/api_v1/streaming.rs Normal file
View File

@ -0,0 +1,117 @@
use crate::model::processing::notification::NotificationModel;
use crate::model::PackingContext;
use crate::service::MagnetarService;
use crate::web::auth::AuthenticatedUser;
use crate::web::ApiError;
use axum::extract::State;
use axum::response::sse::{Event, KeepAlive};
use axum::response::Sse;
use futures::Stream;
use futures_util::StreamExt as _;
use magnetar_calckey_model::model_ext::IdShape;
use magnetar_calckey_model::{CalckeySub, MainStreamMessage, SubMessage};
use magnetar_sdk::types::streaming::ChannelEvent;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, trace, warn};
pub fn drop_on_close(sub: CalckeySub, tx: mpsc::Sender<MainStreamMessage>) {
tokio::spawn(async move {
tx.closed().await;
drop(sub);
debug!("Dropped the listener.");
});
}
pub async fn handle_streaming(
State(service): State<Arc<MagnetarService>>,
AuthenticatedUser(self_user): AuthenticatedUser,
) -> Result<Sse<impl Stream<Item = Result<Event, axum::Error>>>, ApiError> {
trace!("SSE connection from user `{}` start", self_user.username);
let (tx, rx) = mpsc::channel(1024);
let sub_tx = tx.clone();
let sub_user_id = self_user.id.clone();
let sub = service
.cache
.conn()
.await?
.subscribe(&service.config.networking.host, move |message| {
let user_id = sub_user_id.clone();
let tx = sub_tx.clone();
async move {
let SubMessage::MainStream(id, msg) = message else {
return;
};
if id != user_id {
trace!(
"Skipping message intended for {} in channel {}",
id,
user_id
);
return;
}
if let Err(e) = tx.send(msg).await {
warn!("Failed to send stream channel message: {e}");
}
}
})
.await?;
drop_on_close(sub, tx);
let event_counter = Arc::new(AtomicU64::default());
let stream = ReceiverStream::new(rx).filter_map(move |m| {
trace!("Processing raw message: {:?}", m);
let service = service.clone();
let self_user = self_user.clone();
let event_counter = event_counter.clone();
async move {
let message = match m {
MainStreamMessage::Notification(IdShape { id }) => {
let ctx = PackingContext::new(service, Some(self_user.clone()))
.await
.map_err(|e| {
error!("Failed to create notification packing context: {}", e);
e
})
.ok()?;
let notification_model = NotificationModel;
Some(
Event::default()
.id(event_counter.fetch_add(1, Ordering::Relaxed).to_string())
.event("message")
.json_data(ChannelEvent::Notification(
notification_model
.get_notification(&ctx, &id, &self_user.id)
.await
.map_err(|e| {
error!("Failed to fetch notification: {}", e);
e
})
.ok()
.flatten()?,
)),
)
}
};
trace!("Sending message: {:?}", message);
message
}
});
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(2))
.text("mag-keep-alive"),
))
}

View File

@ -1,3 +1,4 @@
use crate::model::processing::notification::NotificationModel;
use crate::model::processing::user::{UserBorrowedData, UserModel, UserShapedData};
use crate::model::PackingContext;
use crate::service::MagnetarService;
@ -10,11 +11,14 @@ use itertools::Itertools;
use magnetar_common::util::lenient_parse_tag_decode;
use magnetar_sdk::endpoints::user::{
GetFollowRequestsSelf, GetFollowersById, GetFollowersSelf, GetFollowingById, GetFollowingSelf,
GetManyUsersById, GetUserByAcct, GetUserById, GetUserSelf, ManyUsersByIdReq,
GetManyUsersById, GetNotifications, GetUserByAcct, GetUserById, GetUserSelf, ManyUsersByIdReq,
NotificationsReq,
};
use magnetar_sdk::endpoints::{Req, Res};
use magnetar_sdk::types::notification::NotificationType;
use std::collections::HashMap;
use std::sync::Arc;
use strum::IntoEnumIterator;
pub async fn handle_user_info_self(
Query(req): Query<Req<GetUserSelf>>,
@ -141,6 +145,40 @@ pub async fn handle_user_by_id_many(
Ok(Json(users_ordered))
}
pub async fn handle_notifications(
Query(NotificationsReq {
ref exclude_types,
include_types,
unread_only,
}): Query<Req<GetNotifications>>,
State(service): State<Arc<MagnetarService>>,
AuthenticatedUser(user): AuthenticatedUser,
mut pagination: Pagination,
) -> Result<(Pagination, Json<Res<GetNotifications>>), ApiError> {
let notification_types = include_types
.unwrap_or_else(|| NotificationType::iter().collect::<Vec<_>>())
.iter()
.filter(|t| {
exclude_types.is_none() || !exclude_types.as_ref().is_some_and(|tt| tt.contains(t))
})
.copied()
.collect::<Vec<_>>();
let ctx = PackingContext::new(service, Some(user.clone())).await?;
let notification_model = NotificationModel;
let notifications = notification_model
.get_notifications(
&ctx,
&user.id,
&notification_types,
unread_only.unwrap_or_default(),
&mut pagination,
)
.await?;
Ok((pagination, Json(notifications)))
}
pub async fn handle_following_self(
Query(_): Query<Req<GetFollowingSelf>>,
State(service): State<Arc<MagnetarService>>,

View File

@ -17,6 +17,7 @@ macro_rules! impl_id {
impl_id!(ck::emoji::Model);
impl_id!(ck::user::Model);
impl_id!(ck::note::Model);
impl_id!(ck::notification::Model);
impl BaseId for ck::poll::Model {
fn get_id(&self) -> &str {

View File

@ -3,5 +3,6 @@ pub mod emoji;
pub mod id;
pub mod instance;
pub mod note;
pub mod notification;
pub mod poll;
pub mod user;

View File

@ -0,0 +1,65 @@
use crate::model::{PackType, PackingContext};
use magnetar_calckey_model::ck;
use magnetar_sdk::types::note::{PackNoteMaybeFull, Reaction};
use magnetar_sdk::types::notification::{
NotificationAppExt, NotificationBase, NotificationNoteExt, NotificationReactionExt,
NotificationUserExt,
};
use magnetar_sdk::types::user::PackUserBase;
impl<'a> PackType<&'a ck::notification::Model> for NotificationBase {
fn extract(_: &PackingContext, data: &'a ck::notification::Model) -> Self {
NotificationBase {
id: data.id.clone(),
created_at: data.created_at.into(),
is_read: data.is_read,
}
}
}
impl<'a> PackType<&'a PackNoteMaybeFull> for NotificationNoteExt {
fn extract(_: &PackingContext, data: &'a PackNoteMaybeFull) -> Self {
NotificationNoteExt { note: data.clone() }
}
}
impl<'a> PackType<&'a PackUserBase> for NotificationUserExt {
fn extract(_: &PackingContext, data: &'a PackUserBase) -> Self {
NotificationUserExt { user: data.clone() }
}
}
impl<'a> PackType<&'a Reaction> for NotificationReactionExt {
fn extract(_: &PackingContext, data: &'a Reaction) -> Self {
NotificationReactionExt {
reaction: data.clone(),
}
}
}
impl<'a> PackType<(&'a ck::notification::Model, &'a ck::access_token::Model)>
for NotificationAppExt
{
fn extract(
_: &PackingContext,
(notification, access_token): (&'a ck::notification::Model, &'a ck::access_token::Model),
) -> Self {
NotificationAppExt {
body: notification.custom_body.clone().unwrap_or_default(),
header: notification
.custom_header
.as_ref()
.or(access_token.name.as_ref())
.map(String::to_string)
.clone()
.unwrap_or_default(),
icon: notification
.custom_icon
.as_ref()
.or(access_token.icon_url.as_ref())
.map(String::to_string)
.clone()
.unwrap_or_default(),
}
}
}

View File

@ -1,11 +1,57 @@
use crate::model::processing::PackResult;
use crate::model::processing::{PackError, PackResult};
use crate::model::{PackType, PackingContext};
use either::Either;
use futures_util::TryFutureExt;
use itertools::Itertools;
use magnetar_calckey_model::ck;
use magnetar_calckey_model::emoji::EmojiTag;
use magnetar_common::util::{parse_reaction, RawReaction};
use magnetar_sdk::types::emoji::{EmojiBase, PackEmojiBase};
use magnetar_sdk::types::note::{Reaction, ReactionShortcode, ReactionUnicode, ReactionUnknown};
use magnetar_sdk::types::Id;
use magnetar_sdk::{Packed, Required};
use std::sync::Arc;
pub fn parse_emoji_or_raw(tag: &str) -> Either<String, RawReaction> {
parse_reaction(tag).map_or_else(|| Either::Left(tag.to_string()), Either::Right)
}
pub fn shortcode_tag_or_none(value: &RawReaction) -> Option<EmojiTag<'_>> {
match value {
RawReaction::Shortcode { shortcode, host } => Some(EmojiTag {
name: shortcode.as_str(),
host: host.as_deref(),
}),
_ => None,
}
}
pub fn resolve_reaction<'a>(
value: RawReaction,
code_lookup: &impl Fn(&str, Option<&str>) -> Option<&'a ck::emoji::Model>,
) -> Reaction {
match value {
RawReaction::Unicode(text) => Reaction::Unicode(ReactionUnicode(text)),
RawReaction::Shortcode { shortcode, host } => code_lookup(&shortcode, host.as_deref())
.map_or_else(
|| {
Reaction::Unknown(ReactionUnknown {
raw: format!(
":{shortcode}{}:",
host.as_deref().map(|h| format!("@{h}")).unwrap_or_default()
),
})
},
|e| {
Reaction::Shortcode(ReactionShortcode {
name: shortcode.clone(),
host: host.clone(),
url: e.public_url.clone(),
})
},
),
}
}
pub struct EmojiModel;
@ -48,4 +94,78 @@ impl EmojiModel {
.take(ctx.limits.max_emojis)
.collect::<Vec<_>>()
}
pub async fn resolve_reaction(
&self,
ctx: &PackingContext,
reaction: &str,
) -> PackResult<Reaction> {
let parsed = parse_emoji_or_raw(reaction);
Ok(match parsed {
Either::Left(raw) => Reaction::Unknown(ReactionUnknown { raw }),
Either::Right(raw) => {
let reaction_fetched = match shortcode_tag_or_none(&raw) {
Some(tag) => {
ctx.service
.emoji_cache
.get(tag.name, tag.host)
.map_err(PackError::from)
.await?
}
None => None,
};
let reaction_ref = reaction_fetched.as_ref().map(Arc::as_ref);
resolve_reaction(raw, &move |_, _| reaction_ref)
}
})
}
pub async fn resolve_reactions_many(
&self,
ctx: &PackingContext,
reactions_raw: &[String],
) -> PackResult<Vec<Reaction>> {
let reactions_parsed = reactions_raw
.iter()
.map(String::as_ref)
.map(parse_emoji_or_raw)
.collect::<Vec<_>>();
// Pick out all successfully-parsed shortcode emojis
let reactions_to_resolve = reactions_parsed
.iter()
.map(Either::as_ref)
.filter_map(Either::right)
.filter_map(shortcode_tag_or_none)
.collect::<Vec<_>>();
let reactions_fetched = ctx
.service
.emoji_cache
.get_many_tagged(&reactions_to_resolve)
.map_err(PackError::from)
.await?;
// Left reactions and the Right ones that didn't resolve to any emoji are turned back into Unknown
let reactions_resolved = reactions_parsed
.into_iter()
.map(|val| {
val.either(
|raw| Reaction::Unknown(ReactionUnknown { raw }),
|raw| {
resolve_reaction(raw, &|shortcode, host| {
reactions_fetched
.iter()
.find(|e| e.host.as_deref() == host && e.name == shortcode)
.map(Arc::as_ref)
})
},
)
})
.collect::<Vec<_>>();
Ok(reactions_resolved)
}
}

View File

@ -10,6 +10,7 @@ use thiserror::Error;
pub mod drive;
pub mod emoji;
pub mod note;
pub mod notification;
pub mod user;
#[derive(Debug, Error, strum::IntoStaticStr)]
@ -18,7 +19,7 @@ pub enum PackError {
DbError(#[from] DbErr),
#[error("Calckey database wrapper error: {0}")]
CalckeyDbError(#[from] CalckeyDbError),
#[error("Emoji cache error: {0}")]
#[error("Data error: {0}")]
DataError(String),
#[error("Emoji cache error: {0}")]
EmojiCacheError(#[from] EmojiCacheError),

View File

@ -7,9 +7,8 @@ use crate::model::{PackType, PackingContext};
use compact_str::CompactString;
use either::Either;
use futures_util::future::{try_join_all, BoxFuture};
use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use magnetar_calckey_model::ck::sea_orm_active_enums::NoteVisibilityEnum;
use magnetar_calckey_model::emoji::EmojiTag;
use magnetar_calckey_model::model_ext::AliasColumnExt;
use magnetar_calckey_model::note_model::data::{
sub_interaction_reaction, sub_interaction_renote, NoteData,
@ -20,14 +19,12 @@ use magnetar_calckey_model::sea_orm::sea_query::{PgFunc, Query, SimpleExpr};
use magnetar_calckey_model::sea_orm::{ActiveEnum, ColumnTrait, Iden, IntoSimpleExpr};
use magnetar_calckey_model::user_model::UserResolveOptions;
use magnetar_calckey_model::{ck, CalckeyDbError};
use magnetar_common::util::{parse_reaction, RawReaction};
use magnetar_sdk::mmm::Token;
use magnetar_sdk::types::drive::PackDriveFileBase;
use magnetar_sdk::types::emoji::EmojiContext;
use magnetar_sdk::types::note::{
NoteAttachmentExt, NoteBase, NoteDetailExt, NoteSelfContextExt, PackNoteBase,
PackNoteMaybeAttachments, PackNoteMaybeFull, PackPollBase, PollBase, Reaction, ReactionPair,
ReactionShortcode, ReactionUnicode, ReactionUnknown,
PackNoteMaybeAttachments, PackNoteMaybeFull, PackPollBase, PollBase, ReactionPair,
};
use magnetar_sdk::types::user::UserRelationship;
use magnetar_sdk::types::{Id, MmXml};
@ -299,95 +296,38 @@ impl NoteModel {
let shortcodes = emoji_model.deduplicate_emoji(ctx, emoji_extracted);
// Parse the JSON into an ordered map and turn it into a Vec of pairs, parsing the reaction codes
// Failed reaction parses -> Left, Successful ones -> Right
let reactions_raw =
let (reactions_raw, reactions_counts): (Vec<_>, Vec<_>) =
serde_json::Map::<String, serde_json::Value>::deserialize(&note.reactions)?
.into_iter()
.map(|(ref code, count)| {
let reaction = parse_reaction(code)
.map_or_else(|| Either::Left(code.to_string()), Either::Right);
(
reaction,
count,
note_data
.interaction_user_reaction()
.as_ref()
.and_then(|r| r.reaction_name.as_deref())
.map(|r| r == code),
)
})
.map(|(code, count, self_reacted)| {
Ok((code, usize::deserialize(count)?, self_reacted))
})
.filter(|v| !v.as_ref().is_ok_and(|(_, count, _)| *count == 0))
.collect::<Result<Vec<_>, serde_json::Error>>()?;
// Pick out all successfully-parsed shortcode emojis
let reactions_to_resolve = reactions_raw
.filter_map(|(k, v)| Some((k, usize::deserialize(v).ok()?)))
.filter(|(_, count)| *count > 0)
.unzip();
let self_reaction_name = note_data
.interaction_user_reaction()
.as_ref()
.and_then(|r| r.reaction_name.as_deref());
let reactions_has_self_reacted = reactions_raw
.iter()
.map(|(code, _, _)| code)
.map(Either::as_ref)
.filter_map(Either::right)
.filter_map(|c| match c {
RawReaction::Shortcode { shortcode, host } => Some(EmojiTag {
name: shortcode,
host: host.as_deref(),
}),
_ => None,
})
.map(|r| self_reaction_name.map(|srn| srn == r))
.collect::<Vec<_>>();
let reaction_fetch = ctx
.service
.emoji_cache
.get_many_tagged(&reactions_to_resolve)
.map_err(PackError::from);
let emoji_fetch = emoji_model.fetch_many_emojis(
ctx,
&shortcodes,
note_data.user().user().host.as_deref(),
);
let reaction_fetch = emoji_model.resolve_reactions_many(ctx, &reactions_raw);
let (reactions_fetched, emojis) = try_join!(reaction_fetch, emoji_fetch)?;
// Left reactions and the Right ones that didn't resolve to any emoji are turned back into Unknown
let reactions = &reactions_raw
let reactions = reactions_fetched
.into_iter()
.map(|(raw, count, self_reaction)| {
let reaction = raw.either(
|raw| Reaction::Unknown(ReactionUnknown { raw }),
|raw| match raw {
RawReaction::Unicode(text) => Reaction::Unicode(ReactionUnicode(text)),
RawReaction::Shortcode { shortcode, host } => reactions_fetched
.iter()
.find(|e| e.host == host && e.name == shortcode)
.map_or_else(
|| {
Reaction::Unknown(ReactionUnknown {
raw: format!(
":{shortcode}{}:",
host.as_deref()
.map(|h| format!("@{h}"))
.unwrap_or_default()
),
})
},
|e| {
Reaction::Shortcode(ReactionShortcode {
name: shortcode.clone(),
host: host.clone(),
url: e.public_url.clone(),
})
},
),
},
);
match self_reaction {
Some(self_reaction) => {
ReactionPair::WithContext(reaction, count, self_reaction)
}
None => ReactionPair::WithoutContext(reaction, count),
}
.zip(reactions_counts.into_iter())
.zip(reactions_has_self_reacted.into_iter())
.map(|((reaction, count), self_reaction)| match self_reaction {
Some(self_reaction) => ReactionPair::WithContext(reaction, count, self_reaction),
None => ReactionPair::WithoutContext(reaction, count),
})
.collect::<Vec<_>>();
@ -409,7 +349,7 @@ impl NoteModel {
.and_then(Result::ok)
.map(MmXml)
.as_ref(),
reactions,
reactions: &reactions,
user,
emoji_context,
},
@ -506,7 +446,7 @@ impl NoteModel {
)))
}
fn pack_full_single<'b, 'a: 'b>(
pub fn pack_full_single<'b, 'a: 'b>(
&'a self,
ctx: &'a PackingContext,
note: &'b (dyn NoteShapedData<'a> + 'b),

View File

@ -0,0 +1,344 @@
use crate::model::data::id::BaseId;
use crate::model::processing::emoji::EmojiModel;
use crate::model::processing::note::{NoteModel, NoteVisibilityFilterModel};
use crate::model::processing::user::UserModel;
use crate::model::processing::{PackError, PackResult};
use crate::model::{PackType, PackingContext};
use crate::web::pagination::Pagination;
use futures_util::{StreamExt, TryStreamExt};
use magnetar_calckey_model::ck;
use magnetar_calckey_model::ck::sea_orm_active_enums::NotificationTypeEnum;
use magnetar_calckey_model::note_model::NoteResolveOptions;
use magnetar_calckey_model::notification_model::{NotificationData, NotificationResolveOptions};
use magnetar_calckey_model::user_model::UserResolveOptions;
use magnetar_sdk::types::notification::{
NotificationAppExt, NotificationBase, NotificationNoteExt, NotificationReactionExt,
NotificationType, NotificationUserExt, PackNotification, PackNotificationApp,
PackNotificationFollow, PackNotificationFollowRequestAccepted,
PackNotificationFollowRequestReceived, PackNotificationMention, PackNotificationPollEnd,
PackNotificationQuote, PackNotificationReaction, PackNotificationRenote, PackNotificationReply,
};
use magnetar_sdk::types::Id;
use magnetar_sdk::{Packed, Required};
use std::sync::Arc;
impl PackType<&NotificationType> for NotificationTypeEnum {
fn extract(_: &PackingContext, value: &NotificationType) -> Self {
use NotificationType as NT;
use NotificationTypeEnum as NTE;
match value {
NT::Follow => NTE::Follow,
NT::FollowRequestReceived => NTE::ReceiveFollowRequest,
NT::FollowRequestAccepted => NTE::FollowRequestAccepted,
NT::Mention => NTE::Mention,
NT::Reply => NTE::Reply,
NT::Renote => NTE::Renote,
NT::Reaction => NTE::Reaction,
NT::Quote => NTE::Quote,
NT::PollEnd => NTE::PollEnded,
NT::App => NTE::App,
}
}
}
impl PackType<&NotificationTypeEnum> for NotificationType {
fn extract(_: &PackingContext, value: &NotificationTypeEnum) -> Self {
use NotificationType as NT;
use NotificationTypeEnum as NTE;
match value {
NTE::Follow => NT::Follow,
NTE::ReceiveFollowRequest => NT::FollowRequestReceived,
NTE::FollowRequestAccepted => NT::FollowRequestAccepted,
NTE::Mention => NT::Mention,
NTE::Reply => NT::Reply,
NTE::Renote => NT::Renote,
NTE::Reaction => NT::Reaction,
NTE::Quote => NT::Quote,
NTE::PollEnded => NT::PollEnd,
NTE::App => NT::App,
}
}
}
pub struct NotificationModel;
impl NotificationModel {
async fn pack_notification_single(
&self,
ctx: &PackingContext,
notification_data: &NotificationData,
note_model: &NoteModel,
user_model: &UserModel,
emoji_model: &EmojiModel,
) -> PackResult<PackNotification> {
let notification_type =
NotificationType::extract(ctx, &notification_data.notification.r#type);
let id = Required(Id::from(&notification_data.notification.id));
let base = Required(NotificationBase::extract(
ctx,
&notification_data.notification,
));
let notifier = notification_data
.notifier
.as_ref()
.ok_or_else(|| PackError::DataError("Missing notification user".to_string()));
let note = notification_data
.notification_note
.as_ref()
.ok_or_else(|| PackError::DataError("Missing notification note".to_string()));
let access_token = notification_data
.access_token
.as_ref()
.ok_or_else(|| PackError::DataError("Missing notification access token".to_string()));
Ok(match notification_type {
NotificationType::Follow => {
PackNotification::Follow(PackNotificationFollow::pack_from((
id,
base,
Required(NotificationUserExt::extract(
ctx,
&user_model.base_from_existing(ctx, &notifier?).await?,
)),
)))
}
NotificationType::FollowRequestReceived => PackNotification::FollowRequestReceived(
PackNotificationFollowRequestReceived::pack_from((
id,
base,
Required(NotificationUserExt::extract(
ctx,
&user_model.base_from_existing(ctx, &notifier?).await?,
)),
)),
),
NotificationType::FollowRequestAccepted => PackNotification::FollowRequestAccepted(
PackNotificationFollowRequestAccepted::pack_from((
id,
base,
Required(NotificationUserExt::extract(
ctx,
&user_model.base_from_existing(ctx, &notifier?).await?,
)),
)),
),
NotificationType::Mention => {
PackNotification::Mention(PackNotificationMention::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
)))
}
NotificationType::Reply => PackNotification::Reply(PackNotificationReply::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
))),
NotificationType::Renote => {
PackNotification::Renote(PackNotificationRenote::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
)))
}
NotificationType::Reaction => {
PackNotification::Reaction(PackNotificationReaction::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
Required(NotificationReactionExt::extract(
ctx,
&emoji_model
.resolve_reaction(
ctx,
notification_data
.notification
.reaction
.as_deref()
.ok_or_else(|| {
PackError::DataError(
"Missing notification reaction".to_string(),
)
})?,
)
.await?,
)),
Required(NotificationUserExt::extract(
ctx,
&user_model.base_from_existing(ctx, &notifier?).await?,
)),
)))
}
NotificationType::Quote => PackNotification::Quote(PackNotificationQuote::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
))),
NotificationType::PollEnd => {
PackNotification::PollEnd(PackNotificationPollEnd::pack_from((
id,
base,
Required(NotificationNoteExt::extract(
ctx,
&note_model.pack_full_single(ctx, &note?).await?,
)),
)))
}
NotificationType::App => PackNotification::App(PackNotificationApp::pack_from((
id,
base,
Required(NotificationAppExt::extract(
ctx,
(&notification_data.notification, access_token?),
)),
))),
})
}
pub async fn get_notification(
&self,
ctx: &PackingContext,
notification_id: &str,
user_id: &str,
) -> PackResult<Option<PackNotification>> {
let user_resolve_options = UserResolveOptions {
with_avatar: true,
with_banner: false,
with_profile: false,
};
let self_id = ctx.self_user.as_deref().map(ck::user::Model::get_id);
let Some(notification_raw) = ctx
.service
.db
.get_notification_resolver()
.get_single(
&NotificationResolveOptions {
note_options: NoteResolveOptions {
ids: None,
visibility_filter: Arc::new(
NoteVisibilityFilterModel.new_note_visibility_filter(Some(user_id)),
),
time_range: None,
limit: None,
with_reply_target: true,
with_renote_target: true,
with_interactions_from: self_id.map(str::to_string),
only_pins_from: None,
user_options: user_resolve_options.clone(),
},
user_options: user_resolve_options,
},
notification_id,
)
.await?
else {
return Ok(None);
};
let note_model = NoteModel {
with_context: true,
attachments: false,
};
let user_model = UserModel;
let emoji_model = EmojiModel;
let notification = self
.pack_notification_single(
ctx,
&notification_raw,
&note_model,
&user_model,
&emoji_model,
)
.await?;
Ok(Some(notification))
}
pub async fn get_notifications(
&self,
ctx: &PackingContext,
id: &str,
notification_types: &[NotificationType],
unread_only: bool,
pagination: &mut Pagination,
) -> PackResult<Vec<PackNotification>> {
let user_resolve_options = UserResolveOptions {
with_avatar: true,
with_banner: false,
with_profile: false,
};
let self_id = ctx.self_user.as_deref().map(ck::user::Model::get_id);
let notifications_raw = ctx
.service
.db
.get_notification_resolver()
.get(
&NotificationResolveOptions {
note_options: NoteResolveOptions {
ids: None,
visibility_filter: Arc::new(
NoteVisibilityFilterModel.new_note_visibility_filter(Some(id)),
),
time_range: None,
limit: None,
with_reply_target: true,
with_renote_target: true,
with_interactions_from: self_id.map(str::to_string),
only_pins_from: None,
user_options: user_resolve_options.clone(),
},
user_options: user_resolve_options,
},
id,
&notification_types
.iter()
.map(|v| NotificationTypeEnum::extract(ctx, v))
.collect::<Vec<_>>(),
unread_only,
&pagination.current,
&mut pagination.prev,
&mut pagination.next,
pagination.limit.into(),
)
.await?;
let note_model = NoteModel {
with_context: true,
attachments: false,
};
let user_model = UserModel;
let emoji_model = EmojiModel;
let fut_iter = notifications_raw
.iter()
.map(|n| self.pack_notification_single(ctx, n, &note_model, &user_model, &emoji_model))
.collect::<Vec<_>>();
let processed = futures::stream::iter(fut_iter)
.buffered(10)
.err_into::<PackError>()
.try_collect::<Vec<_>>()
.await?;
Ok(processed)
}
}

View File

@ -98,13 +98,16 @@ impl EmojiCacheService {
}
drop(read);
let emoji = self
.db
.fetch_many_emojis(&to_resolve, host)
.await?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let emoji = if to_resolve.is_empty() {
Vec::new()
} else {
self.db
.fetch_many_emojis(&to_resolve, host)
.await?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>()
};
resolved.extend(emoji.iter().cloned());
let mut write = self.cache.lock().await;
@ -148,13 +151,16 @@ impl EmojiCacheService {
}
drop(read);
let emoji = self
.db
.fetch_many_tagged_emojis(&to_resolve)
.await?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>();
let emoji = if to_resolve.is_empty() {
Vec::new()
} else {
self.db
.fetch_many_tagged_emojis(&to_resolve)
.await?
.into_iter()
.map(Arc::new)
.collect::<Vec<_>>()
};
resolved.extend(emoji.iter().cloned());
let mut write = self.cache.lock().await;

View File

@ -1,6 +1,7 @@
use crate::web::ApiError;
use lru::LruCache;
use magnetar_calckey_model::{ck, CalckeyDbError, CalckeyModel};
use magnetar_common::config::MagnetarConfig;
use std::sync::Arc;
use std::time::{Duration, Instant};
use strum::EnumVariantNames;
@ -44,10 +45,16 @@ pub struct RemoteInstanceCacheService {
cache: Mutex<LruCache<String, CacheEntry>>,
lifetime_max: Duration,
db: CalckeyModel,
config: &'static MagnetarConfig,
}
impl RemoteInstanceCacheService {
pub(super) fn new(db: CalckeyModel, cache_size: usize, entry_lifetime: Duration) -> Self {
pub(super) fn new(
db: CalckeyModel,
config: &'static MagnetarConfig,
cache_size: usize,
entry_lifetime: Duration,
) -> Self {
const CACHE_SIZE: usize = 256;
Self {
@ -58,6 +65,7 @@ impl RemoteInstanceCacheService {
)),
lifetime_max: entry_lifetime,
db,
config,
}
}
@ -65,6 +73,10 @@ impl RemoteInstanceCacheService {
&self,
host: &str,
) -> Result<Option<Arc<ck::instance::Model>>, RemoteInstanceCacheError> {
if host == self.config.networking.host {
return Ok(None);
}
let mut read = self.cache.lock().await;
if let Some(item) = read.peek(host) {
if item.created + self.lifetime_max >= Instant::now() {

View File

@ -51,6 +51,7 @@ impl MagnetarService {
let emoji_cache = emoji_cache::EmojiCacheService::new(db.clone());
let remote_instance_cache = instance_cache::RemoteInstanceCacheService::new(
db.clone(),
config,
256,
Duration::from_secs(100),
);

View File

@ -6,6 +6,8 @@ use magnetar_calckey_model::{CalckeyCacheError, CalckeyDbError};
use magnetar_common::util::FediverseTagParseError;
use serde::Serialize;
use serde_json::json;
use std::fmt::{Display, Formatter};
use thiserror::Error;
pub mod auth;
pub mod pagination;
@ -36,13 +38,23 @@ impl ErrorCode {
}
}
#[derive(Debug)]
#[derive(Debug, Error)]
pub struct ApiError {
pub status: StatusCode,
pub code: ErrorCode,
pub message: String,
}
impl Display for ApiError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ApiError[status = \"{}\", code = \"{:?}\"]: \"{}\"",
self.status, self.code, self.message
)
}
}
#[derive(Debug)]
pub struct AccessForbidden(pub String);