
Commit 077cc97

Stream top-level JSON objects
1 parent afa6795 commit 077cc97


4 files changed: +85 -39 lines changed

package.json

Lines changed: 1 addition & 0 deletions

@@ -176,6 +176,7 @@
   "@types/glob": "^7.1.1",
   "@types/inquirer": "^8.1.3",
   "@types/js-yaml": "^3.12.2",
+  "@types/jsonstream": "^0.8.30",
   "@types/jsonwebtoken": "^8.3.8",
   "@types/libsodium-wrappers": "^0.7.9",
   "@types/lodash": "^4.14.149",

src/commands/database-import.ts

Lines changed: 2 additions & 2 deletions

@@ -59,8 +59,8 @@ export const command = new Command("database:import <path> [infile]")
     throw new FirebaseError("Command aborted.");
   }

-  const inputString = await utils.streamToString(fs.createReadStream(infile));
-  const importer = new DatabaseImporter(dbUrl, inputString);
+  const inStream = fs.createReadStream(infile);
+  const importer = new DatabaseImporter(dbUrl, inStream);
   try {
     await importer.execute();
   } catch (err: any) {
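
The command now hands the importer the raw read stream instead of buffering the file into a string via utils.streamToString, so peak memory roughly tracks the largest top-level value instead of the whole file. A minimal sketch of the new call shape (the path, URL, and wrapper function are illustrative):

// Sketch only: invoking DatabaseImporter with a file stream.
import * as fs from "fs";
import { URL } from "url";
import DatabaseImporter from "../database/import";

async function runImport(): Promise<void> {
  const dbUrl = new URL("https://example-db.firebaseio.com/imports");
  const importer = new DatabaseImporter(dbUrl, fs.createReadStream("backup.json"));
  await importer.execute(); // rejects with FirebaseError on invalid JSON or a nonempty target
}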

src/database/import.ts

Lines changed: 57 additions & 17 deletions

@@ -1,9 +1,12 @@
 import * as clc from "colorette";
+import * as stream from "stream";
 import pLimit from "p-limit";
 import { URL } from "url";
 import { Client } from "../apiv2";
 import { FirebaseError } from "../error";

+const JSONStream = require("JSONStream");
+
 const MAX_CHUNK_SIZE = 1024 * 1024;
 const CONCURRENCY_LIMIT = 5;

@@ -23,23 +26,14 @@ type ChunkedData = {
  * The data is parsed and chunked into subtrees of ~1 MB, to be subsequently written in parallel.
  */
 export default class DatabaseImporter {
-  chunks: Data[];
   private client: Client;
   private limit = pLimit(CONCURRENCY_LIMIT);

-  constructor(private dbUrl: URL, file: string, private chunkSize = MAX_CHUNK_SIZE) {
-    let data;
-    try {
-      data = { json: JSON.parse(file), pathname: dbUrl.pathname };
-    } catch (err: any) {
-      throw new FirebaseError("Invalid data; couldn't parse JSON object, array, or value.", {
-        original: err,
-        exit: 2,
-      });
-    }
-
-    const chunkedData = this.chunkData(data);
-    this.chunks = chunkedData.chunks || [data];
+  constructor(
+    private dbUrl: URL,
+    private inStream: NodeJS.ReadableStream,
+    private chunkSize = MAX_CHUNK_SIZE
+  ) {
     this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
   }

@@ -48,7 +42,7 @@ export default class DatabaseImporter {
    */
   async execute(): Promise<any> {
     await this.checkLocationIsEmpty();
-    return Promise.all(this.chunks.map(this.writeChunk.bind(this)));
+    return this.readAndWriteChunks(this.inStream);
   }

   private async checkLocationIsEmpty(): Promise<void> {
@@ -68,6 +62,48 @@ export default class DatabaseImporter {
     }
   }

+  private readAndWriteChunks(inStream: NodeJS.ReadableStream): Promise<any> {
+    const { dbUrl } = this;
+    const chunkData = this.chunkData.bind(this);
+    const writeChunk = this.writeChunk.bind(this);
+    const getJoinedPath = this.getJoinedPath.bind(this);
+
+    const readChunks = new stream.Transform({ objectMode: true });
+    readChunks._transform = function (chunk: { key: string; value: any }, _, done) {
+      const data = { json: chunk.value, pathname: getJoinedPath(dbUrl.pathname, chunk.key) };
+      const chunkedData = chunkData(data);
+      const chunks = chunkedData.chunks || [data];
+      chunks.forEach((chunk: Data) => this.push(chunk));
+      done();
+    };
+
+    const writeChunks = new stream.Transform({ objectMode: true });
+    writeChunks._transform = async function (chunk: Data, _, done) {
+      const res = await writeChunk(chunk);
+      this.push(res);
+      done();
+    };
+
+    return new Promise((resolve, reject) => {
+      const results: any[] = [];
+      inStream
+        .pipe(JSONStream.parse("$*"))
+        .on("error", (err: any) =>
+          reject(
+            new FirebaseError("Invalid data; couldn't parse JSON object, array, or value.", {
+              original: err,
+              exit: 2,
+            })
+          )
+        )
+        .pipe(readChunks)
+        .pipe(writeChunks)
+        .on("data", (res: any) => results.push(res))
+        .on("error", reject)
+        .once("end", () => resolve(results));
+    });
+  }
+
   private writeChunk(chunk: Data): Promise<any> {
     return this.limit(() =>
       this.client.request({
@@ -91,9 +127,9 @@
     let hasChunkedChild = false;

     for (const key of Object.keys(json)) {
-      size += key.length + 3; // "[key]":
+      size += key.length + 3; // "":

-      const child = { json: json[key], pathname: [pathname, key].join("/").replace("//", "/") };
+      const child = { json: json[key], pathname: this.getJoinedPath(pathname, key) };
       const childChunks = this.chunkData(child);
       size += childChunks.size;
       if (childChunks.chunks) {
@@ -111,4 +147,8 @@
       }
     }
   }
+
+  private getJoinedPath(root: string, key: string): string {
+    return [root, key].join("/").replace("//", "/");
+  }
 }
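
readAndWriteChunks is the heart of the commit: JSONStream turns the byte stream into top-level { key, value } events, one object-mode Transform splits oversized values via chunkData, and a second one performs the writes. A stripped-down sketch of the same parse/transform/collect pattern (the chunking and HTTP calls are omitted, so this is illustrative rather than the committed code):

// Sketch only: the parse -> transform -> collect pattern used above,
// minus the Firebase-specific chunking and HTTP writes.
import * as stream from "stream";
const JSONStream = require("JSONStream");

function importTopLevel(input: NodeJS.ReadableStream): Promise<unknown[]> {
  const split = new stream.Transform({ objectMode: true });
  split._transform = function (entry: { key: string; value: unknown }, _, done) {
    // Real code: chunkData() recursively breaks values over chunkSize into subtree writes.
    this.push({ pathname: "/" + entry.key, json: entry.value });
    done();
  };

  return new Promise((resolve, reject) => {
    const results: unknown[] = [];
    input
      .pipe(JSONStream.parse("$*")) // one event per top-level key
      .on("error", reject)
      .pipe(split)
      .on("data", (chunk: unknown) => results.push(chunk))
      .on("error", reject)
      .once("end", () => resolve(results));
  });
}

Each stage attaches its own "error" handler because pipe() does not forward errors downstream; the committed code does the same, wrapping parser failures in a FirebaseError with exit code 2.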

src/test/database/import.spec.ts

Lines changed: 25 additions & 20 deletions

@@ -1,4 +1,5 @@
 import * as nock from "nock";
+import * as utils from "../../utils";
 import { expect } from "chai";

 import DatabaseImporter from "../../database/import";
@@ -8,33 +9,37 @@ const dbUrl = new URL("https://test-db.firebaseio.com/foo");

 describe("DatabaseImporter", () => {
   const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] };
+  let DATA_STREAM: NodeJS.ReadableStream;

-  it("parses data as single chunk", () => {
-    const importer = new DatabaseImporter(dbUrl, JSON.stringify(DATA));
-    expect(importer.chunks.length).to.equal(1);
-    expect(importer.chunks[0].json).to.deep.equal(DATA);
-    expect(importer.chunks[0].pathname).to.equal("/foo");
+  beforeEach(() => {
+    DATA_STREAM = utils.stringToStream(JSON.stringify(DATA))!;
   });

-  it("parses data as multiple chunks", () => {
-    const importer = new DatabaseImporter(dbUrl, JSON.stringify(DATA), /* chunkSize= */ 20);
-    expect(importer.chunks.length).to.equal(5);
-    expect(importer.chunks).to.deep.include({ json: 100, pathname: "/foo/a" });
-    expect(importer.chunks).to.deep.include({ json: true, pathname: "/foo/b/0" });
-    expect(importer.chunks).to.deep.include({ json: "bar", pathname: "/foo/b/1" });
-    expect(importer.chunks).to.deep.include({ json: { g: 0, h: 1 }, pathname: "/foo/b/2/f" });
-    expect(importer.chunks).to.deep.include({ json: "baz", pathname: "/foo/b/2/i" });
-  });
+  it("throws FirebaseError when JSON is invalid", async () => {
+    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);

-  it("throws FirebaseError when JSON is invalid", () => {
-    const INVALID_JSON = '{"a": }';
-    expect(() => new DatabaseImporter(dbUrl, INVALID_JSON)).to.throw(
+    const INVALID_JSON = '{"a": {"b"}}';
+    const importer = new DatabaseImporter(dbUrl, utils.stringToStream(INVALID_JSON)!);
+    await expect(importer.execute()).to.be.rejectedWith(
       FirebaseError,
       "Invalid data; couldn't parse JSON object, array, or value."
     );
   });

-  it("sends multiple chunked requests", async () => {
+  it("chunks data in top-level objects", async () => {
+    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
+    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
+    nock("https://test-db.firebaseio.com")
+      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
+      .reply(200);
+
+    const importer = new DatabaseImporter(dbUrl, DATA_STREAM);
+    const responses = await importer.execute();
+    expect(responses).to.have.length(2);
+    expect(nock.isDone()).to.be.true;
+  });
+
+  it("chunks data according to provided chunk size", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
     nock("https://test-db.firebaseio.com").put("/foo/b/0.json", "true").reply(200);
@@ -44,15 +49,15 @@ describe("DatabaseImporter", () => {
       .reply(200);
     nock("https://test-db.firebaseio.com").put("/foo/b/2/i.json", '"baz"').reply(200);

-    const importer = new DatabaseImporter(dbUrl, JSON.stringify(DATA), /* chunkSize= */ 20);
+    const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* chunkSize= */ 20);
     const responses = await importer.execute();
     expect(responses).to.have.length(5);
     expect(nock.isDone()).to.be.true;
   });

   it("throws FirebaseError when data location is nonempty", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200, { a: "foo" });
-    const importer = new DatabaseImporter(dbUrl, JSON.stringify(DATA));
+    const importer = new DatabaseImporter(dbUrl, DATA_STREAM);
     await expect(importer.execute()).to.be.rejectedWith(
       FirebaseError,
       /Importing is only allowed for an empty location./
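
These tests lean on a utils.stringToStream helper that is not part of this diff; judging by the non-null assertions, it returns undefined for missing input. A plausible minimal version on Node 12+ (the real helper in src/utils.ts may differ):

// Sketch only: what a stringToStream helper could look like.
import { Readable } from "stream";

export function stringToStream(text: string | undefined): Readable | undefined {
  if (text === undefined) {
    return undefined;
  }
  return Readable.from([text]); // single-chunk readable stream
}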
