-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add database:import command for non-atomic import #5396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
731a6cb
Import large data file using chunked writes
tohhsinpei 3d1bd1a
Add unit tests
tohhsinpei 29d33d7
Separate database:import command; add unit test for request
tohhsinpei f104a69
Add test case for array in JSON data
tohhsinpei 25e9c4e
Address PR feedback
tohhsinpei afa6795
Disallow STDIN and arg data input
tohhsinpei 077cc97
Stream top-level JSON objects
tohhsinpei de2e28b
Support importing at data path
tohhsinpei 972302a
Update dependencies
tohhsinpei c13b9d9
Merge branch 'master' into hsinpei/database-set-chunked
tohhsinpei 714dbc3
Minor rewording of CLI prompt
tohhsinpei 4e444b5
Add CHANGELOG entry
tohhsinpei 90991b9
Run npm install
tohhsinpei 4956dcc
Address PR feedback
tohhsinpei 8d70afc
Address PR feedback
tohhsinpei 2650a89
Merge branch 'master' into hsinpei/database-set-chunked
tohhsinpei 505767e
Increase chunk size to 10 MB
tohhsinpei 2a9b658
Merge branch 'master' into hsinpei/database-set-chunked
tohhsinpei File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| import * as clc from "colorette"; | ||
| import * as fs from "fs"; | ||
| import * as utils from "../utils"; | ||
|
|
||
| import { Command } from "../command"; | ||
| import DatabaseImporter from "../database/import"; | ||
| import { Emulators } from "../emulator/types"; | ||
| import { FirebaseError } from "../error"; | ||
| import { logger } from "../logger"; | ||
| import { needProjectId } from "../projectUtils"; | ||
| import { Options } from "../options"; | ||
| import { printNoticeIfEmulated } from "../emulator/commandUtils"; | ||
| import { promptOnce } from "../prompt"; | ||
| import { DatabaseInstance, populateInstanceDetails } from "../management/database"; | ||
| import { realtimeOriginOrEmulatorOrCustomUrl } from "../database/api"; | ||
| import { requireDatabaseInstance } from "../requireDatabaseInstance"; | ||
| import { requirePermissions } from "../requirePermissions"; | ||
|
|
||
/**
 * Options for the `database:import` command.
 *
 * Extends the base CLI `Options` with database-specific fields — presumably
 * populated by the command's `requireDatabaseInstance` / `populateInstanceDetails`
 * before-hooks (NOTE(review): confirm against those hooks).
 */
