perf: Reuse redis connections where possible (#1157)
* reuse redis connections where possible * redis -> ioredis
This commit is contained in:
@ -95,6 +95,7 @@
|
||||
"immutable": "^3.8.2",
|
||||
"imports-loader": "0.6.5",
|
||||
"invariant": "^2.2.2",
|
||||
"ioredis": "^4.14.1",
|
||||
"isomorphic-fetch": "2.2.1",
|
||||
"js-search": "^1.4.2",
|
||||
"json-loader": "0.5.4",
|
||||
@ -138,7 +139,6 @@
|
||||
"react-portal": "^4.0.0",
|
||||
"react-router-dom": "^5.1.2",
|
||||
"react-waypoint": "^9.0.2",
|
||||
"redis": "^2.6.2",
|
||||
"rich-markdown-editor": "^9.11.1",
|
||||
"sequelize": "^5.21.1",
|
||||
"sequelize-cli": "^5.5.0",
|
||||
|
@ -1,5 +1,5 @@
|
||||
// @flow
|
||||
import Queue from 'bull';
|
||||
import { createQueue } from './utils/queue';
|
||||
import services from './services';
|
||||
|
||||
export type UserEvent =
|
||||
@ -93,8 +93,8 @@ export type Event =
|
||||
| CollectionEvent
|
||||
| IntegrationEvent;
|
||||
|
||||
const globalEventsQueue = new Queue('global events', process.env.REDIS_URL);
|
||||
const serviceEventsQueue = new Queue('service events', process.env.REDIS_URL);
|
||||
const globalEventsQueue = createQueue('global events');
|
||||
const serviceEventsQueue = createQueue('service events');
|
||||
|
||||
// this queue processes global events and hands them off to service hooks
|
||||
globalEventsQueue.process(async job => {
|
||||
|
@ -1,17 +1,14 @@
|
||||
// @flow
|
||||
import { promisify } from 'util';
|
||||
import http from 'http';
|
||||
import IO from 'socket.io';
|
||||
import SocketAuth from 'socketio-auth';
|
||||
import socketRedisAdapter from 'socket.io-redis';
|
||||
import { getUserForJWT } from './utils/jwt';
|
||||
import { Document, Collection, View } from './models';
|
||||
import { client } from './redis';
|
||||
import { client, subscriber } from './redis';
|
||||
import app from './app';
|
||||
import policy from './policies';
|
||||
|
||||
const redisHget = promisify(client.hget).bind(client);
|
||||
const redisHset = promisify(client.hset).bind(client);
|
||||
const server = http.createServer(app.callback());
|
||||
let io;
|
||||
|
||||
@ -24,7 +21,12 @@ if (process.env.WEBSOCKETS_ENABLED === 'true') {
|
||||
cookie: false,
|
||||
});
|
||||
|
||||
io.adapter(socketRedisAdapter(process.env.REDIS_URL));
|
||||
io.adapter(
|
||||
socketRedisAdapter({
|
||||
pubClient: client,
|
||||
subClient: subscriber,
|
||||
})
|
||||
);
|
||||
|
||||
SocketAuth(io, {
|
||||
authenticate: async (socket, data, callback) => {
|
||||
@ -36,7 +38,7 @@ if (process.env.WEBSOCKETS_ENABLED === 'true') {
|
||||
|
||||
// store the mapping between socket id and user id in redis
|
||||
// so that it is accessible across multiple server nodes
|
||||
await redisHset(socket.id, 'userId', user.id);
|
||||
await client.hset(socket.id, 'userId', user.id);
|
||||
|
||||
return callback(null, true);
|
||||
} catch (err) {
|
||||
@ -107,7 +109,7 @@ if (process.env.WEBSOCKETS_ENABLED === 'true') {
|
||||
// makes this easy.
|
||||
let userIds = new Map();
|
||||
for (const socketId of sockets) {
|
||||
const userId = await redisHget(socketId, 'userId');
|
||||
const userId = await client.hget(socketId, 'userId');
|
||||
userIds.set(userId, userId);
|
||||
}
|
||||
socket.emit('document.presence', {
|
||||
|
@ -1,12 +1,12 @@
|
||||
// @flow
|
||||
import Queue from 'bull';
|
||||
import debug from 'debug';
|
||||
import mailer from './mailer';
|
||||
import { Collection, Team } from './models';
|
||||
import { archiveCollections } from './utils/zip';
|
||||
import { createQueue } from './utils/queue';
|
||||
|
||||
const log = debug('logistics');
|
||||
const logisticsQueue = new Queue('logistics', process.env.REDIS_URL);
|
||||
const logisticsQueue = createQueue('logistics');
|
||||
const queueOptions = {
|
||||
attempts: 2,
|
||||
removeOnComplete: true,
|
||||
|
@ -4,7 +4,7 @@ import debug from 'debug';
|
||||
import bugsnag from 'bugsnag';
|
||||
import nodemailer from 'nodemailer';
|
||||
import Oy from 'oy-vey';
|
||||
import Queue from 'bull';
|
||||
import { createQueue } from './utils/queue';
|
||||
import { baseStyles } from './emails/components/EmailLayout';
|
||||
import { WelcomeEmail, welcomeEmailText } from './emails/WelcomeEmail';
|
||||
import { ExportEmail, exportEmailText } from './emails/ExportEmail';
|
||||
@ -182,7 +182,7 @@ export class Mailer {
|
||||
const mailer = new Mailer();
|
||||
export default mailer;
|
||||
|
||||
export const mailerQueue = new Queue('email', process.env.REDIS_URL);
|
||||
export const mailerQueue = createQueue('email');
|
||||
|
||||
mailerQueue.process(async (job: EmailJob) => {
|
||||
// $FlowIssue flow doesn't like dynamic values
|
||||
|
@ -1,6 +1,7 @@
|
||||
// @flow
|
||||
import redis from 'redis';
|
||||
import Redis from 'ioredis';
|
||||
|
||||
const client = redis.createClient(process.env.REDIS_URL);
|
||||
const client = new Redis(process.env.REDIS_URL);
|
||||
const subscriber = new Redis(process.env.REDIS_URL);
|
||||
|
||||
export { client };
|
||||
export { client, subscriber };
|
||||
|
19
server/utils/queue.js
Normal file
19
server/utils/queue.js
Normal file
@ -0,0 +1,19 @@
|
||||
// @flow
|
||||
import Redis from 'ioredis';
|
||||
import Queue from 'bull';
|
||||
import { client, subscriber } from '../redis';
|
||||
|
||||
export function createQueue(name: string) {
|
||||
return new Queue(name, {
|
||||
createClient(type) {
|
||||
switch (type) {
|
||||
case 'client':
|
||||
return client;
|
||||
case 'subscriber':
|
||||
return subscriber;
|
||||
default:
|
||||
return new Redis(process.env.REDIS_URL);
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
@ -8181,7 +8181,7 @@ redis-parser@^3.0.0:
|
||||
dependencies:
|
||||
redis-errors "^1.0.0"
|
||||
|
||||
redis@^2.6.2, redis@~2.8.0:
|
||||
redis@~2.8.0:
|
||||
version "2.8.0"
|
||||
resolved "https://registry.yarnpkg.com/redis/-/redis-2.8.0.tgz#202288e3f58c49f6079d97af7a10e1303ae14b02"
|
||||
integrity sha512-M1OkonEQwtRmZv4tEWF2VgpG0JWJ8Fv1PhlgT5+B+uNq2cA3Rt1Yt/ryoR+vQNOQcIEgdCdfH0jr3bDpihAw1A==
|
||||
|
Reference in New Issue
Block a user