Skip to content

Commit

Permalink
fix(core-manager): handle stream errors on log generatiton (#4303)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastijankuzner authored Feb 9, 2021
1 parent 4570d9f commit 25d0943
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 55 deletions.
41 changes: 39 additions & 2 deletions __tests__/unit/core-manager/worker/generate-log-gz.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import "jest-extended";

import { Database } from "@packages/core-manager/src/database/database";
import { GenerateLogGz } from "@packages/core-manager/src/workers/generate-log-gz";
import { createReadStream } from "fs-extra";
import { createReadStream, existsSync } from "fs-extra";
import { join } from "path";
import { Readable, Writable } from "stream";
import { dirSync, setGracefulCleanup } from "tmp";
import zlib from "zlib";

Expand All @@ -12,6 +13,18 @@ jest.mock("@packages/core-manager/src/database/database");
const mockLogArray = [{ timestamp: 1607948405, level: "info", content: "log message" }];
const mockIterator = mockLogArray[Symbol.iterator]();

class ReadableError extends Readable {
public constructor() {
super({ objectMode: true });
}

public _read(size: number) {
this.emit("error", new Error("Dummy error"));
}
}

const mockReadable = new ReadableError();

beforeEach(() => {
setGracefulCleanup();

Expand All @@ -27,7 +40,6 @@ afterAll(() => {
describe("Generate Log", () => {
it("should generate log", async () => {
const spyOnBoot = jest.spyOn(Database.prototype, "boot");
// @ts-ignore
const spyOnGetAllIterator = jest.spyOn(Database.prototype, "getAllIterator").mockReturnValue(mockIterator);

// @ts-ignore
Expand All @@ -44,6 +56,7 @@ describe("Generate Log", () => {

expect(spyOnGetAllIterator).toHaveBeenCalled();

// Check data
const stream = createReadStream(join(process.env.CORE_PATH_DATA!, "log-archive", "test.log.gz")).pipe(
zlib.createGunzip(),
);
Expand All @@ -56,4 +69,28 @@ describe("Generate Log", () => {

expect(stream.read().toString()).toEqual("[2020-12-14 12:20:05.000] INFO : log message\n");
});

it("should destroy socket, remove temp files and throw error on pipeline error", async () => {
const spyOnBoot = jest.spyOn(Database.prototype, "boot");
const spyOnGetAllIterator = jest.spyOn(Database.prototype, "getAllIterator").mockReturnValue(mockIterator);
const spyOnReadableFrom = jest.spyOn(Readable, "from").mockReturnValue(mockReadable);
const spyOnDestroy = jest.spyOn(Writable.prototype, "destroy");

// @ts-ignore
const generateLog = new GenerateLogGz({
databaseFilePath: "path/to/db",
schema: { tables: [] },
logFileName: "test.log.gz",
query: {},
});

expect(spyOnBoot).toHaveBeenCalled();

await expect(generateLog.execute()).rejects.toThrow("Dummy error");
expect(spyOnGetAllIterator).toHaveBeenCalled();
expect(spyOnReadableFrom).toHaveBeenCalled();
expect(spyOnDestroy).toHaveBeenCalled();
expect(existsSync(join(process.env.CORE_PATH_TEMP!, "log-archive", "test.log.gz"))).toBeFalse();
expect(existsSync(join(process.env.CORE_PATH_DATA!, "log-archive", "test.log.gz"))).toBeFalse();
});
});
102 changes: 100 additions & 2 deletions __tests__/unit/core-manager/worker/generate-log-zip.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,30 @@ import "jest-extended";

import { Database } from "@packages/core-manager/src/database/database";
import { GenerateLogZip } from "@packages/core-manager/src/workers/generate-log-zip";
import archiver from "archiver";
import { EventEmitter } from "events";
import { existsSync } from "fs-extra";
import { join } from "path";
import { Readable, Writable } from "stream";
import { dirSync, setGracefulCleanup } from "tmp";

jest.mock("@packages/core-manager/src/database/database");

const mockLogArray = [{ timestamp: 1607948405, level: "info", content: "log message" }];
const mockIterator = mockLogArray[Symbol.iterator]();

class ReadableError extends Readable {
public constructor() {
super({ objectMode: true });
}

public _read(size: number) {
this.emit("error", new Error("Dummy error"));
}
}

const mockReadable = new ReadableError();

beforeEach(() => {
setGracefulCleanup();

Expand All @@ -21,14 +36,18 @@ beforeEach(() => {
afterAll(() => {
delete process.env.CORE_PATH_TEMP;
delete process.env.CORE_PATH_DATA;

jest.clearAllMocks();
});

describe("Generate Log", () => {
it("should generate log", async () => {
const spyOnBoot = jest.spyOn(Database.prototype, "boot");
// @ts-ignore
const spyOnGetAllIterator = jest.spyOn(Database.prototype, "getAllIterator").mockReturnValue(mockIterator);

console.log(process.env.CORE_PATH_TEMP);
console.log(process.env.CORE_PATH_DATA);

// @ts-ignore
const generateLog = new GenerateLogZip({
databaseFilePath: "path/to/db",
Expand All @@ -42,7 +61,86 @@ describe("Generate Log", () => {
await generateLog.execute();

expect(spyOnGetAllIterator).toHaveBeenCalled();

expect(existsSync(join(process.env.CORE_PATH_DATA!, "log-archive", "test.zip"))).toBeTrue();
});

it("should destroy socket, remove temp files and throw error on pipeline error", async () => {
const spyOnBoot = jest.spyOn(Database.prototype, "boot");
const spyOnGetAllIterator = jest.spyOn(Database.prototype, "getAllIterator").mockReturnValue(mockIterator);
const spyOnReadableFrom = jest.spyOn(Readable, "from").mockReturnValue(mockReadable);
const spyOnDestroy = jest.spyOn(Writable.prototype, "destroy");

// @ts-ignore
const generateLog = new GenerateLogZip({
databaseFilePath: "path/to/db",
schema: { tables: [] },
logFileName: "test.zip",
query: {},
});

const spyOnMoveArchive = jest.spyOn(generateLog, "moveArchive");

expect(spyOnBoot).toHaveBeenCalled();

await expect(generateLog.execute()).rejects.toThrow("Dummy error");
expect(spyOnGetAllIterator).toHaveBeenCalled();
expect(spyOnReadableFrom).toHaveBeenCalled();
expect(spyOnDestroy).toHaveBeenCalled();
expect(existsSync(join(process.env.CORE_PATH_TEMP!, "log-archive", "test.zip"))).toBeFalse();
expect(existsSync(join(process.env.CORE_PATH_DATA!, "log-archive", "test.zip"))).toBeFalse();

await new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 200);
});

expect(spyOnMoveArchive).not.toHaveBeenCalled();
});

it("should destroy socket, remove temp files and throw error on archiver error", async () => {
const spyOnBoot = jest.spyOn(Database.prototype, "boot");
const spyOnGetAllIterator = jest.spyOn(Database.prototype, "getAllIterator").mockReturnValue(mockIterator);
const spyOnDestroy = jest.spyOn(Writable.prototype, "destroy");
const spyOnArchiverCreate = jest.spyOn(archiver, "create").mockImplementation((...data) => {
const archive: any = new EventEmitter();

archive.pipe = () => {
archive.emit("error", new Error("Archive error"));
};

archive.abort = jest.fn();
archive.append = jest.fn();
archive.finalize = jest.fn();

return archive;
});

// @ts-ignore
const generateLog = new GenerateLogZip({
databaseFilePath: "path/to/db",
schema: { tables: [] },
logFileName: "test.zip",
query: {},
});

const spyOnMoveArchive = jest.spyOn(generateLog, "moveArchive");

expect(spyOnBoot).toHaveBeenCalled();

await expect(generateLog.execute()).rejects.toThrow("Archive error");
expect(spyOnGetAllIterator).toHaveBeenCalled();
expect(spyOnArchiverCreate).toHaveBeenCalled();
expect(spyOnDestroy).toHaveBeenCalled();
expect(existsSync(join(process.env.CORE_PATH_TEMP!, "log-archive", "test.zip"))).toBeFalse();
expect(existsSync(join(process.env.CORE_PATH_DATA!, "log-archive", "test.zip"))).toBeFalse();

await new Promise((resolve) => {
setTimeout(() => {
resolve();
}, 200);
});

expect(spyOnMoveArchive).not.toHaveBeenCalled();
});
});
38 changes: 20 additions & 18 deletions packages/core-manager/src/workers/generate-log-gz.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import { createWriteStream, ensureDirSync, renameSync } from "fs-extra";
import { dirname } from "path";
import { Writable, Readable } from "stream";
import { pipeline, Readable } from "stream";
import zlib from "zlib";

import { GenerateLog } from "./generate-log";
import { LogTransformStream } from "./log-transform-stream";

export class GenerateLogGz extends GenerateLog {
public async execute(): Promise<void> {
const readStream = Readable.from(this.database.getAllIterator("logs", this.options.query), { objectMode: true });
const writeStream = this.prepareOutputStream();
await new Promise((resolve, reject) => {
const writeStream = this.prepareOutputStream();

readStream.pipe(new LogTransformStream()).pipe(writeStream);
await this.resolveOnClose(writeStream);
pipeline(
Readable.from(this.database.getAllIterator("logs", this.options.query), { objectMode: true }),
new LogTransformStream(),
zlib.createGzip(),
writeStream,
(err) => {
if (err) {
writeStream.destroy();
this.removeTempFiles();

ensureDirSync(dirname(this.getFilePath()));
renameSync(this.getTempFilePath(), this.getFilePath());
}

private prepareOutputStream(): Writable {
ensureDirSync(dirname(this.getTempFilePath()));

const stream = zlib.createGzip();
stream.pipe(createWriteStream(this.getTempFilePath()));

return stream;
reject(err);
} else {
this.moveArchive();
resolve();
}
},
);
});
}
}
64 changes: 41 additions & 23 deletions packages/core-manager/src/workers/generate-log-zip.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,57 @@
import archiver from "archiver";
import { createWriteStream, ensureDirSync, renameSync } from "fs-extra";
import { dirname, parse } from "path";
import { Writable, Readable } from "stream";
import { parse } from "path";
import { pipeline, Readable } from "stream";

import { GenerateLog } from "./generate-log";
import { LogTransformStream } from "./log-transform-stream";

export class GenerateLogZip extends GenerateLog {
public async execute(): Promise<void> {
const readStream = Readable.from(this.database.getAllIterator("logs", this.options.query), {objectMode: true});
const writeStream = this.prepareOutputStream();
await new Promise(async (resolve, reject) => {
const writeStream = this.prepareOutputStream();

const archive = archiver("zip", {
zlib: { level: 9 },
});
archive.pipe(writeStream);
writeStream.on("close", () => {
resolve();
});

archive.append(
readStream.pipe(
new LogTransformStream(),
),
{ name: parse(this.options.logFileName).name + ".log" },
);
const archive = archiver("zip", {
zlib: { level: 9 },
});

archive.finalize();
const handleError = (err) => {
writeStream.removeAllListeners("close");

await this.resolveOnClose(writeStream);
archive.abort();
writeStream.destroy();
this.removeTempFiles();

ensureDirSync(dirname(this.getFilePath()));
renameSync(this.getTempFilePath(), this.getFilePath());
}
reject(err);
};

private prepareOutputStream(): Writable {
ensureDirSync(dirname(this.getTempFilePath()));
archive.on("error", (err) => {
handleError(err);
});

const readStream = pipeline(
Readable.from(this.database.getAllIterator("logs", this.options.query), {
objectMode: true,
}),
new LogTransformStream(),
(err) => {
if (err) {
handleError(err);
}
},
);

archive.pipe(writeStream);
archive.append(readStream, {
name: parse(this.options.logFileName).name + ".log",
});

archive.finalize();
});

return createWriteStream(this.getTempFilePath());
this.moveArchive();
}
}
24 changes: 14 additions & 10 deletions packages/core-manager/src/workers/generate-log.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { join } from "path";
import { createWriteStream, ensureDirSync, removeSync, renameSync } from "fs-extra";
import { dirname, join } from "path";
import { Writable } from "stream";

import { GenerateLog as GenerateLogContracts } from "../contracts";
Expand All @@ -23,15 +24,18 @@ export class GenerateLog implements GenerateLogContracts.GenerateLog {
return join(process.env.CORE_PATH_TEMP!, "log-archive", this.options.logFileName);
}

protected resolveOnClose(stream: Writable): Promise<void> {
return new Promise((resolve) => {
stream.on("end", () => {
stream.destroy();
});
protected prepareOutputStream(): Writable {
ensureDirSync(dirname(this.getTempFilePath()));

stream.on("close", () => {
resolve();
});
});
return createWriteStream(this.getTempFilePath());
}

protected moveArchive(): void {
ensureDirSync(dirname(this.getFilePath()));
renameSync(this.getTempFilePath(), this.getFilePath());
}

protected removeTempFiles(): void {
removeSync(this.getTempFilePath());
}
}

0 comments on commit 25d0943

Please sign in to comment.