Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
- Fix function deploy retry after quota exceeded bug and increase backoff. (#5601)
- Fix bug where EVENTARC_CLOUD_EVENT_SOURCE environment variable was incorrectly set for some functions. (#5597)
- Adds 2nd gen firestore triggers to firebase deploy (#5592).
- Adds `database:import` command for non-atomic imports (#5396).
104 changes: 104 additions & 0 deletions src/commands/database-import.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import * as clc from "colorette";
import * as fs from "fs";
import * as utils from "../utils";

import { Command } from "../command";
import DatabaseImporter from "../database/import";
import { Emulators } from "../emulator/types";
import { FirebaseError } from "../error";
import { logger } from "../logger";
import { needProjectId } from "../projectUtils";
import { Options } from "../options";
import { printNoticeIfEmulated } from "../emulator/commandUtils";
import { promptOnce } from "../prompt";
import { DatabaseInstance, populateInstanceDetails } from "../management/database";
import { realtimeOriginOrEmulatorOrCustomUrl } from "../database/api";
import { requireDatabaseInstance } from "../requireDatabaseInstance";
import { requirePermissions } from "../requirePermissions";

// Options for the `database:import` command. The `.before(...)` hooks on the
// command populate `instance`/`instanceDetails`; the remaining fields come
// from commander flag parsing.
interface DatabaseImportOptions extends Options {
  // Target RTDB instance name (from --instance, or the project default).
  instance: string;
  // Instance metadata filled in by `populateInstanceDetails`.
  instanceDetails: DatabaseInstance;
  // From --disable-triggers; when truthy, `disableTriggers=true` is appended
  // to the write URL so Cloud Functions are not invoked.
  disableTriggers?: boolean;
  // From --filter; path inside the JSON file to import (whole file if unset).
  filter?: string;
}

/**
 * `firebase database:import <path> [infile]`
 *
 * Non-atomically imports the contents of a JSON file to the given path in
 * Realtime Database. Parsing/chunking/writing is delegated to
 * {@link DatabaseImporter}; this command only validates input, confirms with
 * the user, and reports the result.
 */
export const command = new Command("database:import <path> [infile]")
  .description(
    "non-atomically import the contents of a JSON file to the specified path in Realtime Database"
  )
  .withForce()
  .option(
    "--instance <instance>",
    "use the database <instance>.firebaseio.com (if omitted, use default database instance)"
  )
  .option(
    "--disable-triggers",
    "suppress any Cloud functions triggered by this operation, default to true",
    true
  )
  .option(
    "--filter <dataPath>",
    "import only data at this path in the JSON file (if omitted, import entire file)"
  )
  .before(requirePermissions, ["firebasedatabase.instances.update"])
  .before(requireDatabaseInstance)
  .before(populateInstanceDetails)
  .before(printNoticeIfEmulated, Emulators.DATABASE)
  .action(async (path: string, infile: string | undefined, options: DatabaseImportOptions) => {
    // Database paths are always absolute; reject anything else up front.
    if (!path.startsWith("/")) {
      throw new FirebaseError("Path must begin with /");
    }

    if (!infile) {
      throw new FirebaseError("No file supplied");
    }

    const projectId = needProjectId(options);
    const origin = realtimeOriginOrEmulatorOrCustomUrl(options.instanceDetails.databaseUrl);
    const dbPath = utils.getDatabaseUrl(origin, options.instance, path);
    const dbUrl = new URL(dbPath);
    if (options.disableTriggers) {
      // Carried on every chunked PUT so Cloud Functions are not invoked.
      dbUrl.searchParams.set("disableTriggers", "true");
    }

    // With --force (see .withForce()), promptOnce resolves without asking.
    const confirm = await promptOnce(
      {
        type: "confirm",
        name: "force",
        default: false,
        message: "You are about to import data to " + clc.cyan(dbPath) + ". Are you sure?",
      },
      options
    );
    if (!confirm) {
      throw new FirebaseError("Command aborted.");
    }

    const inStream = fs.createReadStream(infile);
    const dataPath = options.filter ?? "";
    const importer = new DatabaseImporter(dbUrl, inStream, dataPath);

    let responses;
    try {
      responses = await importer.execute();
    } catch (err: unknown) {
      // Re-throw FirebaseErrors (e.g. "location is not empty") verbatim;
      // wrap anything unexpected with exit code 2.
      if (err instanceof FirebaseError) {
        throw err;
      }
      logger.debug(err);
      throw new FirebaseError(`Unexpected error while importing data: ${err}`, { exit: 2 });
    }

    // One response per chunk written; an empty list means --filter matched
    // nothing in the input file.
    if (responses.length) {
      utils.logSuccess("Data persisted successfully");
    } else {
      utils.logWarning("No data was persisted. Check the data path supplied.");
    }

    logger.info();
    logger.info(
      clc.bold("View data at:"),
      utils.getDatabaseViewDataUrl(origin, projectId, options.instance, path)
    );
  });
1 change: 1 addition & 0 deletions src/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export function load(client: any): any {
client.crashlytics.mappingfile.upload = loadCommand("crashlytics-mappingfile-upload");
client.database = {};
client.database.get = loadCommand("database-get");
client.database.import = loadCommand("database-import");
client.database.instances = {};
client.database.instances.create = loadCommand("database-instances-create");
client.database.instances.list = loadCommand("database-instances-list");
Expand Down
176 changes: 176 additions & 0 deletions src/database/import.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import * as Chain from "stream-chain";
import * as clc from "colorette";
import * as Filter from "stream-json/filters/Filter";
import * as stream from "stream";
import * as StreamObject from "stream-json/streamers/StreamObject";

import pLimit from "p-limit";

import { URL } from "url";
import { Client, ClientResponse } from "../apiv2";
import { FirebaseError } from "../error";

// Subtrees are split until each chunk's estimated serialized size is below
// ~10 MiB (see DatabaseImporter.chunkData).
const MAX_CHUNK_SIZE = 1024 * 1024 * 10;
// Maximum number of chunk PUTs in flight at once (see pLimit usage).
const CONCURRENCY_LIMIT = 5;

// JSON value as modeled by the importer.
// NOTE(review): `null` and arrays are not represented. At runtime arrays flow
// through the index-signature branch via Object.entries (the tests import
// array data), but a `null` leaf would also reach the object branch and throw
// in Object.entries — confirm inputs cannot contain null.
type JsonType = { [key: string]: JsonType } | string | number | boolean;

// A subtree to be written: the JSON payload and the absolute RTDB pathname
// it will be PUT to.
type Data = {
  json: JsonType;
  pathname: string;
};

// Result of chunkData: `chunks` is null when the subtree fits in one write;
// `size` is the estimated serialized byte length of the subtree.
type ChunkedData = {
  chunks: Data[] | null;
  size: number;
};

/**
* Imports JSON data to a given RTDB instance.
*
* The data is parsed and chunked into subtrees of ~10 MB, to be subsequently written in parallel.
*/
/**
 * Imports JSON data to a given RTDB instance.
 *
 * The data is parsed and chunked into subtrees of ~10 MB, to be subsequently
 * written in parallel (at most CONCURRENCY_LIMIT writes in flight).
 */
export default class DatabaseImporter {
  private client: Client;
  // Serializes chunk writes through a concurrency gate.
  private limit = pLimit(CONCURRENCY_LIMIT);

  constructor(
    private dbUrl: URL,
    private inStream: stream.Readable,
    private dataPath: string,
    private chunkSize = MAX_CHUNK_SIZE
  ) {
    this.client = new Client({ urlPrefix: dbUrl.origin, auth: true });
  }

  /**
   * Writes the chunked data to RTDB. Any existing data at the specified location will be overwritten.
   *
   * @returns one ClientResponse per chunk written (empty if nothing matched).
   * @throws FirebaseError if the target location is not empty or a write fails.
   */
  async execute(): Promise<ClientResponse<JsonType>[]> {
    await this.checkLocationIsEmpty();
    return this.readAndWriteChunks();
  }

  // Fails fast (exit 2) unless a shallow GET of the target path returns no body.
  private async checkLocationIsEmpty(): Promise<void> {
    const response = await this.client.request({
      method: "GET",
      path: this.dbUrl.pathname + ".json",
      queryParams: { shallow: "true" },
    });

    if (response.body) {
      throw new FirebaseError(
        "Importing is only allowed for an empty location. Delete all data by running " +
          clc.bold(`firebase database:remove ${this.dbUrl.pathname} --disable-triggers`) +
          ", then rerun this command.",
        { exit: 2 }
      );
    }
  }

  // Streams the JSON file through: parser/filter -> top-level object entries ->
  // chunker -> concurrent PUT writer, collecting one response per chunk.
  private readAndWriteChunks(): Promise<ClientResponse<JsonType>[]> {
    const { dbUrl } = this;
    // Bind instance methods once so the plain-function transforms below can
    // use them (their `this` is the Transform stream, not the importer).
    const chunkData = this.chunkData.bind(this);
    const writeChunk = this.writeChunk.bind(this);
    const getJoinedPath = this.joinPath.bind(this);

    // Splits each top-level entry into <= chunkSize subtrees.
    const readChunks = new stream.Transform({ objectMode: true });
    readChunks._transform = function (chunk: { key: string; value: JsonType }, _, done) {
      const data = { json: chunk.value, pathname: getJoinedPath(dbUrl.pathname, chunk.key) };
      const chunkedData = chunkData(data);
      const pieces = chunkedData.chunks || [data];
      for (const piece of pieces) {
        this.push(piece);
      }
      done();
    };

    // PUTs each chunk; responses flow downstream as "data" events.
    const writeChunks = new stream.Transform({ objectMode: true });
    writeChunks._transform = async function (chunk: Data, _, done) {
      try {
        const res = await writeChunk(chunk);
        this.push(res);
        done();
      } catch (err: any) {
        // Propagate write failures through the stream. Without this, a
        // rejected writeChunk leaves done() uncalled and the rejection
        // unhandled, stalling the pipeline instead of reaching the "error"
        // handler below.
        done(err);
      }
    };

    return new Promise((resolve, reject) => {
      const responses: ClientResponse<JsonType>[] = [];
      const pipeline = new Chain([
        this.inStream,
        Filter.withParser({
          // An empty filter string means "import everything".
          filter: this.computeFilterString(this.dataPath) || (() => true),
          pathSeparator: "/",
        }),
        StreamObject.streamObject(),
      ]);
      pipeline
        .on("error", (err: Error) =>
          reject(
            new FirebaseError(
              `Invalid data; couldn't parse JSON object, array, or value. ${err.message}`,
              {
                original: err,
                exit: 2,
              }
            )
          )
        )
        .pipe(readChunks)
        .pipe(writeChunks)
        .on("data", (res: ClientResponse<JsonType>) => responses.push(res))
        .on("error", reject)
        .once("end", () => resolve(responses));
    });
  }

  // PUTs one chunk, forwarding the dbUrl query params (e.g. disableTriggers).
  private writeChunk(chunk: Data): Promise<ClientResponse<JsonType>> {
    return this.limit(() =>
      this.client.request({
        method: "PUT",
        path: chunk.pathname + ".json",
        body: JSON.stringify(chunk.json),
        queryParams: this.dbUrl.searchParams,
      })
    );
  }

  // Recursively estimates serialized size and splits objects whose estimated
  // size reaches chunkSize into per-child chunks.
  private chunkData({ json, pathname }: Data): ChunkedData {
    if (typeof json === "string" || typeof json === "number" || typeof json === "boolean") {
      // Leaf node, cannot be chunked
      return { chunks: null, size: JSON.stringify(json).length };
    } else {
      // Children node
      let size = 2; // {}

      const chunks = [];
      let hasChunkedChild = false;

      for (const [key, val] of Object.entries(json)) {
        size += key.length + 3; // "":

        const child = { json: val, pathname: this.joinPath(pathname, key) };
        const childChunks = this.chunkData(child);
        size += childChunks.size;
        if (childChunks.chunks) {
          // A descendant was already split, so this node must be split too.
          hasChunkedChild = true;
          chunks.push(...childChunks.chunks);
        } else {
          chunks.push(child);
        }
      }

      if (hasChunkedChild || size >= this.chunkSize) {
        return { chunks, size };
      } else {
        return { chunks: null, size };
      }
    }
  }

  // Normalizes a user-supplied data path ("/a/b/") into stream-json filter
  // form ("a/b"); returns "" when the path has no segments.
  private computeFilterString(dataPath: string): string {
    return dataPath.split("/").filter(Boolean).join("/");
  }

  // Joins a root path and key with exactly one slash between them.
  private joinPath(root: string, key: string): string {
    return [root, key].join("/").replace("//", "/");
  }
}
92 changes: 92 additions & 0 deletions src/test/database/import.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import * as nock from "nock";
import * as stream from "stream";
import * as utils from "../../utils";
import { expect } from "chai";

import DatabaseImporter from "../../database/import";
import { FirebaseError } from "../../error";

// Shared target URL; "/foo" is the import root every test writes under.
const dbUrl = new URL("https://test-db.firebaseio.com/foo");

describe("DatabaseImporter", () => {
  const DATA = { a: 100, b: [true, "bar", { f: { g: 0, h: 1 }, i: "baz" }] };
  let DATA_STREAM: stream.Readable;

  beforeEach(() => {
    // Fresh stream per test — a Readable can only be consumed once.
    // NOTE(review): the `!` assumes stringToStream never returns undefined
    // for non-empty input — confirm against utils.stringToStream.
    DATA_STREAM = utils.stringToStream(JSON.stringify(DATA))!;
  });

  it("throws FirebaseError when JSON is invalid", async () => {
    // The empty-location check still runs (and passes) before parsing fails.
    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
    const INVALID_JSON = '{"a": {"b"}}';
    const importer = new DatabaseImporter(
      dbUrl,
      utils.stringToStream(INVALID_JSON)!,
      /* importPath= */ "/"
    );

    await expect(importer.execute()).to.be.rejectedWith(
      FirebaseError,
      "Invalid data; couldn't parse JSON object, array, or value."
    );
  });

  it("chunks data in top-level objects", async () => {
    // Default chunk size (10 MiB) means each top-level key is one PUT.
    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
    nock("https://test-db.firebaseio.com")
      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
      .reply(200);
    const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/");

    const responses = await importer.execute();

    expect(responses).to.have.length(2);
    // nock.isDone() verifies every expected PUT above was actually issued.
    expect(nock.isDone()).to.be.true;
  });

  it("chunks data according to provided chunk size", async () => {
    // With a 20-byte limit, subtrees are split down to leaf-ish chunks.
    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
    nock("https://test-db.firebaseio.com").put("/foo/a.json", "100").reply(200);
    nock("https://test-db.firebaseio.com").put("/foo/b/0.json", "true").reply(200);
    nock("https://test-db.firebaseio.com").put("/foo/b/1.json", '"bar"').reply(200);
    nock("https://test-db.firebaseio.com")
      .put("/foo/b/2/f.json", JSON.stringify({ g: 0, h: 1 }))
      .reply(200);
    nock("https://test-db.firebaseio.com").put("/foo/b/2/i.json", '"baz"').reply(200);
    const importer = new DatabaseImporter(
      dbUrl,
      DATA_STREAM,
      /* importPath= */ "/",
      /* chunkSize= */ 20
    );

    const responses = await importer.execute();

    expect(responses).to.have.length(5);
    expect(nock.isDone()).to.be.true;
  });

  it("imports from data path", async () => {
    // Only "/b" from the JSON file should be written; "a" is filtered out.
    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200);
    nock("https://test-db.firebaseio.com")
      .put("/foo/b.json", JSON.stringify([true, "bar", { f: { g: 0, h: 1 }, i: "baz" }]))
      .reply(200);
    const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/b");

    const responses = await importer.execute();

    expect(responses).to.have.length(1);
    expect(nock.isDone()).to.be.true;
  });

  it("throws FirebaseError when data location is nonempty", async () => {
    // Shallow GET returning a body triggers the empty-location guard.
    nock("https://test-db.firebaseio.com").get("/foo.json?shallow=true").reply(200, { a: "foo" });
    const importer = new DatabaseImporter(dbUrl, DATA_STREAM, /* importPath= */ "/");

    await expect(importer.execute()).to.be.rejectedWith(
      FirebaseError,
      /Importing is only allowed for an empty location./
    );
  });
});