magnetar/src/api_v1/streaming.rs

118 lines
4.0 KiB
Rust

use crate::model::processing::notification::NotificationModel;
use crate::model::PackingContext;
use crate::service::MagnetarService;
use crate::web::auth::AuthenticatedUser;
use crate::web::ApiError;
use axum::extract::State;
use axum::response::sse::{Event, KeepAlive};
use axum::response::Sse;
use futures::Stream;
use futures_util::StreamExt as _;
use magnetar_calckey_model::model_ext::IdShape;
use magnetar_calckey_model::{CalckeySub, MainStreamMessage, SubMessage};
use magnetar_sdk::types::streaming::ChannelEvent;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, trace, warn};
pub fn drop_on_close(sub: CalckeySub, tx: mpsc::Sender<MainStreamMessage>) {
tokio::spawn(async move {
tx.closed().await;
drop(sub);
debug!("Dropped the listener.");
});
}
pub async fn handle_streaming(
State(service): State<Arc<MagnetarService>>,
AuthenticatedUser(self_user): AuthenticatedUser,
) -> Result<Sse<impl Stream<Item = Result<Event, axum::Error>>>, ApiError> {
trace!("SSE connection from user `{}` start", self_user.username);
let (tx, rx) = mpsc::channel(1024);
let sub_tx = tx.clone();
let sub_user_id = self_user.id.clone();
let sub = service
.cache
.conn()
.await?
.subscribe(&service.config.networking.host, move |message| {
let user_id = sub_user_id.clone();
let tx = sub_tx.clone();
async move {
let SubMessage::MainStream(id, msg) = message else {
return;
};
if id != user_id {
trace!(
"Skipping message intended for {} in channel {}",
id,
user_id
);
return;
}
if let Err(e) = tx.send(msg).await {
warn!("Failed to send stream channel message: {e}");
}
}
})
.await?;
drop_on_close(sub, tx);
let event_counter = Arc::new(AtomicU64::default());
let stream = ReceiverStream::new(rx).filter_map(move |m| {
trace!("Processing raw message: {:?}", m);
let service = service.clone();
let self_user = self_user.clone();
let event_counter = event_counter.clone();
async move {
let message = match m {
MainStreamMessage::Notification(IdShape { id }) => {
let ctx = PackingContext::new(service, Some(self_user.clone()))
.await
.map_err(|e| {
error!("Failed to create notification packing context: {}", e);
e
})
.ok()?;
let notification_model = NotificationModel;
Some(
Event::default()
.id(event_counter.fetch_add(1, Ordering::Relaxed).to_string())
.event("message")
.json_data(ChannelEvent::Notification(
notification_model
.get_notification(&ctx, &id, &self_user.id)
.await
.map_err(|e| {
error!("Failed to fetch notification: {}", e);
e
})
.ok()
.flatten()?,
)),
)
}
};
trace!("Sending message: {:?}", message);
message
}
});
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(2))
.text("mag-keep-alive"),
))
}