interface DatabaseImportOptions extends Options {
  // Name of the target Realtime Database instance (<instance>.firebaseio.com).
  instance: string;
  // Resolved instance metadata, including its databaseUrl used to compute the origin.
  instanceDetails: DatabaseInstance;
  // When truthy, appends disableTriggers=true to the write URL (CLI default is true).
  disableTriggers?: boolean;
  // Optional path within the JSON file; only data under this path is imported.
  filter?: string;
}
|
|
||
| export const command = new Command("database:import <path> [infile]") | ||
| .description( | ||
| "non-atomically import the contents of a JSON file to the specified path in Realtime Database" | ||
| ) | ||
| .withForce() | ||
| .option( | ||
| "--instance <instance>", | ||
| "use the database <instance>.firebaseio.com (if omitted, use default database instance)" | ||
| ) | ||
| .option( | ||
| "--disable-triggers", | ||
| "suppress any Cloud functions triggered by this operation, default to true", | ||
| true | ||
| ) | ||
| .option( | ||
| "--filter <dataPath>", | ||
| "import only data at this path in the JSON file (if omitted, import entire file)" | ||
| ) | ||
| .before(requirePermissions, ["firebasedatabase.instances.update"]) | ||
| .before(requireDatabaseInstance) | ||
| .before(populateInstanceDetails) | ||
| .before(printNoticeIfEmulated, Emulators.DATABASE) | ||
| .action(async (path: string, infile: string | undefined, options: DatabaseImportOptions) => { | ||
| if (!path.startsWith("/")) { | ||
| throw new FirebaseError("Path must begin with /"); | ||
| } | ||
|
|
||
| if (!infile) { | ||
| throw new FirebaseError("No file supplied"); | ||
| } | ||
|
|
||
| const projectId = needProjectId(options); | ||
| const origin = realtimeOriginOrEmulatorOrCustomUrl(options.instanceDetails.databaseUrl); | ||
| const dbPath = utils.getDatabaseUrl(origin, options.instance, path); | ||
| const dbUrl = new URL(dbPath); | ||
| if (options.disableTriggers) { | ||
| dbUrl.searchParams.set("disableTriggers", "true"); | ||
| } | ||
|
|
||
| const confirm = await promptOnce( | ||
| { | ||
| type: "confirm", | ||
| name: "force", | ||
| default: false, | ||
| message: "You are about to import data to " + clc.cyan(dbPath) + ". Are you sure?", | ||
| }, | ||
| options | ||
| ); | ||
| if (!confirm) { | ||
| throw new FirebaseError("Command aborted."); | ||
| } | ||
|
|
||
| const inStream = fs.createReadStream(infile); | ||
| const dataPath = options.filter || ""; | ||
| const importer = new DatabaseImporter(dbUrl, inStream, dataPath); | ||
|
|
||
| let responses; | ||
| try { | ||
| responses = await importer.execute(); | ||
| } catch (err: any) { | ||
| if (err instanceof FirebaseError) { | ||
| throw err; | ||
| } | ||
| logger.debug(err); | ||
| throw new FirebaseError(`Unexpected error while importing data: ${err}`, { exit: 2 }); | ||
| } | ||
|
|
||
| if (responses.length) { | ||
| utils.logSuccess("Data persisted successfully"); | ||
| } else { | ||
| utils.logWarning("No data was persisted. Check the data path supplied."); | ||
| } | ||
|
|
||
| logger.info(); | ||
| logger.info( | ||
| clc.bold("View data at:"), | ||
| utils.getDatabaseViewDataUrl(origin, projectId, options.instance, path) | ||
| ); | ||
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| import * as Chain from "stream-chain"; | ||
| import * as clc from "colorette"; | ||
| import * as Filter from "stream-json/filters/Filter"; | ||
| import * as stream from "stream"; | ||
| import * as StreamObject from "stream-json/streamers/StreamObject"; | ||
|
|
||
| import pLimit from "p-limit"; | ||
|
|
||
| import { URL } from "url"; | ||
| import { Client, ClientResponse } from "../apiv2"; | ||
| import { FirebaseError } from "../error"; | ||
|
|
||
| const MAX_CHUNK_SIZE = 1024 * 1024 * 10; | ||
| const CONCURRENCY_LIMIT = 5; | ||
|
|
||
| type JsonType = { [key: string]: JsonType } | string | number | boolean; | ||
|
|
||
| type Data = { | ||
| json: JsonType; | ||
| pathname: string; | ||
| }; | ||
|
|
||
| type ChunkedData = { | ||
| chunks: Data[] | null; | ||
| size: number; | ||
| }; | ||
|
|
||
| /** | ||
| * Imports JSON data to a given RTDB instance. | ||
| * | ||
| * The data is parsed and chunked into subtrees of ~10 MB, to be subsequently written in parallel. | ||
| */ | ||
| export default class DatabaseImporter { | ||
| private client: Client; | ||
| private limit = pLimit(CONCURRENCY_LIMIT); | ||
|
|
||
| constructor( | ||
| private dbUrl: URL, | ||
| private inStream: stream.Readable, | ||
| private dataPath: string, | ||
| private chunkSize = MAX_CHUNK_SIZE | ||
| ) { | ||
| this.client = new Client({ urlPrefix: dbUrl.origin, auth: true }); | ||
| } | ||
|
|
||
| /** | ||
| * Writes the chunked data to RTDB. Any existing data at the specified location will be overwritten. | ||
| */ | ||
| async execute(): Promise<ClientResponse<JsonType>[]> { | ||
| await this.checkLocationIsEmpty(); | ||
tohhsinpei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return this.readAndWriteChunks(); | ||
| } | ||
|
|
||
| private async checkLocationIsEmpty(): Promise<void> { | ||
| const response = await this.client.request({ | ||
| method: "GET", | ||
| path: this.dbUrl.pathname + ".json", | ||
| queryParams: { shallow: "true" }, | ||
| }); | ||
|
|
||
| if (response.body) { | ||
| throw new FirebaseError( | ||
| "Importing is only allowed for an empty location. Delete all data by running " + | ||
| clc.bold(`firebase database:remove ${this.dbUrl.pathname} --disable-triggers`) + | ||
| ", then rerun this command.", | ||
| { exit: 2 } | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| private readAndWriteChunks(): Promise<ClientResponse<JsonType>[]> { | ||
| const { dbUrl } = this; | ||
| const chunkData = this.chunkData.bind(this); | ||
| const writeChunk = this.writeChunk.bind(this); | ||
| const getJoinedPath = this.joinPath.bind(this); | ||
|
|
||
| const readChunks = new stream.Transform({ objectMode: true }); | ||
| readChunks._transform = function (chunk: { key: string; value: JsonType }, _, done) { | ||
| const data = { json: chunk.value, pathname: getJoinedPath(dbUrl.pathname, chunk.key) }; | ||
| const chunkedData = chunkData(data); | ||
| const chunks = chunkedData.chunks || [data]; | ||
| for (const chunk of chunks) { | ||
| this.push(chunk); | ||
| } | ||
| done(); | ||
| }; | ||
|
|
||
| const writeChunks = new stream.Transform({ objectMode: true }); | ||
| writeChunks._transform = async function (chunk: Data, _, done) { | ||
| const res = await writeChunk(chunk); | ||
| this.push(res); | ||
| done(); | ||
| }; | ||
|
|
||
| return new Promise((resolve, reject) => { | ||
| const responses: ClientResponse<JsonType>[] = []; | ||
| const pipeline = new Chain([ | ||
| this.inStream, | ||
| Filter.withParser({ | ||
| filter: this.computeFilterString(this.dataPath) || (() => true), | ||
| pathSeparator: "/", | ||
| }), | ||
| StreamObject.streamObject(), | ||
| ]); | ||
| pipeline | ||
| .on("error", (err: Error) => | ||
| reject( | ||
| new FirebaseError( | ||
| `Invalid data; couldn't parse JSON object, array, or value. ${err.message}`, | ||
| { | ||
| original: err, | ||
| exit: 2, | ||
| } | ||
| ) | ||
| ) | ||
| ) | ||
| .pipe(readChunks) | ||
| .pipe(writeChunks) | ||
| .on("data", (res: ClientResponse<JsonType>) => responses.push(res)) | ||
| .on("error", reject) | ||
| .once("end", () => resolve(responses)); | ||
| }); | ||
| } | ||
|
|
||
| private writeChunk(chunk: Data): Promise<ClientResponse<JsonType>> { | ||
| return this.limit(() => | ||
| this.client.request({ | ||
| method: "PUT", | ||
| path: chunk.pathname + ".json", | ||
| body: JSON.stringify(chunk.json), | ||
| queryParams: this.dbUrl.searchParams, | ||
| }) | ||
| ); | ||
| } | ||
|
|
||
| private chunkData({ json, pathname }: Data): ChunkedData { | ||
| if (typeof json === "string" || typeof json === "number" || typeof json === "boolean") { | ||
| // Leaf node, cannot be chunked | ||
| return { chunks: null, size: JSON.stringify(json).length }; | ||
| } else { | ||
| // Children node | ||
| let size = 2; // {} | ||
|
|
||
| const chunks = []; | ||
| let hasChunkedChild = false; | ||
|
|
||
| for (const [key, val] of Object.entries(json)) { | ||
| size += key.length + 3; // "": | ||
|
|
||
| const child = { json: val, pathname: this.joinPath(pathname, key) }; | ||
| const childChunks = this.chunkData(child); | ||
| size += childChunks.size; | ||
| if (childChunks.chunks) { | ||
| hasChunkedChild = true; | ||
| chunks.push(...childChunks.chunks); | ||
| } else { | ||
| chunks.push(child); | ||
| } | ||
| } | ||
|
|
||
| if (hasChunkedChild || size >= this.chunkSize) { | ||
| return { chunks, size }; | ||
| } else { | ||
| return { chunks: null, size }; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private computeFilterString(dataPath: string): string { | ||
| return dataPath.split("/").filter(Boolean).join("/"); | ||
| } | ||
|
|
||
| private joinPath(root: string, key: string): string { | ||
| return [root, key].join("/").replace("//", "/"); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| import * as nock from "nock"; | ||
| import * as stream from "stream"; | ||
| import * as utils from "../../utils"; | ||
| import { expect } from "chai"; | ||
|
|
||
| import DatabaseImporter from "../../database/import"; | ||
| import { FirebaseError } from "../../error"; | ||
|
|
||
| const dbUrl = new URL("https://test-db.firebaseio.com/foo"); | ||
|
|
||
| describe("DatabaseImporter", () => { | ||
| const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] }; | ||
| let DATA_STREAM: stream.Readable; | ||
|
|
||
| beforeEach(() => { | ||
| DATA_STREAM = utils.stringToStream(JSON.stringify(DATA))!; | ||
| }); | ||
|
|
||
| it("throws FirebaseError when JSON is invalid", async () => { | ||
| nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200); | ||
| const INVALID_JSON = '{"a": {"b"}}'; | ||
| const importer = new DatabaseImporter( | ||
| dbUrl, | ||
| utils.stringToStream(INVALID_JSON)!, | ||
| /* importPath= */ "/" | ||
| ); | ||
|
|
||
| await expect(importer.execute()).to.be.rejectedWith( | ||
| FirebaseError, | ||
| "Invalid data; couldn't parse JSON object, array, or value." | ||
| ); | ||
| }); | ||
|
|
||
| it("chunks data in top-level objects", async () => { | ||
| nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200); | ||
| nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200); | ||
| nock("https://test-db.firebaseio.com") | ||
| .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }])) | ||
| .reply(200); | ||
| const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/"); | ||
|
|
||
| const responses = await importer.execute(); | ||
tohhsinpei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| expect(responses).to.have.length(2); | ||
| expect(nock.isDone()).to.be.true; | ||
| }); | ||
|
|
||
| it("chunks data according to provided chunk size", async () => { | ||
| nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200); | ||
| nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200); | ||
| nock("https://test-db.firebaseio.com").put("/foo/b/0.json", "true").reply(200); | ||
| nock("https://test-db.firebaseio.com").put("/foo/b/1.json", '"bar"').reply(200); | ||
| nock("https://test-db.firebaseio.com") | ||
| .put("/foo/b/2/f.json", JSON.stringify({ g: 0, h: 1 })) | ||
| .reply(200); | ||
| nock("https://test-db.firebaseio.com").put("/foo/b/2/i.json", '"baz"').reply(200); | ||
| const importer = new DatabaseImporter( | ||
| dbUrl, | ||
| DATA_STREAM, | ||
| /* importPath= */ "/", | ||
| /* chunkSize= */ 20 | ||
| ); | ||
|
|
||
| const responses = await importer.execute(); | ||
|
|
||
| expect(responses).to.have.length(5); | ||
| expect(nock.isDone()).to.be.true; | ||
| }); | ||
|
|
||
| it("imports from data path", async () => { | ||
| nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200); | ||
| nock("https://test-db.firebaseio.com") | ||
| .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }])) | ||
| .reply(200); | ||
| const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/b"); | ||
|
|
||
| const responses = await importer.execute(); | ||
|
|
||
| expect(responses).to.have.length(1); | ||
| expect(nock.isDone()).to.be.true; | ||
| }); | ||
|
|
||
| it("throws FirebaseError when data location is nonempty", async () => { | ||
| nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200, { a: "foo" }); | ||
| const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/"); | ||
|
|
||
| await expect(importer.execute()).to.be.rejectedWith( | ||
| FirebaseError, | ||
| /Importing is only allowed for an empty location./ | ||
| ); | ||
| }); | ||
| }); | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.