perf: Move exports to worker service (#2514)

* first pass

* fixes

* fix: Move export related emails to queue

* i18n
Tom Moor 2021-08-31 17:41:57 -07:00 committed by GitHub
parent 23a6459ae8
commit 476b5e03f9
9 changed files with 179 additions and 238 deletions

View File

@@ -184,7 +184,7 @@ function ImportExport() {
<Heading>{t("Export")}</Heading>
<HelpText>
<Trans
defaults="A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when it's complete."
defaults="A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when its complete."
values={{ userEmail: user.email }}
components={{ em: <strong /> }}
/>

View File

@@ -0,0 +1,44 @@
// @flow
import { Collection, Event, Team, User, FileOperation } from "../models";
import { getAWSKeyForFileOp } from "../utils/s3";
export default async function collectionExporter({
collection,
team,
user,
ip,
}: {
collection?: Collection,
team: Team,
user: User,
ip: string,
}) {
const collectionId = collection?.id;
const key = getAWSKeyForFileOp(user.teamId, collection?.name || team.name);
const fileOperation = await FileOperation.create({
type: "export",
state: "creating",
key,
url: null,
size: 0,
collectionId,
userId: user.id,
teamId: user.teamId,
});
// Event is consumed by the worker in queues/processors/exports
await Event.create({
name: collection ? "collections.export" : "collections.export_all",
collectionId,
teamId: user.teamId,
actorId: user.id,
modelId: fileOperation.id,
ip,
});
fileOperation.user = user;
fileOperation.collection = collection;
return fileOperation;
}
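
The command is now the single entry point for both export flavors; a minimal usage sketch, mirroring the collections.export route further down in this diff:

// Sketch: invoking the command from a route handler (names as used below).
// Omitting `collection` exports every collection the user has access to.
const fileOperation = await collectionExporter({
  collection,
  user,
  team,
  ip: ctx.request.ip,
});
// The returned FileOperation is still in state "creating"; the zip and S3
// upload happen later on the worker, which consumes the Event created above.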

View File

@@ -1,188 +0,0 @@
// @flow
import fs from "fs";
import debug from "debug";
import mailer from "./mailer";
import { FileOperation, Collection, Team, Event, User } from "./models";
import { createQueue } from "./utils/queue";
import { uploadToS3FromBuffer } from "./utils/s3";
const log = debug("exporter");
const exporterQueue = createQueue("exporter");
const queueOptions = {
attempts: 2,
removeOnComplete: true,
backoff: {
type: "exponential",
delay: 60 * 1000,
},
};
async function fileOperationsUpdate(teamId, userId, exportData) {
await Event.add({
name: "fileOperations.update",
teamId: teamId,
actorId: userId,
data: {
type: exportData.type,
id: exportData.id,
state: exportData.state,
size: exportData.size,
collectionId: exportData.collectionId,
createdAt: exportData.createdAt,
},
});
}
type exportAndEmailCollectionsType = {|
teamId: string,
userId: string,
email: string,
fileOperationId: string,
collectionId?: string,
|};
// TODO: Refactor to use command pattern
async function exportAndEmailCollections({
teamId,
userId,
email,
collectionId,
fileOperationId,
}: exportAndEmailCollectionsType) {
log("Archiving team", teamId);
const { archiveCollections } = require("./utils/zip");
const team = await Team.findByPk(teamId);
const user = await User.findByPk(userId);
let collections;
if (!collectionId) {
const collectionIds = await user.collectionIds();
collections = await Promise.all(
collectionIds.map(
async (collectionId) => await Collection.findByPk(collectionId)
)
);
} else {
collections = [await Collection.findByPk(collectionId)];
}
let exportData;
let state;
let key;
exportData = await FileOperation.findByPk(fileOperationId);
state = exportData.state;
key = exportData.key;
await fileOperationsUpdate(teamId, userId, exportData);
const filePath = await archiveCollections(collections);
log("Archive path", filePath);
let url;
try {
const readBuffer = await fs.promises.readFile(filePath);
state = "uploading";
exportData.state = state;
const stat = await fs.promises.stat(filePath);
exportData.size = stat.size;
await exportData.save();
await fileOperationsUpdate(teamId, userId, exportData);
url = await uploadToS3FromBuffer(
readBuffer,
"application/zip",
key,
"private"
);
state = "complete";
} catch (e) {
log("Failed to export data", e);
state = "error";
url = null;
} finally {
exportData.state = state;
exportData.url = url;
await exportData.save();
await fileOperationsUpdate(teamId, userId, exportData);
if (collectionId) {
await Event.create({
name: "collections.export",
collectionId,
teamId: teamId,
actorId: userId,
data: { name: collections[0].name, exportId: exportData.id },
});
} else {
const collectionsExported = collections.map((c) => ({
name: c.name,
id: c.id,
}));
await Event.create({
name: "collections.export_all",
teamId: teamId,
actorId: userId,
data: {
exportId: exportData.id,
collections: collectionsExported,
},
});
}
if (state === "error") {
mailer.exportFailure({
to: email,
teamUrl: team.url,
});
} else {
mailer.exportSuccess({
to: email,
id: exportData.id,
teamUrl: team.url,
});
}
}
}
exporterQueue.process(async function exportProcessor(job) {
log("Process", job.data);
switch (job.data.type) {
case "export-collections":
const { teamId, userId, email, collectionId, fileOperationId } = job.data;
return await exportAndEmailCollections({
teamId,
userId,
email,
fileOperationId,
collectionId,
});
default:
}
});
export const exportCollections = (
teamId: string,
userId: string,
email: string,
fileOperationId: string,
collectionId?: string
) => {
exporterQueue.add(
{
type: "export-collections",
teamId,
userId,
email,
fileOperationId,
collectionId,
},
queueOptions
);
};
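
The deleted exporter coupled queueing, archiving, S3 upload, and email into a single in-process Bull queue. The hand-off now goes through the events table instead; roughly, with both halves taken from elsewhere in this diff:

// Before: push a job onto a dedicated Bull queue from the web process.
exporterQueue.add(
  { type: "export-collections", teamId, userId, email, fileOperationId, collectionId },
  queueOptions
);

// After: persist an Event; the worker's ExportsProcessor consumes it.
await Event.create({
  name: collection ? "collections.export" : "collections.export_all",
  collectionId,
  teamId: user.teamId,
  actorId: user.id,
  modelId: fileOperation.id,
  ip,
});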

View File

@@ -37,7 +37,13 @@ const log = debug("emails");
const useTestEmailService =
process.env.NODE_ENV === "development" && !process.env.SMTP_USERNAME;
export type EmailTypes = "welcome" | "export" | "invite" | "signin";
export type EmailTypes =
| "welcome"
| "export"
| "invite"
| "signin"
| "exportFailure"
| "exportSuccess";
export type EmailSendOptions = {
to: string,

View File

@@ -0,0 +1,99 @@
// @flow
import fs from "fs";
import debug from "debug";
import mailer from "../../mailer";
import { FileOperation, Collection, Event, Team, User } from "../../models";
import type { Event as TEvent } from "../../types";
import { uploadToS3FromBuffer } from "../../utils/s3";
import { archiveCollections } from "../../utils/zip";
const log = debug("commands");
export default class ExportsProcessor {
async on(event: TEvent) {
switch (event.name) {
case "collections.export":
case "collections.export_all":
const { actorId, teamId } = event;
const team = await Team.findByPk(teamId);
const user = await User.findByPk(actorId);
let exportData = await FileOperation.findByPk(event.modelId);
const collectionIds =
event.collectionId || (await user.collectionIds());
const collections = await Collection.findAll({
where: { id: collectionIds },
});
this.updateFileOperation(exportData, actorId, teamId, {
state: "creating",
});
// heavy lifting of creating the zip file
log(`Archiving collections for file operation ${exportData.id}`);
const filePath = await archiveCollections(collections);
let url, state;
try {
const readBuffer = await fs.promises.readFile(filePath);
const stat = await fs.promises.stat(filePath);
this.updateFileOperation(exportData, actorId, teamId, {
state: "uploading",
size: stat.size,
});
log(`Uploading archive for file operation ${exportData.id}`);
url = await uploadToS3FromBuffer(
readBuffer,
"application/zip",
exportData.key,
"private"
);
log(`Upload complete for file operation ${exportData.id}`);
state = "complete";
} catch (e) {
log("Failed to export data", e);
state = "error";
url = null;
} finally {
this.updateFileOperation(exportData, actorId, teamId, {
state,
url,
});
if (state === "error") {
mailer.sendTemplate("exportFailure", {
to: user.email,
teamUrl: team.url,
});
} else {
mailer.sendTemplate("exportSuccess", {
to: user.email,
id: exportData.id,
teamUrl: team.url,
});
}
}
break;
default:
}
}
async updateFileOperation(
fileOperation: FileOperation,
actorId: string,
teamId: string,
data: Object
) {
await fileOperation.update(data);
await Event.add({
name: "fileOperations.update",
teamId,
actorId,
data: fileOperation.dataValues,
});
}
}
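
Each state change is broadcast as a fileOperations.update event via updateFileOperation. A hedged sketch of a consumer; the handler name is hypothetical and the payload fields come from fileOperation.dataValues above:

// Hypothetical listener for the "fileOperations.update" events emitted above.
async function onFileOperationUpdate(event: TEvent) {
  if (event.name !== "fileOperations.update") return;
  const { id, state, size } = event.data;
  // e.g. push the new state to connected clients so the UI can show progress
  log(`file operation ${id} is now ${state} (${size || 0} bytes)`);
}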

View File

@@ -1,8 +1,8 @@
// @flow
import fractionalIndex from "fractional-index";
import Router from "koa-router";
import collectionExporter from "../../commands/collectionExporter";
import { ValidationError } from "../../errors";
import { exportCollections } from "../../exporter";
import auth from "../../middlewares/authentication";
import {
Collection,
@@ -13,7 +13,6 @@ import {
User,
Group,
Attachment,
FileOperation,
} from "../../models";
import policy from "../../policies";
import {
@@ -29,7 +28,6 @@ import { Op, sequelize } from "../../sequelize";
import collectionIndexing from "../../utils/collectionIndexing";
import removeIndexCollision from "../../utils/removeIndexCollision";
import { getAWSKeyForFileOp } from "../../utils/s3";
import pagination from "./middlewares/pagination";
const { authorize } = policy;
@@ -465,28 +463,16 @@ router.post("collections.export", auth(), async (ctx) => {
ctx.assertPresent(collection, "Collection should be present");
authorize(user, "read", collection);
const key = getAWSKeyForFileOp(team.id, collection.name);
let exportData;
exportData = await FileOperation.create({
type: "export",
state: "creating",
key,
url: null,
size: 0,
collectionId: id,
userId: user.id,
teamId: team.id,
const fileOperation = await collectionExporter({
collection,
user,
team,
ip: ctx.request.ip,
});
exportCollections(user.teamId, user.id, user.email, exportData.id, id);
exportData.user = user;
exportData.collection = collection;
ctx.body = {
success: true,
data: { fileOperation: presentFileOperation(exportData) },
data: { fileOperation: presentFileOperation(fileOperation) },
};
});
@@ -495,29 +481,15 @@ router.post("collections.export_all", auth(), async (ctx) => {
const team = await Team.findByPk(user.teamId);
authorize(user, "export", team);
const key = getAWSKeyForFileOp(team.id, team.name);
let exportData;
exportData = await FileOperation.create({
type: "export",
state: "creating",
key,
url: null,
size: 0,
collectionId: null,
userId: user.id,
teamId: team.id,
const fileOperation = await collectionExporter({
user,
team,
ip: ctx.request.ip,
});
// async operation to upload zip archive to cloud and email user with link
exportCollections(user.teamId, user.id, user.email, exportData.id);
exportData.user = user;
exportData.collection = null;
ctx.body = {
success: true,
data: { fileOperation: presentFileOperation(exportData) },
data: { fileOperation: presentFileOperation(fileOperation) },
};
});
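
Both routes now return immediately with the pending FileOperation rather than blocking on the archive. A hedged client-side sketch; the endpoint and response envelope come from this diff, while the token handling is an assumption:

// Hypothetical client call; auth() implies a bearer token (assumed here).
const res = await fetch("/api/collections.export", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${apiToken}`,
  },
  body: JSON.stringify({ id: collectionId }),
});
const { data } = await res.json();
// data.fileOperation.state is "creating" at this point; the download link
// arrives by email once the worker finishes.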

View File

@@ -11,6 +11,7 @@ import {
import Backlinks from "../queues/processors/backlinks";
import Debouncer from "../queues/processors/debouncer";
import Emails from "../queues/processors/emails";
import Exports from "../queues/processors/exports";
import Imports from "../queues/processors/imports";
import Notifications from "../queues/processors/notifications";
import Revisions from "../queues/processors/revisions";
@@ -25,6 +26,7 @@ const eventProcessors = {
backlinks: new Backlinks(),
debouncer: new Debouncer(),
imports: new Imports(),
exports: new Exports(),
notifications: new Notifications(),
revisions: new Revisions(),
slack: new Slack(),
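
The dispatch loop itself is not part of this diff; presumably it fans each event out to every registered processor, along these lines (a sketch, not the actual service code):

// Sketch of the worker's fan-out, assuming each processor exposes on(event).
async function handleEvent(event: TEvent) {
  for (const name of Object.keys(eventProcessors)) {
    await eventProcessors[name].on(event);
  }
}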

View File

@@ -116,14 +116,19 @@ export type CollectionImportEvent = {
ip: string,
};
export type CollectionExportAll = {
export type CollectionExportEvent = {
name: "collections.export",
teamId: string,
actorId: string,
collectionId: string,
modelId: string,
};
export type CollectionExportAllEvent = {
name: "collections.export_all",
teamId: string,
actorId: string,
data: {
exportId: string,
collections: [{ name: string, id: string }],
},
modelId: string,
};
export type FileOperationEvent = {
@@ -185,7 +190,8 @@ export type CollectionEvent =
sharingChanged: boolean,
},
ip: string,
};
}
| CollectionExportEvent;
export type GroupEvent =
| {
@@ -227,7 +233,7 @@ export type Event =
| DocumentEvent
| CollectionEvent
| CollectionImportEvent
| CollectionExportAll
| CollectionExportAllEvent
| FileOperationEvent
| IntegrationEvent
| GroupEvent
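
Splitting the old CollectionExportAll into two named event types lets Flow narrow the union on the literal name tag; a minimal sketch:

// Sketch: Flow narrows Event by its "name" field.
function describeExport(event: Event): string {
  switch (event.name) {
    case "collections.export":
      // Narrowed to CollectionExportEvent: collectionId and modelId available.
      return `export of collection ${event.collectionId}`;
    case "collections.export_all":
      // Narrowed to CollectionExportAllEvent: only modelId identifies the run.
      return `full export ${event.modelId}`;
    default:
      return event.name;
  }
}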

View File

@@ -510,7 +510,7 @@
"Uploading": "Uploading",
"Confirm & Import": "Confirm & Import",
"Choose File": "Choose File",
"A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when it's complete.": "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when it's complete.",
"A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when its complete.": "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started we will email a link to <em>{{ userEmail }}</em> when its complete.",
"Export Requested": "Export Requested",
"Requesting Export": "Requesting Export",
"Export Data": "Export Data",