diff --git a/firestore-bigquery-export/README.md b/firestore-bigquery-export/README.md index c92fc9fc6..1609b5d28 100644 --- a/firestore-bigquery-export/README.md +++ b/firestore-bigquery-export/README.md @@ -158,14 +158,8 @@ essential for the script to insert data into an already partitioned table.) * Exclude old data payloads: If enabled, table rows will never contain old data (document snapshot before the update), which should be more performant, and avoid potential resource limitations. -* Import existing Firestore documents into BigQuery?: Do you want to import existing documents from your Firestore collection into BigQuery? These documents will have each have a special changelog with the operation of `IMPORT` and the timestamp of epoch. This ensures that any operation on an imported document supersedes the import record. - -* Existing Documents Collection: Specify the path of the Cloud Firestore Collection you would like to import from. This may or may not be the same Collection for which you plan to mirror changes. If you want to use a collectionGroup query, provide the collection name value here, and set 'Use Collection Group query' to true. You may use `{wildcard}` notation with an enabled collectionGroup query to match a subcollection of all documents in a collection (e.g., `chatrooms/{chatid}/posts`). - * Use Collection Group query: Do you want to use a [collection group](https://firebase.google.com/docs/firestore/query-data/queries#collection-group-query) query for importing existing documents? You have to enable collectionGroup query if your import path contains subcollections. Warning: A collectionGroup query will target every collection in your Firestore project that matches the 'Existing documents collection'. For example, if you have 10,000 documents with a subcollection named: landmarks, this will query every document in 10,000 landmarks collections. -* Docs per backfill: When importing existing documents, how many should be imported at once? The default value of 200 should be ok for most users. If you are using a transform function or have very large documents, you may need to set this to a lower number. If the lifecycle event function times out, lower this value. - * Cloud KMS key name: Instead of Google managing the key encryption keys that protect your data, you control and manage key encryption keys in Cloud KMS. If this parameter is set, the extension will specify the KMS key name when creating the BQ table. See the PREINSTALL.md for more details. @@ -174,8 +168,6 @@ essential for the script to insert data into an already partitioned table.) * **fsexportbigquery:** Listens for document changes in your specified Cloud Firestore collection, then exports the changes into BigQuery. -* **fsimportexistingdocs:** Imports exisitng documents from the specified collection into BigQuery. Imported documents will have a special changelog with the operation of `IMPORT` and the timestamp of epoch. - * **syncBigQuery:** A task-triggered function that gets called on BigQuery sync * **initBigQuerySync:** Runs configuration for sycning with BigQuery diff --git a/firestore-bigquery-export/extension.yaml b/firestore-bigquery-export/extension.yaml index fbe9ac160..dde372c59 100644 --- a/firestore-bigquery-export/extension.yaml +++ b/firestore-bigquery-export/extension.yaml @@ -60,18 +60,18 @@ resources: eventType: providers/cloud.firestore/eventTypes/document.write resource: projects/${param:PROJECT_ID}/databases/${param:DATABASE_ID}/documents/${param:COLLECTION_PATH}/{documentId} - - name: fsimportexistingdocs - type: firebaseextensions.v1beta.function - description: - Imports exisitng documents from the specified collection into BigQuery. - Imported documents will have a special changelog with the operation of - `IMPORT` and the timestamp of epoch. - properties: - runtime: nodejs18 - taskQueueTrigger: - retryConfig: - maxAttempts: 15 - minBackoffSeconds: 60 + # - name: fsimportexistingdocs + # type: firebaseextensions.v1beta.function + # description: + # Imports exisitng documents from the specified collection into BigQuery. + # Imported documents will have a special changelog with the operation of + # `IMPORT` and the timestamp of epoch. + # properties: + # runtime: nodejs18 + # taskQueueTrigger: + # retryConfig: + # maxAttempts: 15 + # minBackoffSeconds: 60 - name: syncBigQuery type: firebaseextensions.v1beta.function @@ -405,39 +405,39 @@ params: - label: No value: no - - param: DO_BACKFILL - label: Import existing Firestore documents into BigQuery? - description: >- - Do you want to import existing documents from your Firestore collection - into BigQuery? These documents will have each have a special changelog - with the operation of `IMPORT` and the timestamp of epoch. This ensures - that any operation on an imported document supersedes the import record. - type: select - required: true - default: no - options: - - label: Yes - value: yes - - label: No - value: no - - - param: IMPORT_COLLECTION_PATH - label: Existing Documents Collection - description: >- - Specify the path of the Cloud Firestore Collection you would like to - import from. This may or may not be the same Collection for which you plan - to mirror changes. If you want to use a collectionGroup query, provide the - collection name value here, and set 'Use Collection Group query' to true. - You may use `{wildcard}` notation with an enabled collectionGroup query to - match a subcollection of all documents in a collection (e.g., - `chatrooms/{chatid}/posts`). - type: string - validationRegex: "^[^/]+(/[^/]+/[^/]+)*$" - validationErrorMessage: - Firestore collection paths must be an odd number of segments separated by - slashes, e.g. "path/to/collection". - example: posts - required: false + # - param: DO_BACKFILL + # label: Import existing Firestore documents into BigQuery? + # description: >- + # Do you want to import existing documents from your Firestore collection + # into BigQuery? These documents will have each have a special changelog + # with the operation of `IMPORT` and the timestamp of epoch. This ensures + # that any operation on an imported document supersedes the import record. + # type: select + # required: true + # default: no + # options: + # - label: Yes + # value: yes + # - label: No + # value: no + + # - param: IMPORT_COLLECTION_PATH + # label: Existing Documents Collection + # description: >- + # Specify the path of the Cloud Firestore Collection you would like to + # import from. This may or may not be the same Collection for which you plan + # to mirror changes. If you want to use a collectionGroup query, provide the + # collection name value here, and set 'Use Collection Group query' to true. + # You may use `{wildcard}` notation with an enabled collectionGroup query to + # match a subcollection of all documents in a collection (e.g., + # `chatrooms/{chatid}/posts`). + # type: string + # validationRegex: "^[^/]+(/[^/]+/[^/]+)*$" + # validationErrorMessage: + # Firestore collection paths must be an odd number of segments separated by + # slashes, e.g. "path/to/collection". + # example: posts + # required: false - param: USE_COLLECTION_GROUP_QUERY label: Use Collection Group query @@ -458,20 +458,20 @@ params: - label: No value: no - - param: DOCS_PER_BACKFILL - label: Docs per backfill - description: >- - When importing existing documents, how many should be imported at once? - The default value of 200 should be ok for most users. If you are using a - transform function or have very large documents, you may need to set this - to a lower number. If the lifecycle event function times out, lower this - value. - type: string - example: 200 - validationRegex: "^[1-9][0-9]*$" - validationErrorMessage: Must be a postive integer. - default: 200 - required: true + # - param: DOCS_PER_BACKFILL + # label: Docs per backfill + # description: >- + # When importing existing documents, how many should be imported at once? + # The default value of 200 should be ok for most users. If you are using a + # transform function or have very large documents, you may need to set this + # to a lower number. If the lifecycle event function times out, lower this + # value. + # type: string + # example: 200 + # validationRegex: "^[1-9][0-9]*$" + # validationErrorMessage: Must be a postive integer. + # default: 200 + # required: true - param: KMS_KEY_NAME label: Cloud KMS key name @@ -514,7 +514,7 @@ lifecycleEvents: onInstall: function: initBigQuerySync processingMessage: - Configuring BigQuery Sync and running import if configured. + Configuring BigQuery Sync. onUpdate: function: setupBigQuerySync processingMessage: Configuring BigQuery Sync diff --git a/firestore-bigquery-export/functions/src/index.ts b/firestore-bigquery-export/functions/src/index.ts index 04e5b2555..a4a49010b 100644 --- a/firestore-bigquery-export/functions/src/index.ts +++ b/firestore-bigquery-export/functions/src/index.ts @@ -190,7 +190,7 @@ export const initBigQuerySync = functions.tasks await eventTracker.initialize(); /** Run Backfill */ - if (config.doBackfill) { + if (false) { await getFunctions() .taskQueue( `locations/${config.location}/functions/fsimportexistingdocs`, @@ -207,69 +207,69 @@ export const initBigQuerySync = functions.tasks return; }); -exports.fsimportexistingdocs = functions.tasks - .taskQueue() - .onDispatch(async (data, context) => { - const runtime = getExtensions().runtime(); - if (!config.doBackfill || !config.importCollectionPath) { - await runtime.setProcessingState( - "PROCESSING_COMPLETE", - "Completed. No existing documents imported into BigQuery." - ); - return; - } - - const offset = (data["offset"] as number) ?? 0; - const docsCount = (data["docsCount"] as number) ?? 0; - - const query = config.useCollectionGroupQuery - ? getFirestore(config.databaseId).collectionGroup( - config.importCollectionPath.split("/")[ - config.importCollectionPath.split("/").length - 1 - ] - ) - : getFirestore(config.databaseId).collection(config.importCollectionPath); - - const snapshot = await query - .offset(offset) - .limit(config.docsPerBackfill) - .get(); - - const rows = snapshot.docs.map((d) => { - return { - timestamp: new Date().toISOString(), - operation: ChangeType.IMPORT, - documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`, - documentId: d.id, - eventId: "", - pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path), - data: eventTracker.serializeData(d.data()), - }; - }); - try { - await eventTracker.record(rows); - } catch (err: any) { - /** If configured, event tracker wil handle failed rows in a backup collection */ - functions.logger.log(err); - } - if (rows.length == config.docsPerBackfill) { - // There are more documents to import - enqueue another task to continue the backfill. - const queue = getFunctions().taskQueue( - `locations/${config.location}/functions/fsimportexistingdocs`, - config.instanceId - ); - await queue.enqueue({ - offset: offset + config.docsPerBackfill, - docsCount: docsCount + rows.length, - }); - } else { - // We are finished, set the processing state to report back how many docs were imported. - runtime.setProcessingState( - "PROCESSING_COMPLETE", - `Successfully imported ${ - docsCount + rows.length - } documents into BigQuery` - ); - } - await events.recordCompletionEvent({ context }); - }); +// exports.fsimportexistingdocs = functions.tasks +// .taskQueue() +// .onDispatch(async (data, context) => { +// const runtime = getExtensions().runtime(); +// if (!config.doBackfill || !config.importCollectionPath) { +// await runtime.setProcessingState( +// "PROCESSING_COMPLETE", +// "Completed. No existing documents imported into BigQuery." +// ); +// return; +// } + +// const offset = (data["offset"] as number) ?? 0; +// const docsCount = (data["docsCount"] as number) ?? 0; + +// const query = config.useCollectionGroupQuery +// ? getFirestore(config.databaseId).collectionGroup( +// config.importCollectionPath.split("/")[ +// config.importCollectionPath.split("/").length - 1 +// ] +// ) +// : getFirestore(config.databaseId).collection(config.importCollectionPath); + +// const snapshot = await query +// .offset(offset) +// .limit(config.docsPerBackfill) +// .get(); + +// const rows = snapshot.docs.map((d) => { +// return { +// timestamp: new Date().toISOString(), +// operation: ChangeType.IMPORT, +// documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`, +// documentId: d.id, +// eventId: "", +// pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path), +// data: eventTracker.serializeData(d.data()), +// }; +// }); +// try { +// await eventTracker.record(rows); +// } catch (err: any) { +// /** If configured, event tracker wil handle failed rows in a backup collection */ +// functions.logger.log(err); +// } +// if (rows.length == config.docsPerBackfill) { +// // There are more documents to import - enqueue another task to continue the backfill. +// const queue = getFunctions().taskQueue( +// `locations/${config.location}/functions/fsimportexistingdocs`, +// config.instanceId +// ); +// await queue.enqueue({ +// offset: offset + config.docsPerBackfill, +// docsCount: docsCount + rows.length, +// }); +// } else { +// // We are finished, set the processing state to report back how many docs were imported. +// runtime.setProcessingState( +// "PROCESSING_COMPLETE", +// `Successfully imported ${ +// docsCount + rows.length +// } documents into BigQuery` +// ); +// } +// await events.recordCompletionEvent({ context }); +// });