From 476b5e03f95d5cf796a98eb5af64a6887095506e Mon Sep 17 00:00:00 2001
From: Tom Moor
Date: Tue, 31 Aug 2021 17:41:57 -0700
Subject: [PATCH] perf: Move exports to worker service (#2514)

* first pass

* fixes

* fix: Move export related emails to queue

* i18n
---
 app/scenes/Settings/ImportExport.js        |   2 +-
 server/commands/collectionExporter.js      |  44 +++++
 server/exporter.js                         | 188 ---------------------
 server/mailer.js                           |   8 +-
 server/queues/processors/exports.js        |  99 +++++++++++
 server/routes/api/collections.js           |  52 ++----
 server/services/worker.js                  |   2 +
 server/types.js                            |  20 ++-
 shared/i18n/locales/en_US/translation.json |   2 +-
 9 files changed, 179 insertions(+), 238 deletions(-)
 create mode 100644 server/commands/collectionExporter.js
 delete mode 100644 server/exporter.js
 create mode 100644 server/queues/processors/exports.js

diff --git a/app/scenes/Settings/ImportExport.js b/app/scenes/Settings/ImportExport.js
index 27c01155..1f29e6ab 100644
--- a/app/scenes/Settings/ImportExport.js
+++ b/app/scenes/Settings/ImportExport.js
@@ -184,7 +184,7 @@ function ImportExport() {
           {t("Export")}
         }}
       />
diff --git a/server/commands/collectionExporter.js b/server/commands/collectionExporter.js
new file mode 100644
index 00000000..4e48dfbc
--- /dev/null
+++ b/server/commands/collectionExporter.js
@@ -0,0 +1,44 @@
+// @flow
+import { Collection, Event, Team, User, FileOperation } from "../models";
+import { getAWSKeyForFileOp } from "../utils/s3";
+
+export default async function collectionExporter({
+  collection,
+  team,
+  user,
+  ip,
+}: {
+  collection?: Collection,
+  team: Team,
+  user: User,
+  ip: string,
+}) {
+  const collectionId = collection?.id;
+  const key = getAWSKeyForFileOp(user.teamId, collection?.name || team.name);
+
+  const fileOperation = await FileOperation.create({
+    type: "export",
+    state: "creating",
+    key,
+    url: null,
+    size: 0,
+    collectionId,
+    userId: user.id,
+    teamId: user.teamId,
+  });
+
+  // Event is consumed on worker in queues/processors/exports
+  await Event.create({
+    name: collection ? "collections.export" : "collections.export_all",
+    collectionId,
+    teamId: user.teamId,
+    actorId: user.id,
+    modelId: fileOperation.id,
+    ip,
+  });
+
+  fileOperation.user = user;
+  fileOperation.collection = collection;
+
+  return fileOperation;
+}
diff --git a/server/exporter.js b/server/exporter.js
deleted file mode 100644
index 8240cf46..00000000
--- a/server/exporter.js
+++ /dev/null
@@ -1,188 +0,0 @@
-// @flow
-import fs from "fs";
-import debug from "debug";
-import mailer from "./mailer";
-import { FileOperation, Collection, Team, Event, User } from "./models";
-import { createQueue } from "./utils/queue";
-import { uploadToS3FromBuffer } from "./utils/s3";
-
-const log = debug("exporter");
-const exporterQueue = createQueue("exporter");
-const queueOptions = {
-  attempts: 2,
-  removeOnComplete: true,
-  backoff: {
-    type: "exponential",
-    delay: 60 * 1000,
-  },
-};
-
-async function fileOperationsUpdate(teamId, userId, exportData) {
-  await Event.add({
-    name: "fileOperations.update",
-    teamId: teamId,
-    actorId: userId,
-    data: {
-      type: exportData.type,
-      id: exportData.id,
-      state: exportData.state,
-      size: exportData.size,
-      collectionId: exportData.collectionId,
-      createdAt: exportData.createdAt,
-    },
-  });
-}
-
-type exportAndEmailCollectionsType = {|
-  teamId: string,
-  userId: string,
-  email: string,
-  fileOperationId: string,
-  collectionId?: string,
-|};
-
-// TODO: Refactor to use command pattern
-async function exportAndEmailCollections({
-  teamId,
-  userId,
-  email,
-  collectionId,
-  fileOperationId,
-}: exportAndEmailCollectionsType) {
-  log("Archiving team", teamId);
-  const { archiveCollections } = require("./utils/zip");
-  const team = await Team.findByPk(teamId);
-  const user = await User.findByPk(userId);
-
-  let collections;
-  if (!collectionId) {
-    const collectionIds = await user.collectionIds();
-
-    collections = await Promise.all(
-      collectionIds.map(
-        async (collectionId) => await Collection.findByPk(collectionId)
-      )
-    );
-  } else {
-    collections = [await Collection.findByPk(collectionId)];
-  }
-
-  let exportData;
-  let state;
-  let key;
-
-  exportData = await FileOperation.findByPk(fileOperationId);
-  state = exportData.state;
-  key = exportData.key;
-  await fileOperationsUpdate(teamId, userId, exportData);
-
-  const filePath = await archiveCollections(collections);
-
-  log("Archive path", filePath);
-
-  let url;
-  try {
-    const readBuffer = await fs.promises.readFile(filePath);
-    state = "uploading";
-    exportData.state = state;
-    const stat = await fs.promises.stat(filePath);
-    exportData.size = stat.size;
-
-    await exportData.save();
-    await fileOperationsUpdate(teamId, userId, exportData);
-
-    url = await uploadToS3FromBuffer(
-      readBuffer,
-      "application/zip",
-      key,
-      "private"
-    );
-
-    state = "complete";
-  } catch (e) {
-    log("Failed to export data", e);
-    state = "error";
-    url = null;
-  } finally {
-    exportData.state = state;
-    exportData.url = url;
-    await exportData.save();
-
-    await fileOperationsUpdate(teamId, userId, exportData);
-
-    if (collectionId) {
-      await Event.create({
-        name: "collections.export",
-        collectionId,
-        teamId: teamId,
-        actorId: userId,
-        data: { name: collections[0].name, exportId: exportData.id },
-      });
-    } else {
-      const collectionsExported = collections.map((c) => ({
-        name: c.name,
-        id: c.id,
-      }));
-
-      await Event.create({
-        name: "collections.export_all",
-        teamId: teamId,
-        actorId: userId,
-        data: {
-          exportId: exportData.id,
-          collections: collectionsExported,
-        },
-      });
-    }
-
-    if (state === "error") {
-      mailer.exportFailure({
-        to: email,
-        teamUrl: team.url,
-      });
-    } else {
-      mailer.exportSuccess({
-        to: email,
-        id: exportData.id,
-        teamUrl: team.url,
-      });
-    }
-  }
-}
-
-exporterQueue.process(async function exportProcessor(job) {
-  log("Process", job.data);
-
-  switch (job.data.type) {
-    case "export-collections":
-      const { teamId, userId, email, collectionId, fileOperationId } = job.data;
-      return await exportAndEmailCollections({
-        teamId,
-        userId,
-        email,
-        fileOperationId,
-        collectionId,
-      });
-    default:
-  }
-});
-
-export const exportCollections = (
-  teamId: string,
-  userId: string,
-  email: string,
-  fileOperationId: string,
-  collectionId?: string
-) => {
-  exporterQueue.add(
-    {
-      type: "export-collections",
-      teamId,
-      userId,
-      email,
-      fileOperationId,
-      collectionId,
-    },
-    queueOptions
-  );
-};
diff --git a/server/mailer.js b/server/mailer.js
index a81fb580..d470642e 100644
--- a/server/mailer.js
+++ b/server/mailer.js
@@ -37,7 +37,13 @@ const log = debug("emails");
 const useTestEmailService =
   process.env.NODE_ENV === "development" && !process.env.SMTP_USERNAME;
 
-export type EmailTypes = "welcome" | "export" | "invite" | "signin";
+export type EmailTypes =
+  | "welcome"
+  | "export"
+  | "invite"
+  | "signin"
+  | "exportFailure"
+  | "exportSuccess";
 
 export type EmailSendOptions = {
   to: string,
diff --git a/server/queues/processors/exports.js b/server/queues/processors/exports.js
new file mode 100644
index 00000000..083b757c
--- /dev/null
+++ b/server/queues/processors/exports.js
@@ -0,0 +1,99 @@
+// @flow
+import fs from "fs";
+import debug from "debug";
+import mailer from "../../mailer";
+import { FileOperation, Collection, Event, Team, User } from "../../models";
+import type { Event as TEvent } from "../../types";
+import { uploadToS3FromBuffer } from "../../utils/s3";
+import { archiveCollections } from "../../utils/zip";
+
+const log = debug("commands");
+
+export default class ExportsProcessor {
+  async on(event: TEvent) {
+    switch (event.name) {
+      case "collections.export":
+      case "collections.export_all":
+        const { actorId, teamId } = event;
+        const team = await Team.findByPk(teamId);
+        const user = await User.findByPk(actorId);
+        let exportData = await FileOperation.findByPk(event.modelId);
+
+        const collectionIds =
+          event.collectionId || (await user.collectionIds());
+        const collections = await Collection.findAll({
+          where: { id: collectionIds },
+        });
+
+        this.updateFileOperation(exportData, actorId, teamId, {
+          state: "creating",
+        });
+
+        // heavy lifting of creating the zip file
+        log(`Archiving collections for file operation ${exportData.id}`);
+        const filePath = await archiveCollections(collections);
+
+        let url, state;
+        try {
+          const readBuffer = await fs.promises.readFile(filePath);
+          const stat = await fs.promises.stat(filePath);
+
+          this.updateFileOperation(exportData, actorId, teamId, {
+            state: "uploading",
+            size: stat.size,
+          });
+
+          log(`Uploading archive for file operation ${exportData.id}`);
+          url = await uploadToS3FromBuffer(
+            readBuffer,
+            "application/zip",
+            exportData.key,
+            "private"
+          );
+
+          log(`Upload complete for file operation ${exportData.id}`);
+          state = "complete";
+        } catch (e) {
+          log("Failed to export data", e);
+          state = "error";
+          url = null;
+        } finally {
+          this.updateFileOperation(exportData, actorId, teamId, {
+            state,
+            url,
+          });
+
+          if (state === "error") {
+            mailer.sendTemplate("exportFailure", {
+              to: user.email,
+              teamUrl: team.url,
+            });
+          } else {
+            mailer.sendTemplate("exportSuccess", {
+              to: user.email,
+              id: exportData.id,
+              teamUrl: team.url,
+            });
+          }
+        }
+        break;
+      default:
+    }
+  }
+
+  async updateFileOperation(
+    fileOperation: FileOperation,
+    actorId: string,
+    teamId: string,
+    data: Object
+  ) {
+    await fileOperation.update(data);
+
+    await Event.add({
+      name: "fileOperations.update",
+      teamId,
+      actorId,
+      data: fileOperation.dataValues,
+    });
+  }
+}
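
[Editor's note: one subtlety in the processor above. The expression
`event.collectionId || (await user.collectionIds())` leaves `collectionIds`
as either a single id string (collections.export) or an array of ids
(collections.export_all). The single findAll still covers both shapes
because a Sequelize `where` attribute value may be a scalar, which compiles
to an equality check, or an array, which compiles to an IN clause:]

    // where: { id: "abc" }          -> ... WHERE "id" = 'abc'
    // where: { id: ["abc", "def"] } -> ... WHERE "id" IN ('abc', 'def')
    const collections = await Collection.findAll({
      where: { id: collectionIds },
    });
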
diff --git a/server/routes/api/collections.js b/server/routes/api/collections.js
index 8cbf86e2..9fa99d16 100644
--- a/server/routes/api/collections.js
+++ b/server/routes/api/collections.js
@@ -1,8 +1,8 @@
 // @flow
 import fractionalIndex from "fractional-index";
 import Router from "koa-router";
+import collectionExporter from "../../commands/collectionExporter";
 import { ValidationError } from "../../errors";
-import { exportCollections } from "../../exporter";
 import auth from "../../middlewares/authentication";
 import {
   Collection,
@@ -13,7 +13,6 @@ import {
   User,
   Group,
   Attachment,
-  FileOperation,
 } from "../../models";
 import policy from "../../policies";
 import {
@@ -29,7 +28,6 @@
 import { Op, sequelize } from "../../sequelize";
 import collectionIndexing from "../../utils/collectionIndexing";
 import removeIndexCollision from "../../utils/removeIndexCollision";
-import { getAWSKeyForFileOp } from "../../utils/s3";
 import pagination from "./middlewares/pagination";
 
 const { authorize } = policy;
@@ -465,28 +463,16 @@ router.post("collections.export", auth(), async (ctx) => {
   ctx.assertPresent(collection, "Collection should be present");
   authorize(user, "read", collection);
 
-  const key = getAWSKeyForFileOp(team.id, collection.name);
-
-  let exportData;
-  exportData = await FileOperation.create({
-    type: "export",
-    state: "creating",
-    key,
-    url: null,
-    size: 0,
-    collectionId: id,
-    userId: user.id,
-    teamId: team.id,
+  const fileOperation = await collectionExporter({
+    collection,
+    user,
+    team,
+    ip: ctx.request.ip,
   });
 
-  exportCollections(user.teamId, user.id, user.email, exportData.id, id);
-
-  exportData.user = user;
-  exportData.collection = collection;
-
   ctx.body = {
     success: true,
-    data: { fileOperation: presentFileOperation(exportData) },
+    data: { fileOperation: presentFileOperation(fileOperation) },
   };
 });
 
@@ -495,29 +481,15 @@ router.post("collections.export_all", auth(), async (ctx) => {
   const team = await Team.findByPk(user.teamId);
   authorize(user, "export", team);
 
-  const key = getAWSKeyForFileOp(team.id, team.name);
-
-  let exportData;
-  exportData = await FileOperation.create({
-    type: "export",
-    state: "creating",
-    key,
-    url: null,
-    size: 0,
-    collectionId: null,
-    userId: user.id,
-    teamId: team.id,
+  const fileOperation = await collectionExporter({
+    user,
+    team,
+    ip: ctx.request.ip,
   });
 
-  // async operation to upload zip archive to cloud and email user with link
-  exportCollections(user.teamId, user.id, user.email, exportData.id);
-
-  exportData.user = user;
-  exportData.collection = null;
-
   ctx.body = {
     success: true,
-    data: { fileOperation: presentFileOperation(exportData) },
+    data: { fileOperation: presentFileOperation(fileOperation) },
   };
 });
 
diff --git a/server/services/worker.js b/server/services/worker.js
index 928f80ea..ba4da6f7 100644
--- a/server/services/worker.js
+++ b/server/services/worker.js
@@ -11,6 +11,7 @@ import {
 import Backlinks from "../queues/processors/backlinks";
 import Debouncer from "../queues/processors/debouncer";
 import Emails from "../queues/processors/emails";
+import Exports from "../queues/processors/exports";
 import Imports from "../queues/processors/imports";
 import Notifications from "../queues/processors/notifications";
 import Revisions from "../queues/processors/revisions";
@@ -25,6 +26,7 @@ const eventProcessors = {
   backlinks: new Backlinks(),
   debouncer: new Debouncer(),
   imports: new Imports(),
+  exports: new Exports(),
   notifications: new Notifications(),
   revisions: new Revisions(),
   slack: new Slack(),
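
[Editor's note: for orientation, a simplified sketch of how the worker
service fans events out to the processors registered above. The exact queue
wiring lives elsewhere in services/worker.js and is not shown in this patch,
so the queue handle name (processorEventQueue) is an assumption:]

    // processorEventQueue is assumed; eventProcessors is the map above.
    processorEventQueue.process(async (job) => {
      const event = job.data;
      for (const name of Object.keys(eventProcessors)) {
        const processor = eventProcessors[name];
        if (processor.on) {
          // ExportsProcessor.on acts on collections.export and
          // collections.export_all and falls through for everything else
          await processor.on(event);
        }
      }
    });
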
diff --git a/server/types.js b/server/types.js
index 046bb1e5..b3515751 100644
--- a/server/types.js
+++ b/server/types.js
@@ -116,14 +116,19 @@ export type CollectionImportEvent = {
   ip: string,
 };
 
-export type CollectionExportAll = {
+export type CollectionExportEvent = {
+  name: "collections.export",
+  teamId: string,
+  actorId: string,
+  collectionId: string,
+  modelId: string,
+};
+
+export type CollectionExportAllEvent = {
   name: "collections.export_all",
   teamId: string,
   actorId: string,
-  data: {
-    exportId: string,
-    collections: [{ name: string, id: string }],
-  },
+  modelId: string,
 };
 
 export type FileOperationEvent = {
@@ -185,7 +190,8 @@ export type CollectionEvent =
         sharingChanged: boolean,
       },
       ip: string,
-    };
+    }
+  | CollectionExportEvent;
 
 export type GroupEvent =
   | {
@@ -227,7 +233,7 @@ export type Event =
   | DocumentEvent
   | CollectionEvent
   | CollectionImportEvent
-  | CollectionExportAll
+  | CollectionExportAllEvent
   | FileOperationEvent
   | IntegrationEvent
   | GroupEvent
diff --git a/shared/i18n/locales/en_US/translation.json b/shared/i18n/locales/en_US/translation.json
index 6aa06d7a..63bd46b8 100644
--- a/shared/i18n/locales/en_US/translation.json
+++ b/shared/i18n/locales/en_US/translation.json
@@ -510,7 +510,7 @@
   "Uploading": "Uploading",
   "Confirm & Import": "Confirm & Import",
   "Choose File": "Choose File",
-  "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started – we will email a link to {{ userEmail }} when it's complete.": "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started – we will email a link to {{ userEmail }} when it's complete.",
+  "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started – we will email a link to {{ userEmail }} when it’s complete.": "A full export might take some time, consider exporting a single document or collection. The exported data is a zip of your documents in Markdown format. You may leave this page once the export has started – we will email a link to {{ userEmail }} when it’s complete.",
   "Export Requested": "Export Requested",
   "Requesting Export": "Requesting Export",
   "Export Data": "Export Data",
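
[Editor's note: the server/types.js change above turns each event into a
member of a tagged union discriminated on `name`, which is what lets the
processor's switch statement narrow types safely. A minimal sketch of the
pattern, with the import path as an assumption:]

    import type { Event } from "./server/types";

    function describe(event: Event): string {
      switch (event.name) {
        case "collections.export":
          // narrowed to CollectionExportEvent: collectionId is typed here
          return `export of collection ${event.collectionId}`;
        case "collections.export_all":
          // narrowed to CollectionExportAllEvent: only modelId is carried
          return `full workspace export ${event.modelId}`;
        default:
          return event.name;
      }
    }
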