2 changes: 1 addition & 1 deletion plugins/core-plugin/pom.xml
@@ -37,7 +37,7 @@

<reflections.version>0.9.11</reflections.version>
<gson.version>2.9.1</gson.version>
<commons-lang3.version>3.11</commons-lang3.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<freemarker.version>2.3.32</freemarker.version>
<auto-value.version>1.10.4</auto-value.version>
</properties>
15 changes: 8 additions & 7 deletions pom.xml
@@ -31,6 +31,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- <os.detected.classifier>osx-x86_64</os.detected.classifier>-->
<java.version>11</java.version>
<!-- Plugins -->
<templates-maven-plugin.version>1.0-SNAPSHOT</templates-maven-plugin.version>
@@ -47,9 +48,9 @@
<jacoco.version>0.8.8</jacoco.version>

<!-- Beam and linked versions -->
<beam.version>2.57.0</beam.version>
<beam-python.version>2.57.0</beam-python.version>
<beam-maven-repo></beam-maven-repo>
<beam.version>2.58.0</beam.version>
<beam-python.version>2.58.0rc2</beam-python.version>
<beam-maven-repo>https://repository.apache.org/content/repositories/orgapachebeam-1383/</beam-maven-repo>

<!-- Common dependency versions -->
<autovalue.version>1.10.4</autovalue.version>
@@ -59,7 +60,7 @@
<checkstyle.version>10.7.0</checkstyle.version>
<commons-codec.version>1.16.0</commons-codec.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<conscrypt.version>2.5.2</conscrypt.version>
<derby.version>10.14.2.0</derby.version>
<grpc.gen.version>1.13.1</grpc.gen.version>
@@ -82,7 +83,7 @@
<truth.version>1.1.5</truth.version>

<!-- TODO(https://github.com/apache/beam/issues/30700): Remove once Beam gets the latest version of gax. -->
<gax.version>2.45.0</gax.version>
<gax.version>2.48.0</gax.version>

<!-- Drop pinned version once maven-dependency-plugin gets past plexus-archiver 4.8.0 -->
<plexus-archiver.version>4.8.0</plexus-archiver.version>
@@ -591,7 +592,7 @@
<profile>
<id>validateCandidate</id>
<activation>
<activeByDefault>false</activeByDefault>
<jdk>[1.)</jdk>
</activation>
<repositories>
<repository>
@@ -603,7 +604,7 @@
<profile>
<id>splunkDeps</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>[1.)</jdk>
</activation>
<repositories>
<repository>
2 changes: 1 addition & 1 deletion v1/pom.xml
@@ -951,7 +951,7 @@
<profile>
<id>missing-artifact-repos</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>[1.)</jdk>
</activation>
<repositories>
<repository>
2 changes: 1 addition & 1 deletion v2/cdc-parent/cdc-embedded-connector/pom.xml
@@ -72,7 +72,7 @@
<profile>
<id>mysql</id>
<activation>
<activeByDefault>true</activeByDefault>
<jdk>[1.)</jdk>
</activation>
<dependencies>
<dependency>
PythonExternalTextTransformer.java
@@ -26,7 +26,9 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.python.PythonExternalTransform;
import org.apache.beam.sdk.extensions.python.PythonExternalTransformOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -339,4 +341,8 @@ public PubsubMessage rowToPubSubMessage(Row row) {
return new PubsubMessage(payload.getBytes(), attributeMap, messageId);
}
}

public static void overwritepyVersion(PipelineOptions options) {
options.as(PythonExternalTransformOptions.class).setCustomBeamRequirement("2.58.0rc2");
}
}
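For context on the helper added above: `setCustomBeamRequirement` on `PythonExternalTransformOptions` overrides the `apache-beam` requirement that the Python expansion environment installs for cross-language transforms, here pinning it to the 2.58.0rc2 release candidate (presumably because the matching final release is not yet published). A minimal sketch of the same call in isolation; the class name `PinPythonBeamVersionExample` is hypothetical, and only the option call itself is taken from this PR:

```java
import org.apache.beam.sdk.extensions.python.PythonExternalTransformOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PinPythonBeamVersionExample {
  public static void main(String[] args) {
    // Parse pipeline options as usual.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Equivalent to PythonExternalTextTransformer.overwritepyVersion(options):
    // pin the apache-beam requirement used by Python external transforms to the RC build.
    options
        .as(PythonExternalTransformOptions.class)
        .setCustomBeamRequirement("2.58.0rc2");

    // Construct and run the pipeline only after the override is set, which is why the
    // templates below call overwritepyVersion(options) before run(options).
  }
}
```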
BigQueryToElasticsearch.java
@@ -99,7 +99,7 @@ public static void main(String[] args) {
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(BigQueryToElasticsearchOptions.class);

PythonExternalTextTransformer.overwritepyVersion(options);
run(options);
}

GCSToElasticsearch.java
@@ -26,6 +26,7 @@
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.CsvConverters;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
@@ -145,7 +146,7 @@ public static void main(String[] args) {

GCSToElasticsearchOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(GCSToElasticsearchOptions.class);

PythonExternalTextTransformer.overwritepyVersion(options);
run(options);
}

PubSubToElasticsearch.java
@@ -26,6 +26,7 @@
import com.google.cloud.teleport.v2.elasticsearch.transforms.PubSubMessageToJsonDocument;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIndex;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -159,7 +160,7 @@ public static void main(String[] args) {
pubSubToElasticsearchOptions.getDataset(),
pubSubToElasticsearchOptions.getNamespace())
.getIndex());

PythonExternalTextTransformer.overwritepyVersion(pubSubToElasticsearchOptions);
validateOptions(pubSubToElasticsearchOptions);
run(pubSubToElasticsearchOptions);
}
README: Bigtable change streams to Vector Search template
@@ -20,9 +20,9 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **embeddingColumn** : The fully qualified column name where the embeddings are stored. In the format cf:col.
* **embeddingByteSize** : The byte size of each entry in the embeddings array. Use 4 for Float, and 8 for Double. Defaults to: 4.
* **vectorSearchIndex** : The Vector Search Index where changes will be streamed, in the format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (no leading or trailing spaces) (Example: projects/123/locations/us-east1/indexes/456).
* **bigtableChangeStreamAppProfile** : The application profile is used to distinguish workload in Cloud Bigtable.
* **bigtableReadInstanceId** : The ID of the Cloud Bigtable instance that contains the table.
* **bigtableReadTableId** : The Cloud Bigtable table to read from.
* **bigtableChangeStreamAppProfile** : The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
* **bigtableReadInstanceId** : The source Bigtable instance ID.
* **bigtableReadTableId** : The source Bigtable table ID.

### Optional parameters

@@ -33,20 +33,20 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **intNumericRestrictsMappings** : The comma separated fully qualified column names of the columns that should be used as integer `numeric_restricts`, with their alias. In the format cf:col->alias.
* **floatNumericRestrictsMappings** : The comma separated fully qualified column names of the columns that should be used as float (4 bytes) `numeric_restricts`, with their alias. In the format cf:col->alias.
* **doubleNumericRestrictsMappings** : The comma separated fully qualified column names of the columns that should be used as double (8 bytes) `numeric_restricts`, with their alias. In the format cf:col->alias.
* **upsertMaxBatchSize** : The maximum number of upserts to buffer before upserting the batch to the Vector Search Index. Batches will be sent when there are either upsertBatchSize records ready, or any record has been waiting upsertBatchDelay time has passed. (Example: 10).
* **upsertMaxBatchSize** : The maximum number of upserts to buffer before upserting the batch to the Vector Search Index. Batches will be sent when there are either upsertBatchSize records ready, or any record has been waiting upsertBatchDelay time has passed. (Example: 10). Defaults to: 10.
* **upsertMaxBufferDuration** : The maximum delay before a batch of upserts is sent to Vector Search.Batches will be sent when there are either upsertBatchSize records ready, or any record has been waiting upsertBatchDelay time has passed. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). (Example: 10s). Defaults to: 10s.
* **deleteMaxBatchSize** : The maximum number of deletes to buffer before deleting the batch from the Vector Search Index. Batches will be sent when there are either deleteBatchSize records ready, or any record has been waiting deleteBatchDelay time has passed. (Example: 10).
* **deleteMaxBatchSize** : The maximum number of deletes to buffer before deleting the batch from the Vector Search Index. Batches will be sent when there are either deleteBatchSize records ready, or any record has been waiting deleteBatchDelay time has passed. (Example: 10). Defaults to: 10.
* **deleteMaxBufferDuration** : The maximum delay before a batch of deletes is sent to Vector Search.Batches will be sent when there are either deleteBatchSize records ready, or any record has been waiting deleteBatchDelay time has passed. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). (Example: 10s). Defaults to: 10s.
* **dlqDirectory** : The path to store any unprocessed records with the reason they failed to be processed. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
* **bigtableChangeStreamMetadataInstanceId** : The Cloud Bigtable instance to use for the change streams connector metadata table. Defaults to empty.
* **bigtableChangeStreamMetadataTableTableId** : The Cloud Bigtable change streams connector metadata table ID to use. If not provided, a Cloud Bigtable change streams connector metadata table will automatically be created during the pipeline flow. Defaults to empty.
* **bigtableChangeStreamCharset** : Bigtable change streams charset name when reading values and column qualifiers. Default is UTF-8.
* **bigtableChangeStreamStartTimestamp** : The starting DateTime, inclusive, to use for reading change streams (https://tools.ietf.org/html/rfc3339). For example, 2022-05-05T07:59:59Z. Defaults to the timestamp when the pipeline starts.
* **bigtableChangeStreamIgnoreColumnFamilies** : A comma-separated list of column family names changes to which won't be captured. Defaults to empty.
* **bigtableChangeStreamIgnoreColumns** : A comma-separated list of column names changes to which won't be captured. Defaults to empty.
* **bigtableChangeStreamName** : Allows to resume processing from the point where a previously running pipeline stopped.
* **bigtableChangeStreamResume** : When set to true< a new pipeline will resume processing from the point at which a previously running pipeline with the same bigtableChangeStreamName stopped. If pipeline with the given bigtableChangeStreamName never ran in the past, a new pipeline will fail to start. When set to false a new pipeline will be started. If pipeline with the same bigtableChangeStreamName already ran in the past for the given source, a new pipeline will fail to start. Defaults to false.
* **bigtableReadProjectId** : Project to read Cloud Bigtable data from. The default for this parameter is the project where the Dataflow pipeline is running.
* **bigtableChangeStreamMetadataInstanceId** : The Bigtable change streams metadata instance ID. Defaults to empty.
* **bigtableChangeStreamMetadataTableTableId** : The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
* **bigtableChangeStreamCharset** : The Bigtable change streams charset name. Defaults to: UTF-8.
* **bigtableChangeStreamStartTimestamp** : The starting timestamp (https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example, `2022-05-05T07:59:59Z`. Defaults to the timestamp of the pipeline start time.
* **bigtableChangeStreamIgnoreColumnFamilies** : A comma-separated list of column family name changes to ignore. Defaults to empty.
* **bigtableChangeStreamIgnoreColumns** : A comma-separated list of column name changes to ignore. Defaults to empty.
* **bigtableChangeStreamName** : A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
* **bigtableChangeStreamResume** : When set to `true`, a new pipeline resumes processing from the point at which a previously running pipeline with the same `bigtableChangeStreamName` value stopped. If the pipeline with the given `bigtableChangeStreamName` value has never run, a new pipeline doesn't start. When set to `false`, a new pipeline starts. If a pipeline with the same `bigtableChangeStreamName` value has already run for the given source, a new pipeline doesn't start. Defaults to `false`.
* **bigtableReadProjectId** : The Bigtable project ID. The default is the project for the Dataflow job.
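
The batching behavior described for `upsertMaxBatchSize`/`upsertMaxBufferDuration` (and the corresponding delete parameters) reduces to a dual trigger: a buffered batch is flushed once it reaches the maximum size, or once its oldest record has waited the maximum duration. A minimal illustrative sketch of that rule; `BatchBuffer` is a hypothetical class, not the template's actual implementation:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: flushes on size or age, mirroring the upsert/delete batching parameters. */
class BatchBuffer<T> {
  private final int maxBatchSize;            // e.g. upsertMaxBatchSize = 10
  private final Duration maxBufferDuration;  // e.g. upsertMaxBufferDuration = 10s
  private final List<T> buffer = new ArrayList<>();
  private Instant oldestElementTime;

  BatchBuffer(int maxBatchSize, Duration maxBufferDuration) {
    this.maxBatchSize = maxBatchSize;
    this.maxBufferDuration = maxBufferDuration;
  }

  /** Adds an element; returns a batch to send if either trigger fired, otherwise null. */
  List<T> add(T element, Instant now) {
    if (buffer.isEmpty()) {
      oldestElementTime = now;
    }
    buffer.add(element);
    boolean sizeTrigger = buffer.size() >= maxBatchSize;
    boolean ageTrigger =
        Duration.between(oldestElementTime, now).compareTo(maxBufferDuration) >= 0;
    if (sizeTrigger || ageTrigger) {
      List<T> batch = new ArrayList<>(buffer);
      buffer.clear();
      return batch;
    }
    return null;
  }
}
```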



@@ -140,9 +140,9 @@ export DENY_RESTRICTS_MAPPINGS=<denyRestrictsMappings>
export INT_NUMERIC_RESTRICTS_MAPPINGS=<intNumericRestrictsMappings>
export FLOAT_NUMERIC_RESTRICTS_MAPPINGS=<floatNumericRestrictsMappings>
export DOUBLE_NUMERIC_RESTRICTS_MAPPINGS=<doubleNumericRestrictsMappings>
export UPSERT_MAX_BATCH_SIZE=<upsertMaxBatchSize>
export UPSERT_MAX_BATCH_SIZE=10
export UPSERT_MAX_BUFFER_DURATION=10s
export DELETE_MAX_BATCH_SIZE=<deleteMaxBatchSize>
export DELETE_MAX_BATCH_SIZE=10
export DELETE_MAX_BUFFER_DURATION=10s
export DLQ_DIRECTORY=""
export BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID=""
@@ -219,9 +219,9 @@ export DENY_RESTRICTS_MAPPINGS=<denyRestrictsMappings>
export INT_NUMERIC_RESTRICTS_MAPPINGS=<intNumericRestrictsMappings>
export FLOAT_NUMERIC_RESTRICTS_MAPPINGS=<floatNumericRestrictsMappings>
export DOUBLE_NUMERIC_RESTRICTS_MAPPINGS=<doubleNumericRestrictsMappings>
export UPSERT_MAX_BATCH_SIZE=<upsertMaxBatchSize>
export UPSERT_MAX_BATCH_SIZE=10
export UPSERT_MAX_BUFFER_DURATION=10s
export DELETE_MAX_BATCH_SIZE=<deleteMaxBatchSize>
export DELETE_MAX_BATCH_SIZE=10
export DELETE_MAX_BUFFER_DURATION=10s
export DLQ_DIRECTORY=""
export BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID=""
README: Cloud Spanner to BigQuery template
@@ -19,14 +19,14 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

* **spannerInstanceId** : The instance ID of the Spanner database to read from.
* **spannerDatabaseId** : The database ID of the Spanner database to export.
* **spannerTableId** : The table name of the Spanner database to export.
* **sqlQuery** : The SQL query to use to read data from the Spanner database.
* **outputTableSpec** : The BigQuery output table location to write the output to. For example, `<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>`.Depending on the `createDisposition` specified, the output table might be created automatically using the user provided Avro schema.

### Optional parameters

* **spannerProjectId** : The ID of the project that the Spanner database resides in. The default value for this parameter is the project where the Dataflow pipeline is running.
* **spannerTableId** : The table name of the Spanner database to export. Ignored if sqlQuery is set.
* **spannerRpcPriority** : The request priority (https://cloud.google.com/spanner/docs/reference/rest/v1/RequestOptions) for Spanner calls. Possible values are `HIGH`, `MEDIUM`, and `LOW`. The default value is `HIGH`.
* **sqlQuery** : The SQL query to use to read data from the Spanner database. Required if spannerTableId is empty.
* **bigQuerySchemaPath** : The Cloud Storage path (gs://) to the JSON file that defines your BigQuery schema. (Example: gs://your-bucket/your-schema.json).
* **writeDisposition** : The BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) value. For example, `WRITE_APPEND`, `WRITE_EMPTY`, or `WRITE_TRUNCATE`. Defaults to `WRITE_APPEND`.
* **createDisposition** : The BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). For example, `CREATE_IF_NEEDED` and `CREATE_NEVER`. Defaults to `CREATE_IF_NEEDED`.
@@ -112,13 +112,13 @@ export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Cloud_Spanner_to_
### Required
export SPANNER_INSTANCE_ID=<spannerInstanceId>
export SPANNER_DATABASE_ID=<spannerDatabaseId>
export SPANNER_TABLE_ID=<spannerTableId>
export SQL_QUERY=<sqlQuery>
export OUTPUT_TABLE_SPEC=<outputTableSpec>

### Optional
export SPANNER_PROJECT_ID=""
export SPANNER_TABLE_ID=<spannerTableId>
export SPANNER_RPC_PRIORITY=<spannerRpcPriority>
export SQL_QUERY=<sqlQuery>
export BIG_QUERY_SCHEMA_PATH=<bigQuerySchemaPath>
export WRITE_DISPOSITION=WRITE_APPEND
export CREATE_DISPOSITION=CREATE_IF_NEEDED
@@ -161,13 +161,13 @@ export REGION=us-central1
### Required
export SPANNER_INSTANCE_ID=<spannerInstanceId>
export SPANNER_DATABASE_ID=<spannerDatabaseId>
export SPANNER_TABLE_ID=<spannerTableId>
export SQL_QUERY=<sqlQuery>
export OUTPUT_TABLE_SPEC=<outputTableSpec>

### Optional
export SPANNER_PROJECT_ID=""
export SPANNER_TABLE_ID=<spannerTableId>
export SPANNER_RPC_PRIORITY=<spannerRpcPriority>
export SQL_QUERY=<sqlQuery>
export BIG_QUERY_SCHEMA_PATH=<bigQuerySchemaPath>
export WRITE_DISPOSITION=WRITE_APPEND
export CREATE_DISPOSITION=CREATE_IF_NEEDED
@@ -228,11 +228,11 @@ resource "google_dataflow_flex_template_job" "cloud_spanner_to_bigquery_flex" {
parameters = {
spannerInstanceId = "<spannerInstanceId>"
spannerDatabaseId = "<spannerDatabaseId>"
spannerTableId = "<spannerTableId>"
sqlQuery = "<sqlQuery>"
outputTableSpec = "<outputTableSpec>"
# spannerProjectId = ""
# spannerTableId = "<spannerTableId>"
# spannerRpcPriority = "<spannerRpcPriority>"
# sqlQuery = "<sqlQuery>"
# bigQuerySchemaPath = "gs://your-bucket/your-schema.json"
# writeDisposition = "WRITE_APPEND"
# createDisposition = "CREATE_IF_NEEDED"