fix(firestore-bigquery-export): disable onInstall backfill
cabljac committed Mar 26, 2024
1 parent 8c0ce7c commit ffcf2fe
Showing 3 changed files with 127 additions and 135 deletions.
8 changes: 0 additions & 8 deletions firestore-bigquery-export/README.md
@@ -158,14 +158,8 @@ essential for the script to insert data into an already partitioned table.)

* Exclude old data payloads: If enabled, table rows will never contain old data (the document snapshot before the update), which should be more performant and avoid potential resource limitations.

* Import existing Firestore documents into BigQuery?: Do you want to import existing documents from your Firestore collection into BigQuery? These documents will each have a special changelog entry with the operation of `IMPORT` and the timestamp of epoch, which ensures that any operation on an imported document supersedes the import record (see the sketch after this list).

* Existing Documents Collection: Specify the path of the Cloud Firestore Collection you would like to import from. This may or may not be the same Collection for which you plan to mirror changes. If you want to use a collectionGroup query, provide the collection name value here, and set 'Use Collection Group query' to true. You may use `{wildcard}` notation with an enabled collectionGroup query to match a subcollection of all documents in a collection (e.g., `chatrooms/{chatid}/posts`).

* Use Collection Group query: Do you want to use a [collection group](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query for importing existing documents? You must enable the collectionGroup query if your import path contains subcollections. Warning: a collectionGroup query will target every collection in your Firestore project that matches the 'Existing documents collection'. For example, if you have 10,000 documents that each have a subcollection named `landmarks`, this will query every document across all 10,000 `landmarks` collections.

* Docs per backfill: When importing existing documents, how many should be imported at once? The default value of 200 should be ok for most users. If you are using a transform function or have very large documents, you may need to set this to a lower number. If the lifecycle event function times out, lower this value.

* Cloud KMS key name: Instead of Google managing the key encryption keys that protect your data, you control and manage key encryption keys in Cloud KMS. If this parameter is set, the extension will specify the KMS key name when creating the BQ table. See the PREINSTALL.md for more details.
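
To make the supersede guarantee above concrete: `IMPORT` rows carry the epoch timestamp (1970-01-01), so any real change row is newer and wins when the changelog is collapsed to the latest row per document. A minimal standalone illustration of that comparison (not the extension's code):

```ts
// IMPORT rows use the epoch timestamp, so any genuine write sorts after them.
const importRow = { operation: "IMPORT", timestamp: new Date(0).toISOString() };
const updateRow = { operation: "UPDATE", timestamp: new Date().toISOString() };

// ISO-8601 strings of equal precision compare correctly as plain strings.
const latest = [importRow, updateRow].reduce((a, b) =>
  a.timestamp >= b.timestamp ? a : b
);
console.log(latest.operation); // "UPDATE": the import record is superseded
```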


@@ -174,8 +168,6 @@ essential for the script to insert data into an already partitioned table.)

* **fsexportbigquery:** Listens for document changes in your specified Cloud Firestore collection, then exports the changes into BigQuery.

* **fsimportexistingdocs:** Imports existing documents from the specified collection into BigQuery. Imported documents will have a special changelog with the operation of `IMPORT` and the timestamp of epoch.

* **syncBigQuery:** A task-triggered function that gets called on BigQuery sync

* **initBigQuerySync:** Runs configuration for syncing with BigQuery
120 changes: 60 additions & 60 deletions firestore-bigquery-export/extension.yaml
@@ -60,18 +60,18 @@ resources:
eventType: providers/cloud.firestore/eventTypes/document.write
resource: projects/${param:PROJECT_ID}/databases/${param:DATABASE_ID}/documents/${param:COLLECTION_PATH}/{documentId}

- name: fsimportexistingdocs
type: firebaseextensions.v1beta.function
description:
Imports existing documents from the specified collection into BigQuery.
Imported documents will have a special changelog with the operation of
`IMPORT` and the timestamp of epoch.
properties:
runtime: nodejs18
taskQueueTrigger:
retryConfig:
maxAttempts: 15
minBackoffSeconds: 60
# - name: fsimportexistingdocs
# type: firebaseextensions.v1beta.function
# description:
# Imports existing documents from the specified collection into BigQuery.
# Imported documents will have a special changelog with the operation of
# `IMPORT` and the timestamp of epoch.
# properties:
# runtime: nodejs18
# taskQueueTrigger:
# retryConfig:
# maxAttempts: 15
# minBackoffSeconds: 60

- name: syncBigQuery
type: firebaseextensions.v1beta.function
@@ -405,39 +405,39 @@ params:
- label: No
value: no

- param: DO_BACKFILL
label: Import existing Firestore documents into BigQuery?
description: >-
Do you want to import existing documents from your Firestore collection
into BigQuery? These documents will each have a special changelog
with the operation of `IMPORT` and the timestamp of epoch. This ensures
that any operation on an imported document supersedes the import record.
type: select
required: true
default: no
options:
- label: Yes
value: yes
- label: No
value: no

- param: IMPORT_COLLECTION_PATH
label: Existing Documents Collection
description: >-
Specify the path of the Cloud Firestore Collection you would like to
import from. This may or may not be the same Collection for which you plan
to mirror changes. If you want to use a collectionGroup query, provide the
collection name value here, and set 'Use Collection Group query' to true.
You may use `{wildcard}` notation with an enabled collectionGroup query to
match a subcollection of all documents in a collection (e.g.,
`chatrooms/{chatid}/posts`).
type: string
validationRegex: "^[^/]+(/[^/]+/[^/]+)*$"
validationErrorMessage:
Firestore collection paths must be an odd number of segments separated by
slashes, e.g. "path/to/collection".
example: posts
required: false
# - param: DO_BACKFILL
# label: Import existing Firestore documents into BigQuery?
# description: >-
# Do you want to import existing documents from your Firestore collection
# into BigQuery? These documents will each have a special changelog
# with the operation of `IMPORT` and the timestamp of epoch. This ensures
# that any operation on an imported document supersedes the import record.
# type: select
# required: true
# default: no
# options:
# - label: Yes
# value: yes
# - label: No
# value: no

# - param: IMPORT_COLLECTION_PATH
# label: Existing Documents Collection
# description: >-
# Specify the path of the Cloud Firestore Collection you would like to
# import from. This may or may not be the same Collection for which you plan
# to mirror changes. If you want to use a collectionGroup query, provide the
# collection name value here, and set 'Use Collection Group query' to true.
# You may use `{wildcard}` notation with an enabled collectionGroup query to
# match a subcollection of all documents in a collection (e.g.,
# `chatrooms/{chatid}/posts`).
# type: string
# validationRegex: "^[^/]+(/[^/]+/[^/]+)*$"
# validationErrorMessage:
# Firestore collection paths must be an odd number of segments separated by
# slashes, e.g. "path/to/collection".
# example: posts
# required: false

- param: USE_COLLECTION_GROUP_QUERY
label: Use Collection Group query
@@ -458,20 +458,20 @@
- label: No
value: no

- param: DOCS_PER_BACKFILL
label: Docs per backfill
description: >-
When importing existing documents, how many should be imported at once?
The default value of 200 should be ok for most users. If you are using a
transform function or have very large documents, you may need to set this
to a lower number. If the lifecycle event function times out, lower this
value.
type: string
example: 200
validationRegex: "^[1-9][0-9]*$"
validationErrorMessage: Must be a positive integer.
default: 200
required: true
# - param: DOCS_PER_BACKFILL
# label: Docs per backfill
# description: >-
# When importing existing documents, how many should be imported at once?
# The default value of 200 should be ok for most users. If you are using a
# transform function or have very large documents, you may need to set this
# to a lower number. If the lifecycle event function times out, lower this
# value.
# type: string
# example: 200
# validationRegex: "^[1-9][0-9]*$"
# validationErrorMessage: Must be a positive integer.
# default: 200
# required: true

- param: KMS_KEY_NAME
label: Cloud KMS key name
@@ -514,7 +514,7 @@ lifecycleEvents:
onInstall:
function: initBigQuerySync
processingMessage:
Configuring BigQuery Sync and running import if configured.
Configuring BigQuery Sync.
onUpdate:
function: setupBigQuerySync
processingMessage: Configuring BigQuery Sync
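
For context on these hooks: lifecycle handlers such as `initBigQuerySync` are task-queue functions that report completion back through the Extensions runtime. A minimal sketch of that shape, hedged and simplified rather than the extension's actual implementation:

```ts
import * as functions from "firebase-functions";
import { getExtensions } from "firebase-admin/extensions";

// A lifecycle task handler: runs once on install/update, then reports status.
export const initBigQuerySync = functions.tasks
  .taskQueue()
  .onDispatch(async () => {
    // ... create or update the BigQuery dataset, table, and views here ...
    await getExtensions()
      .runtime()
      .setProcessingState("PROCESSING_COMPLETE", "Sync setup complete.");
  });
```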
134 changes: 67 additions & 67 deletions firestore-bigquery-export/functions/src/index.ts
@@ -190,7 +190,7 @@ export const initBigQuerySync = functions.tasks
await eventTracker.initialize();

/** Run Backfill */
if (config.doBackfill) {
if (false) {
await getFunctions()
.taskQueue(
`locations/${config.location}/functions/fsimportexistingdocs`,
@@ -207,69 +207,69 @@ export const initBigQuerySync = functions.tasks
return;
});

exports.fsimportexistingdocs = functions.tasks
.taskQueue()
.onDispatch(async (data, context) => {
const runtime = getExtensions().runtime();
if (!config.doBackfill || !config.importCollectionPath) {
await runtime.setProcessingState(
"PROCESSING_COMPLETE",
"Completed. No existing documents imported into BigQuery."
);
return;
}

const offset = (data["offset"] as number) ?? 0;
const docsCount = (data["docsCount"] as number) ?? 0;

const query = config.useCollectionGroupQuery
? getFirestore(config.databaseId).collectionGroup(
config.importCollectionPath.split("/")[
config.importCollectionPath.split("/").length - 1
]
)
: getFirestore(config.databaseId).collection(config.importCollectionPath);

const snapshot = await query
.offset(offset)
.limit(config.docsPerBackfill)
.get();

const rows = snapshot.docs.map((d) => {
return {
timestamp: new Date().toISOString(),
operation: ChangeType.IMPORT,
documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`,
documentId: d.id,
eventId: "",
pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path),
data: eventTracker.serializeData(d.data()),
};
});
try {
await eventTracker.record(rows);
} catch (err: any) {
/** If configured, the event tracker will handle failed rows in a backup collection */
functions.logger.log(err);
}
if (rows.length == config.docsPerBackfill) {
// There are more documents to import - enqueue another task to continue the backfill.
const queue = getFunctions().taskQueue(
`locations/${config.location}/functions/fsimportexistingdocs`,
config.instanceId
);
await queue.enqueue({
offset: offset + config.docsPerBackfill,
docsCount: docsCount + rows.length,
});
} else {
// We are finished, set the processing state to report back how many docs were imported.
runtime.setProcessingState(
"PROCESSING_COMPLETE",
`Successfully imported ${
docsCount + rows.length
} documents into BigQuery`
);
}
await events.recordCompletionEvent({ context });
});
// exports.fsimportexistingdocs = functions.tasks
// .taskQueue()
// .onDispatch(async (data, context) => {
// const runtime = getExtensions().runtime();
// if (!config.doBackfill || !config.importCollectionPath) {
// await runtime.setProcessingState(
// "PROCESSING_COMPLETE",
// "Completed. No existing documents imported into BigQuery."
// );
// return;
// }

// const offset = (data["offset"] as number) ?? 0;
// const docsCount = (data["docsCount"] as number) ?? 0;

// const query = config.useCollectionGroupQuery
// ? getFirestore(config.databaseId).collectionGroup(
// config.importCollectionPath.split("/")[
// config.importCollectionPath.split("/").length - 1
// ]
// )
// : getFirestore(config.databaseId).collection(config.importCollectionPath);

// const snapshot = await query
// .offset(offset)
// .limit(config.docsPerBackfill)
// .get();

// const rows = snapshot.docs.map((d) => {
// return {
// timestamp: new Date().toISOString(),
// operation: ChangeType.IMPORT,
// documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`,
// documentId: d.id,
// eventId: "",
// pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path),
// data: eventTracker.serializeData(d.data()),
// };
// });
// try {
// await eventTracker.record(rows);
// } catch (err: any) {
// /** If configured, the event tracker will handle failed rows in a backup collection */
// functions.logger.log(err);
// }
// if (rows.length == config.docsPerBackfill) {
// // There are more documents to import - enqueue another task to continue the backfill.
// const queue = getFunctions().taskQueue(
// `locations/${config.location}/functions/fsimportexistingdocs`,
// config.instanceId
// );
// await queue.enqueue({
// offset: offset + config.docsPerBackfill,
// docsCount: docsCount + rows.length,
// });
// } else {
// // We are finished, set the processing state to report back how many docs were imported.
// runtime.setProcessingState(
// "PROCESSING_COMPLETE",
// `Successfully imported ${
// docsCount + rows.length
// } documents into BigQuery`
// );
// }
// await events.recordCompletionEvent({ context });
// });
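
Although now disabled, the commented-out handler above follows a common Cloud Tasks backfill pattern: page through the collection with offset/limit, and re-enqueue the task until a short page signals the end. A stripped-down sketch of the pattern, with hypothetical queue and collection names standing in for the extension's config:

```ts
import * as functions from "firebase-functions";
import { getFirestore } from "firebase-admin/firestore";
import { getFunctions } from "firebase-admin/functions";

// Hypothetical stand-ins for the extension's config values.
const COLLECTION_PATH = "posts";
const PAGE_SIZE = 200;
const QUEUE = "locations/us-central1/functions/backfillTask";

export const backfillTask = functions.tasks
  .taskQueue()
  .onDispatch(async (data) => {
    const offset = (data["offset"] as number) ?? 0;

    // Read one page of existing documents.
    const snapshot = await getFirestore()
      .collection(COLLECTION_PATH)
      .offset(offset)
      .limit(PAGE_SIZE)
      .get();

    // ... map snapshot.docs to changelog rows and record them here ...

    // A full page means more documents may remain: enqueue the next page.
    if (snapshot.docs.length === PAGE_SIZE) {
      await getFunctions()
        .taskQueue(QUEUE)
        .enqueue({ offset: offset + PAGE_SIZE });
    }
  });
```

Note that offset pagination still bills reads for the skipped documents on every page; a cursor (`startAfter` on the last snapshot) scales better for large collections.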
