2 changes: 2 additions & 0 deletions v1/README_Cassandra_To_Cloud_Bigtable.md
@@ -36,6 +36,8 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **defaultColumnFamily**: The name of the column family of the Bigtable table. The default value is `default`.
* **rowKeySeparator**: The separator used to build row-keys. The default value is `#`.
* **splitLargeRows**: The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic.
* **writetimeCassandraColumnSchema**: The GCS path to the schema file used to copy Cassandra writetimes to Bigtable. The command to generate this schema is ```cqlsh -e "select json * from system_schema.columns where keyspace_name='$CASSANDRA_KEYSPACE' and table_name='$CASSANDRA_TABLE'" > column_schema.json```. Set $WRITETIME_CASSANDRA_COLUMN_SCHEMA to a GCS path, for example `gs://$BUCKET_NAME/column_schema.json`, then upload the schema to GCS: `gcloud storage cp column_schema.json $WRITETIME_CASSANDRA_COLUMN_SCHEMA` (see the sketch after this list). Requires Cassandra version 2.2 or later for JSON support.
* **setZeroTimestamp**: The flag for setting the Bigtable cell timestamp to 0 if the Cassandra writetime is not present. When this flag is not set, the default behavior is to set the Bigtable cell timestamp to the template replication time, that is, now.
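
A minimal shell sketch chaining the commands from the `writetimeCassandraColumnSchema` description above; the keyspace, table, and bucket names are placeholders for illustration:

```
# Placeholders -- substitute your own keyspace, table, and bucket.
export CASSANDRA_KEYSPACE=my_keyspace
export CASSANDRA_TABLE=my_table
export BUCKET_NAME=my-bucket

# Export the Cassandra column schema as JSON (requires Cassandra 2.2 or later).
cqlsh -e "select json * from system_schema.columns where keyspace_name='$CASSANDRA_KEYSPACE' and table_name='$CASSANDRA_TABLE'" > column_schema.json

# Upload the schema to GCS and pass this path as writetimeCassandraColumnSchema.
export WRITETIME_CASSANDRA_COLUMN_SCHEMA=gs://$BUCKET_NAME/column_schema.json
gcloud storage cp column_schema.json $WRITETIME_CASSANDRA_COLUMN_SCHEMA
```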



8 changes: 4 additions & 4 deletions v1/README_Cloud_Bigtable_to_GCS_Json.md
@@ -20,11 +20,11 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId**: The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId**: The ID of the Bigtable instance that contains the table.
* **bigtableTableId**: The ID of the Bigtable table to read from.
* **filenamePrefix**: The prefix of the JSON file name. For example, `table1-`. If no value is provided, defaults to `part`.
* **outputDirectory**: The Cloud Storage path where the output JSON files are stored. For example, `gs://your-bucket/your-path/`.

### Optional parameters

* **outputDirectory**: The Cloud Storage path where the output JSON files are stored. For example, `gs://your-bucket/your-path/`.
* **filenamePrefix**: The prefix of the JSON file name. For example, `table1-`. If no value is provided, defaults to `part`.
* **userOption**: Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to a single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`.
* **columnsAliases**: A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`. (See the launch sketch after this list.)
* **bigtableAppProfileId**: The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.
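
A hedged launch sketch for this classic template reflecting the change above, where `outputDirectory` is now required and `filenamePrefix` is optional. The job name, region, project/instance/table/bucket values, and the template Cloud Storage path are assumptions; take the exact values from the full run instructions in this README.

```
gcloud dataflow jobs run bigtable-to-gcs-json-job \
  --gcs-location gs://dataflow-templates-us-central1/latest/Cloud_Bigtable_to_GCS_Json \
  --region us-central1 \
  --parameters \
bigtableProjectId=my-project,\
bigtableInstanceId=my-instance,\
bigtableTableId=my-table,\
outputDirectory=gs://my-bucket/my-path/
# filenamePrefix is omitted here and defaults to "part"; set it only to override the prefix.
```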
@@ -216,8 +216,8 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_json" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
filenamePrefix = "part"
# outputDirectory = "<outputDirectory>"
outputDirectory = "<outputDirectory>"
# filenamePrefix = "part"
# userOption = "NONE"
# columnsAliases = "<columnsAliases>"
# bigtableAppProfileId = "default"
8 changes: 4 additions & 4 deletions v1/README_Cloud_Bigtable_to_Vector_Embeddings.md
@@ -21,13 +21,13 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId**: The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId**: The ID of the Bigtable instance that contains the table.
* **bigtableTableId**: The ID of the Bigtable table to read from.
* **filenamePrefix**: The prefix of the JSON filename. For example: `table1-`. If no value is provided, defaults to `part`.
* **outputDirectory**: The Cloud Storage path where the output JSON files are stored. For example, `gs://your-bucket/your-path/`.
* **idColumn**: The fully qualified column name where the ID is stored. In the format `cf:col` or `_key`.
* **embeddingColumn**: The fully qualified column name where the embeddings are stored. In the format `cf:col` or `_key`.

### Optional parameters

* **outputDirectory**: The Cloud Storage path where the output JSON files are stored. For example, `gs://your-bucket/your-path/`.
* **filenamePrefix**: The prefix of the JSON filename. For example: `table1-`. If no value is provided, defaults to `part`.
* **crowdingTagColumn**: The fully qualified column name where the crowding tag is stored. In the format `cf:col` or `_key`.
* **embeddingByteSize**: The byte size of each entry in the embeddings array. For float, use the value `4`. For double, use the value `8`. Defaults to `4`.
* **allowRestrictsMappings**: The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format `cf:col->alias`.
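
A hedged sketch of a launch after the change above (`outputDirectory` required, `filenamePrefix` optional), showing the `cf:col` and `_key` notation for `idColumn` and `embeddingColumn`. The column family, column names, region, and template Cloud Storage path are assumptions for illustration; use the values from this README's full run instructions.

```
gcloud dataflow jobs run bigtable-to-vector-embeddings-job \
  --gcs-location gs://dataflow-templates-us-central1/latest/Cloud_Bigtable_to_Vector_Embeddings \
  --region us-central1 \
  --parameters \
bigtableProjectId=my-project,\
bigtableInstanceId=my-instance,\
bigtableTableId=my-table,\
outputDirectory=gs://my-bucket/my-path/,\
idColumn=_key,\
embeddingColumn=cf:my_embedding
```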
@@ -245,10 +245,10 @@ resource "google_dataflow_job" "cloud_bigtable_to_vector_embeddings" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
outputDirectory = "gs://your-bucket/your-path/"
outputDirectory = "<outputDirectory>"
idColumn = "<idColumn>"
embeddingColumn = "<embeddingColumn>"
# outputDirectory = "<outputDirectory>"
# filenamePrefix = "part"
# crowdingTagColumn = "<crowdingTagColumn>"
# embeddingByteSize = "4"
# allowRestrictsMappings = "<allowRestrictsMappings>"
4 changes: 3 additions & 1 deletion v1/README_Cloud_PubSub_to_Datadog.md
@@ -4,7 +4,9 @@ Pub/Sub to Datadog template
The Pub/Sub to Datadog template is a streaming pipeline that reads messages from
a Pub/Sub subscription and writes the message payload to Datadog by using a
Datadog endpoint. The most common use case for this template is to export log
files to Datadog.
files to Datadog. For more information check out <a
href="https://docs.datadoghq.com/integrations/google_cloud_platform/?tab=project#log-collection">Datadog's
log collection process</a>.
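
A hedged launch sketch for this classic template: the parameter names shown here (`inputSubscription`, `url`, `apiKey`, `outputDeadletterTopic`), the Datadog intake URL, and the template Cloud Storage path are assumptions, so verify them against the parameter list and run instructions later in this README before use.

```
# Assumed parameter names and template path -- confirm against this README before running.
gcloud dataflow jobs run pubsub-to-datadog-job \
  --gcs-location gs://dataflow-templates-us-central1/latest/Cloud_PubSub_to_Datadog \
  --region us-central1 \
  --parameters \
inputSubscription=projects/my-project/subscriptions/my-subscription,\
url=https://http-intake.logs.datadoghq.com,\
apiKey=my-datadog-api-key,\
outputDeadletterTopic=projects/my-project/topics/my-deadletter-topic
```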

Before writing to Datadog, you can apply a JavaScript user-defined function to
the message payload. Any messages that experience processing failures are
9 changes: 8 additions & 1 deletion v2/bigquery-to-bigtable/README_BigQuery_to_Bigtable.md
@@ -28,6 +28,8 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **query**: The SQL query to use to read data from BigQuery. If the BigQuery dataset is in a different project than the Dataflow job, specify the full dataset name in the SQL query, for example: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. By default, the `query` parameter uses GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), unless `useLegacySql` is `true`. You must specify either `inputTableSpec` or `query`. If you set both parameters, the template uses the `query` parameter. For example, `select * from sampledb.sample_table`.
* **useLegacySql**: Set to `true` to use legacy SQL. This parameter only applies when using the `query` parameter. Defaults to `false`.
* **queryLocation**: Needed when reading from an authorized view without permission on the underlying table. For example, `US`.
* **queryTempDataset**: An existing dataset in which to create the temporary table that stores the results of the query. For example, `temp_dataset`.
* **KMSEncryptionKey**: When reading from BigQuery using a query source, use this Cloud KMS key to encrypt any temporary tables created. For example, `projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key`.
* **bigtableRpcAttemptTimeoutMs**: The timeout for each Bigtable RPC attempt in milliseconds.
* **bigtableRpcTimeoutMs**: The total timeout for a Bigtable RPC operation in milliseconds.
* **bigtableAdditionalRetryCodes**: The additional retry codes. For example, `RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED`.
@@ -126,6 +128,7 @@ export QUERY=<query>
export USE_LEGACY_SQL=false
export QUERY_LOCATION=<queryLocation>
export QUERY_TEMP_DATASET=<queryTempDataset>
export KMSENCRYPTION_KEY=<KMSEncryptionKey>
export BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS=<bigtableRpcAttemptTimeoutMs>
export BIGTABLE_RPC_TIMEOUT_MS=<bigtableRpcTimeoutMs>
export BIGTABLE_ADDITIONAL_RETRY_CODES=<bigtableAdditionalRetryCodes>
@@ -146,6 +149,7 @@ gcloud dataflow flex-template run "bigquery-to-bigtable-job" \
--parameters "useLegacySql=$USE_LEGACY_SQL" \
--parameters "queryLocation=$QUERY_LOCATION" \
--parameters "queryTempDataset=$QUERY_TEMP_DATASET" \
--parameters "KMSEncryptionKey=$KMSENCRYPTION_KEY" \
--parameters "bigtableRpcAttemptTimeoutMs=$BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS" \
--parameters "bigtableRpcTimeoutMs=$BIGTABLE_RPC_TIMEOUT_MS" \
--parameters "bigtableAdditionalRetryCodes=$BIGTABLE_ADDITIONAL_RETRY_CODES" \
@@ -187,6 +191,7 @@ export QUERY=<query>
export USE_LEGACY_SQL=false
export QUERY_LOCATION=<queryLocation>
export QUERY_TEMP_DATASET=<queryTempDataset>
export KMSENCRYPTION_KEY=<KMSEncryptionKey>
export BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS=<bigtableRpcAttemptTimeoutMs>
export BIGTABLE_RPC_TIMEOUT_MS=<bigtableRpcTimeoutMs>
export BIGTABLE_ADDITIONAL_RETRY_CODES=<bigtableAdditionalRetryCodes>
@@ -203,7 +208,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="bigquery-to-bigtable-job" \
-DtemplateName="BigQuery_to_Bigtable" \
-Dparameters="readIdColumn=$READ_ID_COLUMN,inputTableSpec=$INPUT_TABLE_SPEC,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,query=$QUERY,useLegacySql=$USE_LEGACY_SQL,queryLocation=$QUERY_LOCATION,queryTempDataset=$QUERY_TEMP_DATASET,bigtableRpcAttemptTimeoutMs=$BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS,bigtableRpcTimeoutMs=$BIGTABLE_RPC_TIMEOUT_MS,bigtableAdditionalRetryCodes=$BIGTABLE_ADDITIONAL_RETRY_CODES,bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID,bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID,bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY,bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE,bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID,bigtableBulkWriteLatencyTargetMs=$BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS,bigtableBulkWriteMaxRowKeyCount=$BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT,bigtableBulkWriteMaxRequestSizeBytes=$BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES" \
-Dparameters="readIdColumn=$READ_ID_COLUMN,inputTableSpec=$INPUT_TABLE_SPEC,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,query=$QUERY,useLegacySql=$USE_LEGACY_SQL,queryLocation=$QUERY_LOCATION,queryTempDataset=$QUERY_TEMP_DATASET,KMSEncryptionKey=$KMSENCRYPTION_KEY,bigtableRpcAttemptTimeoutMs=$BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS,bigtableRpcTimeoutMs=$BIGTABLE_RPC_TIMEOUT_MS,bigtableAdditionalRetryCodes=$BIGTABLE_ADDITIONAL_RETRY_CODES,bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID,bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID,bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY,bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE,bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID,bigtableBulkWriteLatencyTargetMs=$BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS,bigtableBulkWriteMaxRowKeyCount=$BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT,bigtableBulkWriteMaxRequestSizeBytes=$BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES" \
-f v2/bigquery-to-bigtable
```

@@ -257,6 +262,8 @@ resource "google_dataflow_flex_template_job" "bigquery_to_bigtable" {
# query = "<query>"
# useLegacySql = "false"
# queryLocation = "<queryLocation>"
# queryTempDataset = "<queryTempDataset>"
# KMSEncryptionKey = "<KMSEncryptionKey>"
# bigtableRpcAttemptTimeoutMs = "<bigtableRpcAttemptTimeoutMs>"
# bigtableRpcTimeoutMs = "<bigtableRpcTimeoutMs>"
# bigtableAdditionalRetryCodes = "<bigtableAdditionalRetryCodes>"
11 changes: 7 additions & 4 deletions v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md
@@ -44,7 +44,6 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

* **instanceId**: The Spanner instance where the changes are replicated.
* **databaseId**: The Spanner database where the changes are replicated.
* **streamName**: The name or template for the stream to poll for schema information and source type.

### Optional parameters

@@ -54,6 +53,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **projectId**: The Spanner project ID.
* **spannerHost**: The Cloud Spanner endpoint to call in the template. For example, `https://batch-spanner.googleapis.com`. Defaults to: https://batch-spanner.googleapis.com.
* **gcsPubSubSubscription**: The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name, use the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.
* **streamName**: The name or template for the stream to poll for schema information and source type.
* **shadowTablePrefix**: The prefix used to name shadow tables. Default: `shadow_`.
* **shouldCreateShadowTables**: This flag indicates whether shadow tables must be created in Cloud Spanner database. Defaults to: true.
* **rfcStartDateTime**: The starting DateTime used to fetch from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to: 1970-01-01T00:00:00.00Z.
@@ -73,6 +73,10 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **transformationClassName**: Fully qualified class name having the custom transformation logic. It is a mandatory field in case transformationJarPath is specified. Defaults to empty.
* **transformationCustomParameters**: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
* **filteredEventsDirectory**: This is the file path to store the events filtered via custom transformation. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
* **shardingContextFilePath**: The Cloud Storage path of the sharding context file, which is used to populate the shard ID in the Spanner database for each source shard. The file content is in the format Map<stream_name, Map<db_name, shard_id>>.
* **tableOverrides**: The table name overrides from the source to Spanner, written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. For example, `[{Singers, Vocalists}, {Albums, Records}]` maps the Singers table to Vocalists and the Albums table to Records. Defaults to empty.
* **columnOverrides**: The column name overrides from the source to Spanner, written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Note that the SourceTableName must remain the same in both the source and Spanner pair; to override table names, use tableOverrides. For example, `[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]` maps SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables, respectively. Defaults to empty.
* **schemaOverridesFilePath**: A file that specifies the table and column name overrides from the source to Spanner. Defaults to empty. (A command-line sketch for passing these overrides follows this list.)
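
Because the override values above contain commas and spaces, they cannot be passed through `--parameters` with the default comma delimiter; gcloud's alternate-delimiter escaping (see `gcloud topic escaping`) is one way around that. The sketch below is a hedged illustration using the example values from this list; the job name, region, template Cloud Storage path, and the other parameter values are assumptions, not the template's canonical run command.

```
# Use "~" instead of "," as the --parameters delimiter so the override
# values, which themselves contain commas, are passed through intact.
gcloud dataflow flex-template run "datastream-to-spanner-overrides-job" \
  --region us-central1 \
  --template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_Spanner \
  --parameters "^~^instanceId=my-instance~databaseId=my-database~tableOverrides=[{Singers, Vocalists}, {Albums, Records}]~columnOverrides=[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]"
```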



@@ -329,7 +333,6 @@ resource "google_dataflow_flex_template_job" "cloud_datastream_to_spanner" {
parameters = {
instanceId = "<instanceId>"
databaseId = "<databaseId>"
streamName = "<streamName>"
# inputFilePattern = "<inputFilePattern>"
# inputFileFormat = "avro"
# sessionFilePath = "<sessionFilePath>"
@@ -357,8 +360,8 @@ resource "google_dataflow_flex_template_job" "cloud_datastream_to_spanner" {
# transformationCustomParameters = ""
# filteredEventsDirectory = ""
# shardingContextFilePath = "<shardingContextFilePath>"
# tableOverrides = "[{Singers, Vocalists}, {Albums, Records}]"
# columnOverrides = "[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]"
# tableOverrides = ""
# columnOverrides = ""
# schemaOverridesFilePath = ""
}
}