parent
4982ea8f14
commit
d3b3426ebe
|
@ -100,6 +100,9 @@ async function workerMain() {
|
||||||
// start server
|
// start server
|
||||||
await require('./server').default();
|
await require('./server').default();
|
||||||
|
|
||||||
|
// start processor
|
||||||
|
require('./queue').default();
|
||||||
|
|
||||||
if (cluster.isWorker) {
|
if (cluster.isWorker) {
|
||||||
// Send a 'ready' message to parent process
|
// Send a 'ready' message to parent process
|
||||||
process.send('ready');
|
process.send('ready');
|
||||||
|
|
|
@ -1,9 +1,34 @@
|
||||||
|
import * as Queue from 'bee-queue';
|
||||||
|
import config from '../config';
|
||||||
import http from './processors/http';
|
import http from './processors/http';
|
||||||
import { ILocalUser } from '../models/user';
|
import { ILocalUser } from '../models/user';
|
||||||
import Logger from '../misc/logger';
|
import Logger from '../misc/logger';
|
||||||
|
|
||||||
|
const enableQueue = config.redis != null;
|
||||||
|
|
||||||
|
const queue = new Queue('misskey', {
|
||||||
|
redis: {
|
||||||
|
port: config.redis.port,
|
||||||
|
host: config.redis.host,
|
||||||
|
password: config.redis.pass
|
||||||
|
},
|
||||||
|
|
||||||
|
removeOnSuccess: true,
|
||||||
|
removeOnFailure: true,
|
||||||
|
getEvents: false,
|
||||||
|
sendEvents: false,
|
||||||
|
storeJobs: false
|
||||||
|
});
|
||||||
|
|
||||||
export function createHttpJob(data: any) {
|
export function createHttpJob(data: any) {
|
||||||
return http({ data }, () => {});
|
if (enableQueue) {
|
||||||
|
return queue.createJob(data)
|
||||||
|
.retries(4)
|
||||||
|
.backoff('exponential', 16384) // 16s
|
||||||
|
.save();
|
||||||
|
} else {
|
||||||
|
return http({ data }, () => {});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function deliver(user: ILocalUser, content: any, to: any) {
|
export function deliver(user: ILocalUser, content: any, to: any) {
|
||||||
|
@ -18,3 +43,9 @@ export function deliver(user: ILocalUser, content: any, to: any) {
|
||||||
}
|
}
|
||||||
|
|
||||||
export const queueLogger = new Logger('queue');
|
export const queueLogger = new Logger('queue');
|
||||||
|
|
||||||
|
export default function() {
|
||||||
|
if (enableQueue) {
|
||||||
|
queue.process(128, http);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue