chore: Add basic logging of metrics to event queue (#2545)
* chore: Add basic logging of metrics to event queue closes #2524 * Better naming for multiple queue types * Add stalled event
This commit is contained in:
@ -1,10 +1,13 @@
|
|||||||
// @flow
|
// @flow
|
||||||
import Queue from "bull";
|
import Queue from "bull";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
|
import { snakeCase } from "lodash";
|
||||||
import { client, subscriber } from "../redis";
|
import { client, subscriber } from "../redis";
|
||||||
|
import * as metrics from "../utils/metrics";
|
||||||
|
|
||||||
export function createQueue(name: string) {
|
export function createQueue(name: string) {
|
||||||
return new Queue(name, {
|
const prefix = `queue.${snakeCase(name)}`;
|
||||||
|
const queue = new Queue(name, {
|
||||||
createClient(type) {
|
createClient(type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case "client":
|
case "client":
|
||||||
@ -16,4 +19,27 @@ export function createQueue(name: string) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
queue.on("stalled", () => {
|
||||||
|
metrics.increment(`${prefix}.jobs.stalled`);
|
||||||
|
});
|
||||||
|
|
||||||
|
queue.on("completed", () => {
|
||||||
|
metrics.increment(`${prefix}.jobs.completed`);
|
||||||
|
});
|
||||||
|
|
||||||
|
queue.on("error", () => {
|
||||||
|
metrics.increment(`${prefix}.jobs.errored`);
|
||||||
|
});
|
||||||
|
|
||||||
|
queue.on("failed", () => {
|
||||||
|
metrics.increment(`${prefix}.jobs.failed`);
|
||||||
|
});
|
||||||
|
|
||||||
|
setInterval(async () => {
|
||||||
|
metrics.gauge(`${prefix}.count`, await queue.count());
|
||||||
|
metrics.gauge(`${prefix}.delayed_count`, await queue.getDelayedCount());
|
||||||
|
}, 5 * 1000);
|
||||||
|
|
||||||
|
return queue;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user