4 changes: 2 additions & 2 deletions src/commands/database-import.ts
@@ -25,7 +25,7 @@ interface DatabaseImportOptions extends Options {
concurrency?: string;
}

const MAX_CHUNK_SIZE_MB = 10;
const MAX_CHUNK_SIZE_MB = 1;
const MAX_PAYLOAD_SIZE_MB = 256;
const CONCURRENCY_LIMIT = 5;

@@ -47,7 +47,7 @@ export const command = new Command("database:import <path> [infile]")
"--filter <dataPath>",
"import only data at this path in the JSON file (if omitted, import entire file)"
)
.option("--chunk-size <mb>", "max chunk size in megabytes, default to 10 MB")
.option("--chunk-size <mb>", "max chunk size in megabytes, default to 1 MB")
.option("--concurrency <val>", "concurrency limit, default to 5")
.before(requirePermissions, ["firebasedatabase.instances.update"])
.before(requireDatabaseInstance)
134 changes: 117 additions & 17 deletions src/database/import.ts
@@ -17,15 +17,105 @@ type Data = {
pathname: string;
};

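// A chunk of JSON annotated with the (estimated) byte size of its serialized value,
// so that downstream batching can stay within the payload budget.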
type SizedData = Data & { size: number };

type ChunkedData = {
chunks: Data[] | null;
chunks: SizedData[] | null;
size: number;
};

/**
* Batches chunked JSON data up to the specified byte size limit.
*/
class BatchChunks extends stream.Transform {
private batch: SizedData[] = [];
private size = 0;

constructor(private maxSize: number, opts?: stream.TransformOptions) {
super({ ...opts, objectMode: true });
}

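  // Emits the accumulated batch just before adding the incoming chunk would push it
  // past maxSize, then appends the chunk to the (possibly fresh) batch.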
_transform(chunk: SizedData, _: BufferEncoding, callback: stream.TransformCallback): void {
const totalChunkSize = chunk.size + chunk.pathname.length; // Overestimate
if (this.size + totalChunkSize > this.maxSize) {
this.push(this.transformBatchToPatchData(this.batch));
this.batch = [];
this.size = 0;
}
this.batch.push(chunk);
this.size += totalChunkSize;
callback(null);
}

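  // Collapses a batch into a single patch payload rooted at the chunks' longest
  // common path prefix, then sanitizes it so it can be sent as a PATCH body.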
private transformBatchToPatchData(batch: SizedData[]): SizedData {
return this.sanitizePatchData(this.compactData(batch));
}

private compactData(batch: SizedData[]): SizedData {
if (batch.length === 1) {
return batch[0];
}
const pathname = this.findLongestCommonPrefixArray(batch.map((d) => d.pathname));
let json = {};
let size = 0;
for (const chunk of batch) {
const truncatedPath = chunk.pathname.substring(pathname.length + 1); // +1 to trim leading slash
json = Object.assign({}, json, { [truncatedPath]: chunk.json });
size += chunk.size;
}
return { json, pathname, size };
}

// Since we cannot PATCH primitives and arrays, we explicitly convert them to objects.
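  // For example, { json: 42, pathname: "/counters/total" } becomes
  // { json: { total: 42 }, pathname: "/counters" }, and an array value ["a", "b"]
  // becomes { "0": "a", "1": "b" } at the same path.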
private sanitizePatchData({ json, pathname, size }: SizedData): SizedData {
if (typeof json === "string" || typeof json === "number" || typeof json === "boolean") {
const tokens = pathname.split("/");
const lastToken = tokens.pop();
return { json: { [lastToken!]: json }, pathname: tokens.join("/"), size };
}
if (Array.isArray(json)) {
return { json: { ...json }, pathname, size };
}
return { json, pathname, size };
}

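  // e.g. ["/users/alice/posts", "/users/alice/meta", "/users/bob"] => "/users"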
private findLongestCommonPrefixArray(paths: string[]): string {
const findLongestCommonPrefixPair = (p: string, q: string): string => {
const pTokens = p.split("/");
const qTokens = q.split("/");
let prefix = pTokens.slice(0, qTokens.length);
for (let i = 0; i < prefix.length; i++) {
if (prefix[i] !== qTokens[i]) {
prefix = prefix.slice(0, i);
break;
}
}
return prefix.join("/");
};

if (paths.length === 0) {
return "";
}
let prefix = paths[0];
for (let i = 1; i < paths.length; i++) {
prefix = findLongestCommonPrefixPair(prefix, paths[i]);
}
return prefix;
}

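  // Emits the final, partially filled batch (if any) once the input ends.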
_flush(callback: stream.TransformCallback): void {
if (this.size > 0) {
this.push(this.transformBatchToPatchData(this.batch));
}
callback(null);
}
}
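
// A minimal usage sketch (illustrative values; assumes an object-mode pipeline like
// the one below): two small chunks sharing the "/users" prefix fit into one batch,
// so a single combined patch rooted at the common prefix is emitted on flush.
//
//   stream.Readable.from([
//     { json: { name: "Ada" }, pathname: "/users/alice", size: 14 },
//     { json: { name: "Bob" }, pathname: "/users/bob", size: 14 },
//   ])
//     .pipe(new BatchChunks(1024 * 1024))
//     .on("data", (batch: SizedData) => {
//       // batch.pathname === "/users"
//       // batch.json deep-equals { alice: { name: "Ada" }, bob: { name: "Bob" } }
//     });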

/**
* Imports JSON data to a given RTDB instance.
*
* The data is parsed and chunked into subtrees of ~10 MB, to be subsequently written in parallel.
* The data is parsed and chunked into subtrees of the specified payload size, to be subsequently
* written in parallel.
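 *
 * Rough construction sketch (illustrative values; dataPath is assumed to mirror
 * the --filter option):
 *   new DatabaseImporter(
 *     new URL("https://<instance>.firebaseio.com/imports"),
 *     fs.createReadStream("data.json"), // any readable stream of the JSON file
 *     "/",                              // dataPath
 *     1024 * 1024,                      // payloadSize in bytes
 *     5                                 // concurrency limit
 *   );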
*/
export default class DatabaseImporter {
private client: Client;
@@ -36,7 +36,7 @@ export default class DatabaseImporter {
private dbUrl: URL,
private inStream: stream.Readable,
private dataPath: string,
private chunkBytes: number,
private payloadSize: number,
concurrency: number
) {
this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
@@ -68,26 +68,35 @@ export default class DatabaseImporter {
}
}

/**
* The top-level objects are parsed and chunked, with each chunk capped at payloadSize. Then,
* chunks are batched, with each batch also capped at payloadSize. Finally, the batched chunks
* are written in parallel.
*
* In the case where the data contains very large objects, chunking ensures that the request is
* not too large. On the other hand, in the case where the data contains many small objects,
* batching ensures that there are not too many requests.
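 * For example, a single very large subtree becomes many requests of roughly
 * payloadSize each, while thousands of tiny leaf records are coalesced into far
 * fewer PATCH requests, each filled close to payloadSize.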
*/
private readAndWriteChunks(): Promise<ClientResponse<JsonType>[]> {
const { dbUrl } = this;
const { dbUrl, payloadSize } = this;
const chunkData = this.chunkData.bind(this);
const writeChunk = this.writeChunk.bind(this);
const doWriteBatch = this.doWriteBatch.bind(this);
const getJoinedPath = this.joinPath.bind(this);

const readChunks = new stream.Transform({ objectMode: true });
readChunks._transform = function (chunk: { key: string; value: JsonType }, _, done) {
const data = { json: chunk.value, pathname: getJoinedPath(dbUrl.pathname, chunk.key) };
const chunkedData = chunkData(data);
const chunks = chunkedData.chunks || [data];
const chunks = chunkedData.chunks || [{ ...data, size: JSON.stringify(data.json).length }];
for (const chunk of chunks) {
this.push(chunk);
}
done();
};

const writeChunks = new stream.Transform({ objectMode: true });
writeChunks._transform = async function (chunk: Data, _, done) {
const res = await writeChunk(chunk);
const writeBatch = new stream.Transform({ objectMode: true });
writeBatch._transform = async function (batch: SizedData, _, done) {
const res = await doWriteBatch(batch);
this.push(res);
done();
};
@@ -115,19 +115,20 @@
)
)
.pipe(readChunks)
.pipe(writeChunks)
.pipe(new BatchChunks(payloadSize))
.pipe(writeBatch)
.on("data", (res: ClientResponse<JsonType>) => responses.push(res))
.on("error", reject)
.once("end", () => resolve(responses));
});
}

private writeChunk(chunk: Data): Promise<ClientResponse<JsonType>> {
private doWriteBatch(batch: SizedData): Promise<ClientResponse<JsonType>> {
const doRequest = (): Promise<ClientResponse<JsonType>> => {
return this.client.request({
method: "PUT",
path: chunk.pathname + ".json",
body: JSON.stringify(chunk.json),
method: "PATCH",
path: `${batch.pathname}.json`,
body: batch.json,
queryParams: this.dbUrl.searchParams,
});
};
@@ -157,7 +157,7 @@
// Children node
let size = 2; // {}

const chunks = [];
const chunks: SizedData[] = [];
let hasChunkedChild = false;

for (const [key, val] of Object.entries(json)) {
@@ -170,11 +170,11 @@
hasChunkedChild = true;
chunks.push(...childChunks.chunks);
} else {
chunks.push(child);
chunks.push({ ...child, size: childChunks.size });
}
}

if (hasChunkedChild || size >= this.chunkBytes) {
if (hasChunkedChild || size >= this.payloadSize) {
return { chunks, size };
} else {
return { chunks: null, size };