Commit 3c5ef87

Do multi-path PATCH instead of PUT in database:import
1 parent a410a87 commit 3c5ef87

2 files changed: +87 -35 lines changed
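In short: the importer previously issued one PUT per chunk, addressed at the chunk's own path. It now groups chunks into batches and writes each batch with a single multi-path PATCH against the database root, with the chunks keyed by pathname. A minimal sketch of the two request shapes, using the built-in fetch of Node 18+ inside an ES module rather than the CLI's Client wrapper; the host and values are taken from the test fixtures below:

// Hedged sketch (not the commit's code): before/after request shapes against the
// RTDB REST API, using global fetch instead of the CLI's Client wrapper.
const base = "https://test-db.firebaseio.com"; // test instance from the spec

// Before: one PUT per chunk, addressed at the chunk's own path.
await fetch(`${base}/foo/a.json`, { method: "PUT", body: JSON.stringify(100) });
await fetch(`${base}/foo/b.json`, {
  method: "PUT",
  body: JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]),
});

// After: one multi-path PATCH at the root, with all chunks keyed by pathname.
await fetch(`${base}/.json`, {
  method: "PATCH",
  body: JSON.stringify({
    "/foo/a": 100,
    "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }],
  }),
});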


src/database/import.ts

Lines changed: 58 additions & 13 deletions
@@ -22,10 +22,13 @@ type ChunkedData = {
   size: number;
 };
 
+type Batch = { [pathname: string]: JsonType };
+
 /**
  * Imports JSON data to a given RTDB instance.
  *
- * The data is parsed and chunked into subtrees of ~10 MB, to be subsequently written in parallel.
+ * The data is parsed and chunked into subtrees of the specified payload size, to be subsequently
+ * written in parallel.
  */
 export default class DatabaseImporter {
   private client: Client;
@@ -36,7 +39,7 @@ export default class DatabaseImporter {
     private dbUrl: URL,
     private inStream: stream.Readable,
     private dataPath: string,
-    private chunkBytes: number,
+    private payloadSize: number,
     concurrency: number
   ) {
     this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
@@ -68,10 +71,19 @@ export default class DatabaseImporter {
     }
   }
 
+  /**
+   * The top-level objects are parsed and chunked, with each chunk capped at payloadSize. Then,
+   * chunks are batched, with each batch also capped at payloadSize. Finally, the batched chunks
+   * are written in parallel.
+   *
+   * In the case where the data contains very large objects, chunking ensures that the request is
+   * not too large. On the other hand, in the case where the data contains many small objects,
+   * batching ensures that there are not too many requests.
+   */
   private readAndWriteChunks(): Promise<ClientResponse<JsonType>[]> {
-    const { dbUrl } = this;
+    const { dbUrl, payloadSize } = this;
     const chunkData = this.chunkData.bind(this);
-    const writeChunk = this.writeChunk.bind(this);
+    const doWriteBatch = this.doWriteBatch.bind(this);
     const getJoinedPath = this.joinPath.bind(this);
 
     const readChunks = new stream.Transform({ objectMode: true });
@@ -85,9 +97,9 @@ export default class DatabaseImporter {
       done();
     };
 
-    const writeChunks = new stream.Transform({ objectMode: true });
-    writeChunks._transform = async function (chunk: Data, _, done) {
-      const res = await writeChunk(chunk);
+    const writeBatch = new stream.Transform({ objectMode: true });
+    writeBatch._transform = async function (batch: Batch, _, done) {
+      const res = await doWriteBatch(batch);
       this.push(res);
       done();
     };
@@ -115,19 +127,20 @@ export default class DatabaseImporter {
           )
         )
         .pipe(readChunks)
-        .pipe(writeChunks)
+        .pipe(new BatchChunks(payloadSize))
+        .pipe(writeBatch)
         .on("data", (res: ClientResponse<JsonType>) => responses.push(res))
         .on("error", reject)
         .once("end", () => resolve(responses));
     });
   }
 
-  private writeChunk(chunk: Data): Promise<ClientResponse<JsonType>> {
+  private doWriteBatch(batch: Batch): Promise<ClientResponse<JsonType>> {
     const doRequest = (): Promise<ClientResponse<JsonType>> => {
       return this.client.request({
-        method: "PUT",
-        path: chunk.pathname + ".json",
-        body: JSON.stringify(chunk.json),
+        method: "PATCH",
+        path: "/.json",
+        body: JSON.stringify(batch),
         queryParams: this.dbUrl.searchParams,
       });
     };
@@ -174,7 +187,7 @@ export default class DatabaseImporter {
       }
     }
 
-    if (hasChunkedChild || size >= this.chunkBytes) {
+    if (hasChunkedChild || size >= this.payloadSize) {
       return { chunks, size };
     } else {
       return { chunks: null, size };
@@ -190,3 +203,35 @@ export default class DatabaseImporter {
     return [root, key].join("/").replace("//", "/");
   }
 }
+
+/**
+ * Batches chunked JSON data up to the specified byte size limit. The emitted batch is an object
+ * containing chunks keyed by pathname.
+ */
+class BatchChunks extends stream.Transform {
+  private batch: Batch = {};
+  private size = 0;
+
+  constructor(private maxSize: number, opts?: stream.TransformOptions) {
+    super({ ...opts, objectMode: true });
+  }
+
+  _transform(chunk: Data, _: BufferEncoding, callback: stream.TransformCallback) {
+    const chunkSize = JSON.stringify(chunk.json).length;
+    if (this.size + chunkSize > this.maxSize) {
+      this.push(this.batch);
+      this.batch = {};
+      this.size = 0;
+    }
+    this.batch[chunk.pathname] = chunk.json;
+    this.size += chunkSize;
+    callback(null);
+  }
+
+  _flush(callback: stream.TransformCallback) {
+    if (this.size > 0) {
+      this.push(this.batch);
+    }
+    callback(null);
+  }
+}
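The new BatchChunks transform does the grouping. Below is a hedged usage sketch, assuming BatchChunks were exported from this module (in the commit it stays module-private, so the import is hypothetical) and using the { pathname, json } chunk shape implied by the diff:

import * as stream from "stream";
// Hypothetical import: BatchChunks is not exported in the commit; this assumes it were.
import { BatchChunks } from "./import";

// Chunk shape implied by the diff: a pathname plus the JSON subtree stored there.
type Chunk = { pathname: string; json: unknown };

const chunks: Chunk[] = [
  { pathname: "/foo/a", json: 100 },
  { pathname: "/foo/b/0", json: true },
  { pathname: "/foo/b/1", json: "bar" },
];

// With a 20-byte cap the three chunks (3 + 4 + 5 serialized bytes) fit in one batch,
// emitted as a single object keyed by pathname.
stream.pipeline(
  stream.Readable.from(chunks),
  new BatchChunks(20),
  new stream.Writable({
    objectMode: true,
    write(batch, _enc, done) {
      console.log(batch); // { "/foo/a": 100, "/foo/b/0": true, "/foo/b/1": "bar" }
      done();
    },
  }),
  (err) => {
    if (err) console.error(err);
  }
);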

src/test/database/import.spec.ts

Lines changed: 29 additions & 22 deletions
@@ -8,7 +8,7 @@ import { FirebaseError } from "../../error";
 import { FetchError } from "node-fetch";
 
 const dbUrl = new URL("https://test-db.firebaseio.com/foo");
-const chunkSize = 1024 * 1024 * 10;
+const payloadSize = 1024 * 1024 * 10;
 const concurrencyLimit = 5;
 
 describe("DatabaseImporter", () => {
@@ -26,7 +26,7 @@ describe("DatabaseImporter", () => {
       dbUrl,
       utils.stringToStream(INVALID_JSON)!,
       /* importPath= */ "/",
-      chunkSize,
+      payloadSize,
       concurrencyLimit
     );
 
@@ -36,17 +36,19 @@ describe("DatabaseImporter", () => {
     );
   });
 
-  it("chunks data in top-level objects", async () => {
+  it("batches data from different top-level objects", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
-    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
     nock("https://test-db.firebaseio.com")
-      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
+      .patch("/.json", JSON.stringify({ "/foo/a": 100, "/foo/b/0": true, "/foo/b/1": "bar" }))
+      .reply(200);
+    nock("https://test-db.firebaseio.com")
+      .patch("/.json", JSON.stringify({ "/foo/b/2/f": { g: 0, h: 1 }, "/foo/b/2/i": "baz" }))
       .reply(200);
     const importer = new DatabaseImporter(
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/",
-      chunkSize,
+      /* payloadSize= */ 20,
       concurrencyLimit
     );
 
@@ -56,39 +58,38 @@ describe("DatabaseImporter", () => {
     expect(nock.isDone()).to.be.true;
   });
 
-  it("chunks data according to provided chunk size", async () => {
+  it("writes data as a single batch for large enough payload size", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
-    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
-    nock("https://test-db.firebaseio.com").put("/foo/b/0.json", "true").reply(200);
-    nock("https://test-db.firebaseio.com").put("/foo/b/1.json", '"bar"').reply(200);
     nock("https://test-db.firebaseio.com")
-      .put("/foo/b/2/f.json", JSON.stringify({ g: 0, h: 1 }))
+      .patch(
+        "/.json",
+        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+      )
       .reply(200);
-    nock("https://test-db.firebaseio.com").put("/foo/b/2/i.json", '"baz"').reply(200);
     const importer = new DatabaseImporter(
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/",
-      /* chunkSize= */ 20,
+      payloadSize,
       concurrencyLimit
     );
 
     const responses = await importer.execute();
 
-    expect(responses).to.have.length(5);
+    expect(responses).to.have.length(1);
     expect(nock.isDone()).to.be.true;
   });
 
   it("imports from data path", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
-      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
+      .patch("/.json", JSON.stringify({ "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] }))
       .reply(200);
     const importer = new DatabaseImporter(
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/b",
-      chunkSize,
+      payloadSize,
       concurrencyLimit
     );
 
@@ -104,7 +105,7 @@ describe("DatabaseImporter", () => {
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/",
-      chunkSize,
+      payloadSize,
       concurrencyLimit
     );
 
@@ -120,26 +121,32 @@ describe("DatabaseImporter", () => {
 
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
-      .put("/foo/a.json", "100")
+      .patch(
+        "/.json",
+        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+      )
       .once()
       .replyWithError(timeoutErr);
-    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").once().reply(200);
     nock("https://test-db.firebaseio.com")
-      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
+      .patch(
+        "/.json",
+        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+      )
+      .once()
       .reply(200);
 
     const importer = new DatabaseImporter(
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/",
-      chunkSize,
+      payloadSize,
       concurrencyLimit
     );
     importer.nonFatalRetryTimeout = 0;
 
     const responses = await importer.execute();
 
-    expect(responses).to.have.length(2);
+    expect(responses).to.have.length(1);
     expect(nock.isDone()).to.be.true;
   });
 });
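Why the first test expects exactly two PATCH calls at a payload size of 20: chunk sizes are measured as JSON.stringify(chunk.json).length, so the five chunks weigh 3, 4, 5, 13, and 5 bytes, and BatchChunks starts a new batch once the running total would exceed the cap. A standalone sketch of that arithmetic (it mirrors BatchChunks but is not part of the commit):

// Standalone sketch mirroring BatchChunks' splitting rule, used here only to check
// the expected number of PATCH requests in the spec.
type Chunk = { pathname: string; json: unknown };

// Chunks inferred from the mocked request bodies in the first test.
const chunks: Chunk[] = [
  { pathname: "/foo/a", json: 100 },
  { pathname: "/foo/b/0", json: true },
  { pathname: "/foo/b/1", json: "bar" },
  { pathname: "/foo/b/2/f", json: { g: 0, h: 1 } },
  { pathname: "/foo/b/2/i", json: "baz" },
];

function batchChunks(chunks: Chunk[], maxSize: number): Record<string, unknown>[] {
  const batches: Record<string, unknown>[] = [];
  let batch: Record<string, unknown> = {};
  let size = 0;
  for (const chunk of chunks) {
    const chunkSize = JSON.stringify(chunk.json).length;
    // Same rule as BatchChunks._transform: emit the current batch before a chunk
    // that would push the accumulated serialized size past maxSize.
    if (size + chunkSize > maxSize) {
      batches.push(batch);
      batch = {};
      size = 0;
    }
    batch[chunk.pathname] = chunk.json;
    size += chunkSize;
  }
  if (size > 0) batches.push(batch); // same as _flush
  return batches;
}

console.log(batchChunks(chunks, 20).length); // 2, matching the two mocked PATCH calls

The same arithmetic explains the updated retry test: with the default 10 MB payload size everything fits in one batch, so the retried request re-sends the entire batch and the importer resolves with a single response.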
