Skip to content

Commit 5a8b2ac

Browse files
authored
Allow user to configure database:import chunk size and concurrency (#5726)
* Allow user to configure database:import chunk size and concurrency * Cap chunk size at NGINX max payload size
1 parent a98a49a commit 5a8b2ac

File tree

3 files changed

+49
-12
lines changed

3 files changed

+49
-12
lines changed

src/commands/database-import.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@ interface DatabaseImportOptions extends Options {
2121
instanceDetails: DatabaseInstance;
2222
disableTriggers?: boolean;
2323
filter?: string;
24+
chunkSize?: string;
25+
concurrency?: string;
2426
}
2527

28+
const MAX_CHUNK_SIZE_MB = 10;
29+
const MAX_PAYLOAD_SIZE_MB = 256;
30+
const CONCURRENCY_LIMIT = 5;
31+
2632
export const command = new Command("database:import <path> [infile]")
2733
.description(
2834
"non-atomically import the contents of a JSON file to the specified path in Realtime Database"
@@ -41,6 +47,8 @@ export const command = new Command("database:import <path> [infile]")
4147
"--filter <dataPath>",
4248
"import only data at this path in the JSON file (if omitted, import entire file)"
4349
)
50+
.option("--chunk-size <mb>", "max chunk size in megabytes, default to 10 MB")
51+
.option("--concurrency <val>", "concurrency limit, default to 5")
4452
.before(requirePermissions, ["firebasedatabase.instances.update"])
4553
.before(requireDatabaseInstance)
4654
.before(populateInstanceDetails)
@@ -54,6 +62,11 @@ export const command = new Command("database:import <path> [infile]")
5462
throw new FirebaseError("No file supplied");
5563
}
5664

65+
const chunkMegabytes = options.chunkSize ? parseInt(options.chunkSize, 10) : MAX_CHUNK_SIZE_MB;
66+
if (chunkMegabytes > MAX_PAYLOAD_SIZE_MB) {
67+
throw new FirebaseError("Max chunk size cannot exceed 256 MB");
68+
}
69+
5770
const projectId = needProjectId(options);
5871
const origin = realtimeOriginOrEmulatorOrCustomUrl(options.instanceDetails.databaseUrl);
5972
const dbPath = utils.getDatabaseUrl(origin, options.instance, path);
@@ -77,7 +90,9 @@ export const command = new Command("database:import <path> [infile]")
7790

7891
const inStream = fs.createReadStream(infile);
7992
const dataPath = options.filter || "";
80-
const importer = new DatabaseImporter(dbUrl, inStream, dataPath);
93+
const chunkBytes = chunkMegabytes * 1024 * 1024;
94+
const concurrency = options.concurrency ? parseInt(options.concurrency, 10) : CONCURRENCY_LIMIT;
95+
const importer = new DatabaseImporter(dbUrl, inStream, dataPath, chunkBytes, concurrency);
8196

8297
let responses;
8398
try {

src/database/import.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ import { Client, ClientResponse } from "../apiv2";
99
import { FirebaseError } from "../error";
1010
import * as pLimit from "p-limit";
1111

12-
const MAX_CHUNK_SIZE = 1024 * 1024 * 10;
13-
const CONCURRENCY_LIMIT = 5;
14-
1512
type JsonType = { [key: string]: JsonType } | string | number | boolean;
1613

1714
type Data = {
@@ -31,15 +28,17 @@ type ChunkedData = {
3128
*/
3229
export default class DatabaseImporter {
3330
private client: Client;
34-
private limit = pLimit(CONCURRENCY_LIMIT);
31+
private limit: pLimit.Limit;
3532

3633
constructor(
3734
private dbUrl: URL,
3835
private inStream: stream.Readable,
3936
private dataPath: string,
40-
private chunkSize = MAX_CHUNK_SIZE
37+
private chunkBytes: number,
38+
concurrency: number
4139
) {
4240
this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
41+
this.limit = pLimit(concurrency);
4342
}
4443

4544
/**
@@ -157,7 +156,7 @@ export default class DatabaseImporter {
157156
}
158157
}
159158

160-
if (hasChunkedChild || size >= this.chunkSize) {
159+
if (hasChunkedChild || size >= this.chunkBytes) {
161160
return { chunks, size };
162161
} else {
163162
return { chunks: null, size };

src/test/database/import.spec.ts

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import DatabaseImporter from "../../database/import";
77
import { FirebaseError } from "../../error";
88

99
const dbUrl = new URL("https://test-db.firebaseio.com/foo");
10+
const chunkSize = 1024 * 1024 * 10;
11+
const concurrencyLimit = 5;
1012

1113
describe("DatabaseImporter", () => {
1214
const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] };
@@ -22,7 +24,9 @@ describe("DatabaseImporter", () => {
2224
const importer = new DatabaseImporter(
2325
dbUrl,
2426
utils.stringToStream(INVALID_JSON)!,
25-
/* importPath= */ "/"
27+
/* importPath= */ "/",
28+
chunkSize,
29+
concurrencyLimit
2630
);
2731

2832
await expect(importer.execute()).to.be.rejectedWith(
@@ -37,7 +41,13 @@ describe("DatabaseImporter", () => {
3741
nock("https://test-db.firebaseio.com")
3842
.put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
3943
.reply(200);
40-
const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/");
44+
const importer = new DatabaseImporter(
45+
dbUrl,
46+
DATA_STREAM,
47+
/* importPath= */ "/",
48+
chunkSize,
49+
concurrencyLimit
50+
);
4151

4252
const responses = await importer.execute();
4353

@@ -58,7 +68,8 @@ describe("DatabaseImporter", () => {
5868
dbUrl,
5969
DATA_STREAM,
6070
/* importPath= */ "/",
61-
/* chunkSize= */ 20
71+
/* chunkSize= */ 20,
72+
concurrencyLimit
6273
);
6374

6475
const responses = await importer.execute();
@@ -72,7 +83,13 @@ describe("DatabaseImporter", () => {
7283
nock("https://test-db.firebaseio.com")
7384
.put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
7485
.reply(200);
75-
const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/b");
86+
const importer = new DatabaseImporter(
87+
dbUrl,
88+
DATA_STREAM,
89+
/* importPath= */ "/b",
90+
chunkSize,
91+
concurrencyLimit
92+
);
7693

7794
const responses = await importer.execute();
7895

@@ -82,7 +99,13 @@ describe("DatabaseImporter", () => {
8299

83100
it("throws FirebaseError when data location is nonempty", async () => {
84101
nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200, { a: "foo" });
85-
const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/");
102+
const importer = new DatabaseImporter(
103+
dbUrl,
104+
DATA_STREAM,
105+
/* importPath= */ "/",
106+
chunkSize,
107+
concurrencyLimit
108+
);
86109

87110
await expect(importer.execute()).to.be.rejectedWith(
88111
FirebaseError,

0 commit comments

Comments
 (0)