Normalized channel message format
This commit is contained in:
parent
f9ab6d7c65
commit
0409015c96
|
@ -65,9 +65,9 @@ export default abstract class Channel {
|
|||
public onMessage?(type: string, body: any): void;
|
||||
|
||||
protected withPackedNote(
|
||||
callback: (note: Packed<"Note">) => void,
|
||||
): (Note) => void {
|
||||
return async (note: Note) => {
|
||||
callback: (noteMessage: { type: "note", body: Packed<"Note"> }) => void,
|
||||
): (noteMessage: { type: "note", body: Note }) => void {
|
||||
return async ({body: note}) => {
|
||||
try {
|
||||
// because `note` was previously JSON.stringify'ed, the fields that
|
||||
// were objects before are now strings and have to be restored or
|
||||
|
@ -79,7 +79,7 @@ export default abstract class Channel {
|
|||
|
||||
const packed = await Notes.pack(note, this.user, { detail: true });
|
||||
|
||||
callback(packed);
|
||||
callback({ type: "note", body: packed });
|
||||
} catch (err) {
|
||||
if (
|
||||
err instanceof IdentifiableError &&
|
||||
|
|
|
@ -34,7 +34,7 @@ export default class extends Channel {
|
|||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
private async onNote({body: note}: { type: "note", body: Packed<"Note"> }) {
|
||||
if (note.visibility !== "public") return;
|
||||
|
||||
// 関係ない返信は除外
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
import Channel from "../channel.js";
|
||||
import { normalizeForSearch } from "@/misc/normalize-for-search.js";
|
||||
import { isUserRelated } from "@/misc/is-user-related.js";
|
||||
import type { Packed } from "@/misc/schema.js";
|
||||
import {normalizeForSearch} from "@/misc/normalize-for-search.js";
|
||||
import {isUserRelated} from "@/misc/is-user-related.js";
|
||||
import type {Packed} from "@/misc/schema.js";
|
||||
|
||||
export default class extends Channel {
|
||||
public readonly chName = "hashtag";
|
||||
public static shouldShare = false;
|
||||
public static requireCredential = false;
|
||||
public readonly chName = "hashtag";
|
||||
private q: string[][];
|
||||
|
||||
constructor(id: string, connection: Channel["connection"]) {
|
||||
|
@ -23,7 +23,12 @@ export default class extends Channel {
|
|||
this.subscriber.on("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote({body: note}: {type: "note", body: Packed<"Note">}) {
|
||||
if (note.visibility === "hidden") return;
|
||||
const noteTags = note.tags
|
||||
? note.tags.map((t: string) => t.toLowerCase())
|
||||
|
@ -44,9 +49,4 @@ export default class extends Channel {
|
|||
|
||||
this.send("note", note);
|
||||
}
|
||||
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ export default class extends Channel {
|
|||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) {
|
||||
if (note.visibility === "hidden") return;
|
||||
|
||||
// Filter away notes that are not authored by the user or any of the followed users
|
||||
|
|
|
@ -36,7 +36,7 @@ export default class extends Channel {
|
|||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
private async onNote({ body: note }: { type: "note", "body": Packed<"Note"> }) {
|
||||
if (note.visibility === "hidden")
|
||||
return;
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ export default class extends Channel {
|
|||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) {
|
||||
if (note.user.host !== null) return;
|
||||
if (note.visibility !== "public") return;
|
||||
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
import Channel from "../channel.js";
|
||||
import { fetchMeta } from "@/misc/fetch-meta.js";
|
||||
import { getWordHardMute } from "@/misc/check-word-mute.js";
|
||||
import { isUserRelated } from "@/misc/is-user-related.js";
|
||||
import { isInstanceMuted } from "@/misc/is-instance-muted.js";
|
||||
import type { Packed } from "@/misc/schema.js";
|
||||
import {fetchMeta} from "@/misc/fetch-meta.js";
|
||||
import {getWordHardMute} from "@/misc/check-word-mute.js";
|
||||
import {isUserRelated} from "@/misc/is-user-related.js";
|
||||
import {isInstanceMuted} from "@/misc/is-instance-muted.js";
|
||||
import type {Packed} from "@/misc/schema.js";
|
||||
|
||||
export default class extends Channel {
|
||||
public readonly chName = "recommendedTimeline";
|
||||
public static shouldShare = true;
|
||||
public static requireCredential = true;
|
||||
public readonly chName = "recommendedTimeline";
|
||||
private withReplies: boolean;
|
||||
|
||||
constructor(id: string, connection: Channel["connection"]) {
|
||||
|
@ -31,7 +31,12 @@ export default class extends Channel {
|
|||
this.subscriber.on("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
|
||||
private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) {
|
||||
if (note.visibility === "hidden") return;
|
||||
// チャンネルの投稿ではなく、自分自身の投稿 または
|
||||
// チャンネルの投稿ではなく、その投稿のユーザーをフォローしている または
|
||||
|
@ -90,9 +95,4 @@ export default class extends Channel {
|
|||
|
||||
this.send("note", note);
|
||||
}
|
||||
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
import Channel from "../channel.js";
|
||||
import { UserListJoinings, UserLists } from "@/models/index.js";
|
||||
import type { User } from "@/models/entities/user.js";
|
||||
import { isUserRelated } from "@/misc/is-user-related.js";
|
||||
import type { Packed } from "@/misc/schema.js";
|
||||
import {UserListJoinings, UserLists} from "@/models/index.js";
|
||||
import type {User} from "@/models/entities/user.js";
|
||||
import {isUserRelated} from "@/misc/is-user-related.js";
|
||||
import type {Packed} from "@/misc/schema.js";
|
||||
|
||||
export default class extends Channel {
|
||||
public readonly chName = "userList";
|
||||
public static shouldShare = false;
|
||||
public static requireCredential = false;
|
||||
private listId: string;
|
||||
public readonly chName = "userList";
|
||||
public listUsers: User["id"][] = [];
|
||||
private listId: string;
|
||||
private listUsersClock: NodeJS.Timer;
|
||||
|
||||
constructor(id: string, connection: Channel["connection"]) {
|
||||
|
@ -37,6 +37,14 @@ export default class extends Channel {
|
|||
this.listUsersClock = setInterval(this.updateListUsers, 5000);
|
||||
}
|
||||
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off(`userListStream:${this.listId}`, this.send);
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
|
||||
clearInterval(this.listUsersClock);
|
||||
}
|
||||
|
||||
private async updateListUsers() {
|
||||
const users = await UserListJoinings.find({
|
||||
where: {
|
||||
|
@ -48,7 +56,7 @@ export default class extends Channel {
|
|||
this.listUsers = users.map((x) => x.userId);
|
||||
}
|
||||
|
||||
private async onNote(note: Packed<"Note">) {
|
||||
private async onNote({ body: note }: { type: "note", body: Packed<"Note"> }) {
|
||||
if (note.visibility === "hidden") return;
|
||||
if (!this.listUsers.includes(note.userId)) return;
|
||||
|
||||
|
@ -61,12 +69,4 @@ export default class extends Channel {
|
|||
|
||||
this.send("note", note);
|
||||
}
|
||||
|
||||
public dispose() {
|
||||
// Unsubscribe events
|
||||
this.subscriber.off(`userListStream:${this.listId}`, this.send);
|
||||
this.subscriber.off("notesStream", this.onNote);
|
||||
|
||||
clearInterval(this.listUsersClock);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -213,7 +213,7 @@ export type StreamMessages = {
|
|||
};
|
||||
notes: {
|
||||
name: "notesStream";
|
||||
payload: Note;
|
||||
payload: EventUnionFromDictionary<{ note: Note }>;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
import type * as http from "node:http";
|
||||
import { EventEmitter } from "events";
|
||||
import type { ParsedUrlQuery } from "querystring";
|
||||
import {EventEmitter} from "events";
|
||||
import type {ParsedUrlQuery} from "querystring";
|
||||
import * as websocket from "websocket";
|
||||
|
||||
import { subscriber as redisClient } from "@/db/redis.js";
|
||||
import { Users } from "@/models/index.js";
|
||||
import {subscriber as redisClient} from "@/db/redis.js";
|
||||
import {Users} from "@/models/index.js";
|
||||
import MainStreamConnection from "./stream/index.js";
|
||||
import authenticate from "./authenticate.js";
|
||||
|
||||
|
|
|
@ -108,7 +108,7 @@ class Publisher {
|
|||
};
|
||||
|
||||
public publishNotesStream = (note: Note): void => {
|
||||
this.publish("notesStream", null, note);
|
||||
this.publish("notesStream", "note", note);
|
||||
};
|
||||
|
||||
public publishAdminStream = <K extends keyof AdminStreamTypes>(
|
||||
|
@ -125,15 +125,10 @@ class Publisher {
|
|||
|
||||
private publish = (
|
||||
channel: StreamChannels,
|
||||
type: string | null,
|
||||
type: string,
|
||||
value?: any,
|
||||
): void => {
|
||||
const message =
|
||||
type == null
|
||||
? value
|
||||
: value == null
|
||||
? { type: type, body: null }
|
||||
: { type: type, body: value };
|
||||
const message = { type: type, body: value };
|
||||
|
||||
redisClient.publish(
|
||||
config.host,
|
||||
|
|
Loading…
Reference in New Issue