diff --git a/packages/backend/src/server/api/stream/channel.ts b/packages/backend/src/server/api/stream/channel.ts index 8e14490ebc..f6533c9af2 100644 --- a/packages/backend/src/server/api/stream/channel.ts +++ b/packages/backend/src/server/api/stream/channel.ts @@ -65,9 +65,9 @@ export default abstract class Channel { public onMessage?(type: string, body: any): void; protected withPackedNote( - callback: (note: Packed<"Note">) => void, - ): (Note) => void { - return async (note: Note) => { + callback: (noteMessage: { type: "note", body: Packed<"Note"> }) => void, + ): (noteMessage: { type: "note", body: Note }) => void { + return async ({body: note}) => { try { // because `note` was previously JSON.stringify'ed, the fields that // were objects before are now strings and have to be restored or @@ -79,7 +79,7 @@ export default abstract class Channel { const packed = await Notes.pack(note, this.user, { detail: true }); - callback(packed); + callback({ type: "note", body: packed }); } catch (err) { if ( err instanceof IdentifiableError && diff --git a/packages/backend/src/server/api/stream/channels/global-timeline.ts b/packages/backend/src/server/api/stream/channels/global-timeline.ts index d5ea82f624..baca4cda16 100644 --- a/packages/backend/src/server/api/stream/channels/global-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/global-timeline.ts @@ -34,7 +34,7 @@ export default class extends Channel { this.subscriber.off("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + private async onNote({body: note}: { type: "note", body: Packed<"Note"> }) { if (note.visibility !== "public") return; // 関係ない返信は除外 diff --git a/packages/backend/src/server/api/stream/channels/hashtag.ts b/packages/backend/src/server/api/stream/channels/hashtag.ts index 011bb0889d..2a5e9f1b3d 100644 --- a/packages/backend/src/server/api/stream/channels/hashtag.ts +++ b/packages/backend/src/server/api/stream/channels/hashtag.ts @@ -1,12 +1,12 @@ import Channel from "../channel.js"; -import { normalizeForSearch } from "@/misc/normalize-for-search.js"; -import { isUserRelated } from "@/misc/is-user-related.js"; -import type { Packed } from "@/misc/schema.js"; +import {normalizeForSearch} from "@/misc/normalize-for-search.js"; +import {isUserRelated} from "@/misc/is-user-related.js"; +import type {Packed} from "@/misc/schema.js"; export default class extends Channel { - public readonly chName = "hashtag"; public static shouldShare = false; public static requireCredential = false; + public readonly chName = "hashtag"; private q: string[][]; constructor(id: string, connection: Channel["connection"]) { @@ -23,7 +23,12 @@ export default class extends Channel { this.subscriber.on("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + public dispose() { + // Unsubscribe events + this.subscriber.off("notesStream", this.onNote); + } + + private async onNote({body: note}: {type: "note", body: Packed<"Note">}) { if (note.visibility === "hidden") return; const noteTags = note.tags ? note.tags.map((t: string) => t.toLowerCase()) @@ -44,9 +49,4 @@ export default class extends Channel { this.send("note", note); } - - public dispose() { - // Unsubscribe events - this.subscriber.off("notesStream", this.onNote); - } } diff --git a/packages/backend/src/server/api/stream/channels/home-timeline.ts b/packages/backend/src/server/api/stream/channels/home-timeline.ts index 7e8c24d9ad..fe8ff8274c 100644 --- a/packages/backend/src/server/api/stream/channels/home-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/home-timeline.ts @@ -27,7 +27,7 @@ export default class extends Channel { this.subscriber.off("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) { if (note.visibility === "hidden") return; // Filter away notes that are not authored by the user or any of the followed users diff --git a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts index cf14c73ff7..1aeb5f71fe 100644 --- a/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/hybrid-timeline.ts @@ -36,7 +36,7 @@ export default class extends Channel { this.subscriber.off("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + private async onNote({ body: note }: { type: "note", "body": Packed<"Note"> }) { if (note.visibility === "hidden") return; diff --git a/packages/backend/src/server/api/stream/channels/local-timeline.ts b/packages/backend/src/server/api/stream/channels/local-timeline.ts index 637f9a3e5d..91472a6679 100644 --- a/packages/backend/src/server/api/stream/channels/local-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/local-timeline.ts @@ -33,7 +33,7 @@ export default class extends Channel { this.subscriber.off("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) { if (note.user.host !== null) return; if (note.visibility !== "public") return; diff --git a/packages/backend/src/server/api/stream/channels/recommended-timeline.ts b/packages/backend/src/server/api/stream/channels/recommended-timeline.ts index 0b78d8b66c..6b4e589ad4 100644 --- a/packages/backend/src/server/api/stream/channels/recommended-timeline.ts +++ b/packages/backend/src/server/api/stream/channels/recommended-timeline.ts @@ -1,14 +1,14 @@ import Channel from "../channel.js"; -import { fetchMeta } from "@/misc/fetch-meta.js"; -import { getWordHardMute } from "@/misc/check-word-mute.js"; -import { isUserRelated } from "@/misc/is-user-related.js"; -import { isInstanceMuted } from "@/misc/is-instance-muted.js"; -import type { Packed } from "@/misc/schema.js"; +import {fetchMeta} from "@/misc/fetch-meta.js"; +import {getWordHardMute} from "@/misc/check-word-mute.js"; +import {isUserRelated} from "@/misc/is-user-related.js"; +import {isInstanceMuted} from "@/misc/is-instance-muted.js"; +import type {Packed} from "@/misc/schema.js"; export default class extends Channel { - public readonly chName = "recommendedTimeline"; public static shouldShare = true; public static requireCredential = true; + public readonly chName = "recommendedTimeline"; private withReplies: boolean; constructor(id: string, connection: Channel["connection"]) { @@ -31,7 +31,12 @@ export default class extends Channel { this.subscriber.on("notesStream", this.onNote); } - private async onNote(note: Packed<"Note">) { + public dispose() { + // Unsubscribe events + this.subscriber.off("notesStream", this.onNote); + } + + private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) { if (note.visibility === "hidden") return; // チャンネルの投稿ではなく、自分自身の投稿 または // チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または @@ -90,9 +95,4 @@ export default class extends Channel { this.send("note", note); } - - public dispose() { - // Unsubscribe events - this.subscriber.off("notesStream", this.onNote); - } } diff --git a/packages/backend/src/server/api/stream/channels/user-list.ts b/packages/backend/src/server/api/stream/channels/user-list.ts index 0b52f69127..3acfa53d94 100644 --- a/packages/backend/src/server/api/stream/channels/user-list.ts +++ b/packages/backend/src/server/api/stream/channels/user-list.ts @@ -1,15 +1,15 @@ import Channel from "../channel.js"; -import { UserListJoinings, UserLists } from "@/models/index.js"; -import type { User } from "@/models/entities/user.js"; -import { isUserRelated } from "@/misc/is-user-related.js"; -import type { Packed } from "@/misc/schema.js"; +import {UserListJoinings, UserLists} from "@/models/index.js"; +import type {User} from "@/models/entities/user.js"; +import {isUserRelated} from "@/misc/is-user-related.js"; +import type {Packed} from "@/misc/schema.js"; export default class extends Channel { - public readonly chName = "userList"; public static shouldShare = false; public static requireCredential = false; - private listId: string; + public readonly chName = "userList"; public listUsers: User["id"][] = []; + private listId: string; private listUsersClock: NodeJS.Timer; constructor(id: string, connection: Channel["connection"]) { @@ -37,6 +37,14 @@ export default class extends Channel { this.listUsersClock = setInterval(this.updateListUsers, 5000); } + public dispose() { + // Unsubscribe events + this.subscriber.off(`userListStream:${this.listId}`, this.send); + this.subscriber.off("notesStream", this.onNote); + + clearInterval(this.listUsersClock); + } + private async updateListUsers() { const users = await UserListJoinings.find({ where: { @@ -48,7 +56,7 @@ export default class extends Channel { this.listUsers = users.map((x) => x.userId); } - private async onNote(note: Packed<"Note">) { + private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) { if (note.visibility === "hidden") return; if (!this.listUsers.includes(note.userId)) return; @@ -61,12 +69,4 @@ export default class extends Channel { this.send("note", note); } - - public dispose() { - // Unsubscribe events - this.subscriber.off(`userListStream:${this.listId}`, this.send); - this.subscriber.off("notesStream", this.onNote); - - clearInterval(this.listUsersClock); - } } diff --git a/packages/backend/src/server/api/stream/types.ts b/packages/backend/src/server/api/stream/types.ts index 108501f872..7e968a2a5d 100644 --- a/packages/backend/src/server/api/stream/types.ts +++ b/packages/backend/src/server/api/stream/types.ts @@ -213,7 +213,7 @@ export type StreamMessages = { }; notes: { name: "notesStream"; - payload: Note; + payload: EventUnionFromDictionary<{ note: Note }>; }; }; diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts index 14e07b7487..19e608e354 100644 --- a/packages/backend/src/server/api/streaming.ts +++ b/packages/backend/src/server/api/streaming.ts @@ -1,10 +1,10 @@ import type * as http from "node:http"; -import { EventEmitter } from "events"; -import type { ParsedUrlQuery } from "querystring"; +import {EventEmitter} from "events"; +import type {ParsedUrlQuery} from "querystring"; import * as websocket from "websocket"; -import { subscriber as redisClient } from "@/db/redis.js"; -import { Users } from "@/models/index.js"; +import {subscriber as redisClient} from "@/db/redis.js"; +import {Users} from "@/models/index.js"; import MainStreamConnection from "./stream/index.js"; import authenticate from "./authenticate.js"; diff --git a/packages/backend/src/services/stream.ts b/packages/backend/src/services/stream.ts index 280cc67e05..2c1728180e 100644 --- a/packages/backend/src/services/stream.ts +++ b/packages/backend/src/services/stream.ts @@ -108,7 +108,7 @@ class Publisher { }; public publishNotesStream = (note: Note): void => { - this.publish("notesStream", null, note); + this.publish("notesStream", "note", note); }; public publishAdminStream = ( @@ -125,15 +125,10 @@ class Publisher { private publish = ( channel: StreamChannels, - type: string | null, + type: string, value?: any, ): void => { - const message = - type == null - ? value - : value == null - ? { type: type, body: null } - : { type: type, body: value }; + const message = { type: type, body: value }; redisClient.publish( config.host,