Commit f75178c

Extract longest common prefix from pathnames in batch
1 parent 3c5ef87 commit f75178c

3 files changed (+118 lines, -23 lines)

src/commands/database-import.ts
Lines changed: 1 addition & 1 deletion

@@ -25,7 +25,7 @@ interface DatabaseImportOptions extends Options {
   concurrency?: string;
 }
 
-const MAX_CHUNK_SIZE_MB = 10;
+const MAX_CHUNK_SIZE_MB = 1;
 const MAX_PAYLOAD_SIZE_MB = 256;
 const CONCURRENCY_LIMIT = 5;

src/database/import.ts
Lines changed: 49 additions & 10 deletions
@@ -22,8 +22,6 @@ type ChunkedData = {
   size: number;
 };
 
-type Batch = { [pathname: string]: JsonType };
-
 /**
  * Imports JSON data to a given RTDB instance.
  *
@@ -98,7 +96,7 @@ export default class DatabaseImporter {
     };
 
     const writeBatch = new stream.Transform({ objectMode: true });
-    writeBatch._transform = async function (batch: Batch, _, done) {
+    writeBatch._transform = async function (batch: Data, _, done) {
       const res = await doWriteBatch(batch);
       this.push(res);
       done();
@@ -135,12 +133,13 @@ export default class DatabaseImporter {
     });
   }
 
-  private doWriteBatch(batch: Batch): Promise<ClientResponse<JsonType>> {
+  private doWriteBatch(data: Data): Promise<ClientResponse<JsonType>> {
+    const sanitizedData = this.sanitizeDataForPatch(data);
     const doRequest = (): Promise<ClientResponse<JsonType>> => {
       return this.client.request({
         method: "PATCH",
-        path: "/.json",
-        body: JSON.stringify(batch),
+        path: `${sanitizedData.pathname}.json`,
+        body: JSON.stringify(sanitizedData.json),
         queryParams: this.dbUrl.searchParams,
       });
     };
@@ -162,6 +161,17 @@ export default class DatabaseImporter {
     });
   }
 
+  // Since we cannot PATCH arrays and primitives, we convert them to objects.
+  private sanitizeDataForPatch(batch: Data): Data {
+    if (typeof batch.json == "object") {
+      return { json: { ...batch.json }, pathname: batch.pathname };
+    } else {
+      const tokens = batch.pathname.split("/");
+      const lastToken = tokens.pop();
+      return { json: { [lastToken!]: batch.json }, pathname: tokens.join("/") };
+    }
+  }
+
   private chunkData({ json, pathname }: Data): ChunkedData {
     if (typeof json === "string" || typeof json === "number" || typeof json === "boolean") {
       // Leaf node, cannot be chunked
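Because typeof [] is "object" in JavaScript, an array chunk also takes the first branch above, and the object spread rewrites it as { "0": ..., "1": ... }; that is what the new "writes array as object" test asserts. A minimal standalone sketch of the same wrapping behavior (the simplified JsonType and Data declarations here are illustrative stand-ins for the module's real types, not imports from it):

// Illustrative, simplified stand-ins for the module's types.
type JsonType = string | number | boolean | unknown[] | { [key: string]: unknown };
type Data = { json: JsonType; pathname: string };

// Same logic as sanitizeDataForPatch above, outside the class for demonstration.
function sanitizeDataForPatch(batch: Data): Data {
  if (typeof batch.json === "object") {
    // Objects pass through; spreading an array yields { "0": ..., "1": ... }.
    return { json: { ...batch.json }, pathname: batch.pathname };
  }
  // A primitive cannot be PATCHed directly, so wrap it one level up:
  // { json: 100, pathname: "/foo/a" } becomes { json: { a: 100 }, pathname: "/foo" },
  // which goes out as PATCH /foo.json with body {"a":100}.
  const tokens = batch.pathname.split("/");
  const lastToken = tokens.pop();
  return { json: { [lastToken!]: batch.json }, pathname: tokens.join("/") };
}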
@@ -209,27 +219,56 @@ export default class DatabaseImporter {
  * containing chunks keyed by pathname.
  */
 class BatchChunks extends stream.Transform {
-  private batch: Batch = {};
+  private batch: Data = { json: {}, pathname: "" };
   private size = 0;
 
   constructor(private maxSize: number, opts?: stream.TransformOptions) {
     super({ ...opts, objectMode: true });
   }
 
   _transform(chunk: Data, _: BufferEncoding, callback: stream.TransformCallback) {
-    const chunkSize = JSON.stringify(chunk.json).length;
+    const chunkSize = JSON.stringify(chunk.json).length + chunk.pathname.length;
     if (this.size + chunkSize > this.maxSize) {
       this.push(this.batch);
-      this.batch = {};
+      this.batch = { json: {}, pathname: "" };
       this.size = 0;
     }
-    this.batch[chunk.pathname] = chunk.json;
+    if (this.size == 0) {
+      this.batch = chunk;
+    } else {
+      const newPathname = this._findLongestCommonPrefix(chunk.pathname, this.batch.pathname);
+      const batchKey = this.batch.pathname.substring(newPathname.length + 1); // +1 to trim leading slash
+      const chunkKey = chunk.pathname.substring(newPathname.length + 1);
+
+      if (batchKey == "") {
+        this.batch.json = Object.assign({}, this.batch.json, { [chunkKey]: chunk.json });
+      } else if (chunkKey == "") {
+        this.batch.json = Object.assign({}, chunk.json, { [batchKey]: this.batch.json });
+      } else {
+        this.batch.json = { [batchKey]: this.batch.json, [chunkKey]: chunk.json };
+      }
+      this.batch.pathname = newPathname;
+    }
     this.size += chunkSize;
     callback(null);
   }
 
+  _findLongestCommonPrefix(a: string, b: string) {
+    const aTokens = a.split("/");
+    const bTokens = b.split("/");
+    let prefix = aTokens.slice(0, bTokens.length);
+    for (let i = 0; i < prefix.length; i++) {
+      if (prefix[i] != bTokens[i]) {
+        prefix = prefix.slice(0, i);
+        break;
+      }
+    }
+    return prefix.join("/");
+  }
+
   _flush(callback: stream.TransformCallback) {
     if (this.size > 0) {
+      console.log(`flushed: ${JSON.stringify(this.batch.json)}, path: ${this.batch.pathname}`);
       this.push(this.batch);
     }
     callback(null);
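The prefix computation walks path tokens rather than characters, so "/foo/b" and "/foo/bar" share "/foo", not "/foo/b". A standalone copy of the helper with sample calls (same token-wise logic as the diff's _findLongestCommonPrefix, renamed for use outside the class):

function findLongestCommonPrefix(a: string, b: string): string {
  const aTokens = a.split("/");
  const bTokens = b.split("/");
  // Start from a's tokens capped at b's length, then cut at the first mismatch.
  let prefix = aTokens.slice(0, bTokens.length);
  for (let i = 0; i < prefix.length; i++) {
    if (prefix[i] !== bTokens[i]) {
      prefix = prefix.slice(0, i);
      break;
    }
  }
  return prefix.join("/");
}

findLongestCommonPrefix("/foo/a", "/foo/b/0");       // "/foo"
findLongestCommonPrefix("/foo/b/2/f", "/foo/b/2/i"); // "/foo/b/2"

Merging { json: 100, pathname: "/foo/a" } with { json: true, pathname: "/foo/b/0" } therefore produces { json: { a: 100, "b/0": true }, pathname: "/foo" }, the shape asserted by the updated batching test below.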

src/test/database/import.spec.ts
Lines changed: 68 additions & 12 deletions
@@ -12,7 +12,7 @@ const payloadSize = 1024 * 1024 * 10;
 const concurrencyLimit = 5;
 
 describe("DatabaseImporter", () => {
-  const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] };
+  const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }], c: { d: false } };
   let DATA_STREAM: stream.Readable;
 
   beforeEach(() => {
@@ -39,31 +39,38 @@ describe("DatabaseImporter", () => {
   it("batches data from different top-level objects", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
-      .patch("/.json", JSON.stringify({ "/foo/a": 100, "/foo/b/0": true, "/foo/b/1": "bar" }))
+      .patch("/foo.json", JSON.stringify({ a: 100, "b/0": true, "b/1": "bar" }))
       .reply(200);
     nock("https://test-db.firebaseio.com")
-      .patch("/.json", JSON.stringify({ "/foo/b/2/f": { g: 0, h: 1 }, "/foo/b/2/i": "baz" }))
+      .patch("/foo/b/2.json", JSON.stringify({ f: { g: 0, h: 1 }, i: "baz" }))
+      .reply(200);
+    nock("https://test-db.firebaseio.com")
+      .patch("/foo/c.json", JSON.stringify({ d: false }))
       .reply(200);
     const importer = new DatabaseImporter(
       dbUrl,
       DATA_STREAM,
       /* importPath= */ "/",
-      /* payloadSize= */ 20,
+      /* payloadSize= */ 40,
       concurrencyLimit
     );
 
     const responses = await importer.execute();
 
-    expect(responses).to.have.length(2);
+    expect(responses).to.have.length(3);
     expect(nock.isDone()).to.be.true;
   });
 
   it("writes data as a single batch for large enough payload size", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
       .patch(
-        "/.json",
-        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+        "/foo.json",
+        JSON.stringify({
+          a: 100,
+          b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }],
+          c: { d: false },
+        })
       )
       .reply(200);
     const importer = new DatabaseImporter(
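Why a payloadSize of 40 yields exactly three batches in the first test: assuming the importer emits leaf-level chunks in document order and sizes each one as JSON.stringify(chunk.json).length + chunk.pathname.length (per BatchChunks above), a plausible trace is:

/foo/a: 3 + 6 = 9, running size 9
/foo/b/0: 4 + 8 = 12, running size 21
/foo/b/1: 5 + 8 = 13, running size 34
/foo/b/2/f: 13 + 10 = 23; 34 + 23 > 40, so the /foo batch is flushed and the size resets to 23
/foo/b/2/i: 5 + 10 = 15, running size 38
/foo/c: 11 + 6 = 17; 38 + 17 > 40, so the /foo/b/2 batch is flushed, and /foo/c is flushed at end of stream

Hence three PATCH requests and expect(responses).to.have.length(3).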
@@ -83,7 +90,48 @@ describe("DatabaseImporter", () => {
   it("imports from data path", async () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
-      .patch("/.json", JSON.stringify({ "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] }))
+      .patch("/foo/c.json", JSON.stringify({ d: false }))
+      .reply(200);
+    const importer = new DatabaseImporter(
+      dbUrl,
+      DATA_STREAM,
+      /* importPath= */ "/c",
+      payloadSize,
+      concurrencyLimit
+    );
+
+    const responses = await importer.execute();
+
+    expect(responses).to.have.length(1);
+    expect(nock.isDone()).to.be.true;
+  });
+
+  it("writes primitive as object", async () => {
+    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
+    nock("https://test-db.firebaseio.com")
+      .patch("/foo.json", JSON.stringify({ a: 100 }))
+      .reply(200);
+    const importer = new DatabaseImporter(
+      dbUrl,
+      DATA_STREAM,
+      /* importPath= */ "/a",
+      payloadSize,
+      concurrencyLimit
+    );
+
+    const responses = await importer.execute();
+
+    expect(responses).to.have.length(1);
+    expect(nock.isDone()).to.be.true;
+  });
+
+  it("writes array as object", async () => {
+    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
+    nock("https://test-db.firebaseio.com")
+      .patch(
+        "/foo/b.json",
+        JSON.stringify({ "0": true, "1": "bar", "2": { f: { g: 0, h: 1 }, i: "baz" } })
+      )
       .reply(200);
     const importer = new DatabaseImporter(
       dbUrl,
@@ -122,15 +170,23 @@ describe("DatabaseImporter", () => {
     nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
     nock("https://test-db.firebaseio.com")
       .patch(
-        "/.json",
-        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+        "/foo.json",
+        JSON.stringify({
+          a: 100,
+          b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }],
+          c: { d: false },
+        })
       )
       .once()
       .replyWithError(timeoutErr);
     nock("https://test-db.firebaseio.com")
       .patch(
-        "/.json",
-        JSON.stringify({ "/foo/a": 100, "/foo/b": [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] })
+        "/foo.json",
+        JSON.stringify({
+          a: 100,
+          b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }],
+          c: { d: false },
+        })
       )
       .once()
       .reply(200);