118 lines
4.0 KiB
Rust
118 lines
4.0 KiB
Rust
use crate::model::processing::notification::NotificationModel;
|
|
use crate::model::PackingContext;
|
|
use crate::service::MagnetarService;
|
|
use crate::web::auth::AuthenticatedUser;
|
|
use crate::web::ApiError;
|
|
use axum::extract::State;
|
|
use axum::response::sse::{Event, KeepAlive};
|
|
use axum::response::Sse;
|
|
use futures::Stream;
|
|
use futures_util::StreamExt as _;
|
|
use magnetar_model::model_ext::IdShape;
|
|
use magnetar_model::{CalckeySub, MainStreamMessage, SubMessage};
|
|
use magnetar_sdk::types::streaming::ChannelEvent;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
use tracing::{debug, error, trace, warn};
|
|
|
|
/// Ties the lifetime of a subscription to the lifetime of a channel.
///
/// Spawns a detached Tokio task that owns both `sub` and `tx`. The task waits
/// on [`mpsc::Sender::closed`], which resolves once the receiving half of the
/// channel has been closed or dropped, and only then drops `sub`.
///
/// NOTE(review): dropping `sub` presumably cancels the underlying
/// subscription; confirm against `CalckeySub`'s `Drop` behaviour.
pub fn drop_on_close(sub: CalckeySub, tx: mpsc::Sender<MainStreamMessage>) {
    let watchdog = async move {
        // Take ownership inside the task so the subscription outlives the caller.
        let subscription = sub;

        tx.closed().await;

        drop(subscription);
        debug!("Dropped the listener.");
    };

    // The join handle is intentionally discarded: the task runs detached.
    tokio::spawn(watchdog);
}
|
|
|
|
pub async fn handle_streaming(
|
|
State(service): State<Arc<MagnetarService>>,
|
|
AuthenticatedUser(self_user): AuthenticatedUser,
|
|
) -> Result<Sse<impl Stream<Item = Result<Event, axum::Error>>>, ApiError> {
|
|
trace!("SSE connection from user `{}` start", self_user.username);
|
|
|
|
let (tx, rx) = mpsc::channel(1024);
|
|
let sub_tx = tx.clone();
|
|
let sub_user_id = self_user.id.clone();
|
|
let sub = service
|
|
.cache
|
|
.conn()
|
|
.await?
|
|
.subscribe(&service.config.networking.host, move |message| {
|
|
let user_id = sub_user_id.clone();
|
|
let tx = sub_tx.clone();
|
|
async move {
|
|
let SubMessage::MainStream(id, msg) = message else {
|
|
return;
|
|
};
|
|
|
|
if id != user_id {
|
|
trace!(
|
|
"Skipping message intended for {} in channel {}",
|
|
id,
|
|
user_id
|
|
);
|
|
return;
|
|
}
|
|
|
|
if let Err(e) = tx.send(msg).await {
|
|
warn!("Failed to send stream channel message: {e}");
|
|
}
|
|
}
|
|
})
|
|
.await?;
|
|
|
|
drop_on_close(sub, tx);
|
|
|
|
let event_counter = Arc::new(AtomicU64::default());
|
|
let stream = ReceiverStream::new(rx).filter_map(move |m| {
|
|
trace!("Processing raw message: {:?}", m);
|
|
|
|
let service = service.clone();
|
|
let self_user = self_user.clone();
|
|
let event_counter = event_counter.clone();
|
|
async move {
|
|
let message = match m {
|
|
MainStreamMessage::Notification(IdShape { id }) => {
|
|
let ctx = PackingContext::new(service, Some(self_user.clone()))
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to create notification packing context: {}", e);
|
|
e
|
|
})
|
|
.ok()?;
|
|
let notification_model = NotificationModel;
|
|
|
|
Some(
|
|
Event::default()
|
|
.id(event_counter.fetch_add(1, Ordering::Relaxed).to_string())
|
|
.event("message")
|
|
.json_data(ChannelEvent::Notification(
|
|
notification_model
|
|
.get_notification(&ctx, &id, &self_user.id)
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to fetch notification: {}", e);
|
|
e
|
|
})
|
|
.ok()
|
|
.flatten()?,
|
|
)),
|
|
)
|
|
}
|
|
};
|
|
|
|
trace!("Sending message: {:?}", message);
|
|
|
|
message
|
|
}
|
|
});
|
|
|
|
Ok(Sse::new(stream).keep_alive(
|
|
KeepAlive::new()
|
|
.interval(Duration::from_secs(2))
|
|
.text("mag-keep-alive"),
|
|
))
|
|
}
|