bug(bigtable): Handle default values #1940

Merged 8 commits on Oct 16, 2024
v1/README_Cloud_Bigtable_to_GCS_Json.md (16 changes: 8 additions & 8 deletions)
@@ -20,11 +20,11 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId** : The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId** : The ID of the Bigtable instance that contains the table.
* **bigtableTableId** : The ID of the Bigtable table to read from.
- * **filenamePrefix** : The prefix of the JSON file name. For example, "table1-". If no value is provided, defaults to `part`.
+ * **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).

### Optional parameters

- * **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
+ * **filenamePrefix** : The prefix of the JSON file name. For example, "table1-". If no value is provided, defaults to `part`.
* **userOption** : Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to the single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`.
* **columnsAliases** : A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`.
* **bigtableAppProfileId** : The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.
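A rough sketch of what `userOption` changes and why `columnsAliases` only applies when rows are flattened. The row key, column names, and output shapes here are hypothetical; the template's exact serialization may differ.

```java
import com.google.gson.JsonObject;

// Hypothetical illustration only; not the template's actual output code.
public class UserOptionSketch {
  public static void main(String[] args) {
    // userOption=NONE: the whole row stays nested under its column family.
    JsonObject cf = new JsonObject();
    cf.addProperty("my_embedding", "[0.1,0.2]");
    JsonObject nested = new JsonObject();
    nested.addProperty("rowkey", "r1");
    nested.add("cf", cf);

    // userOption=FLATTEN: a single level of keys, which is what lets
    // columnsAliases ("rowkey;id" and "cf:my_embedding;embedding")
    // rename columns for Vertex AI Vector Search.
    JsonObject flat = new JsonObject();
    flat.addProperty("id", "r1");
    flat.addProperty("embedding", "[0.1,0.2]");

    System.out.println(nested); // {"rowkey":"r1","cf":{"my_embedding":"[0.1,0.2]"}}
    System.out.println(flat);   // {"id":"r1","embedding":"[0.1,0.2]"}
  }
}
```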
@@ -111,10 +111,10 @@ export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/Cloud_Bigtable_to_GCS_
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default
@@ -152,10 +152,10 @@ export REGION=us-central1
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export USER_OPTION=NONE
export COLUMNS_ALIASES=<columnsAliases>
export BIGTABLE_APP_PROFILE_ID=default
@@ -216,8 +216,8 @@ resource "google_dataflow_job" "cloud_bigtable_to_gcs_json" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
-filenamePrefix = "part"
-# outputDirectory = "gs://your-bucket/your-path/"
+outputDirectory = "gs://your-bucket/your-path/"
+# filenamePrefix = "part"
# userOption = "NONE"
# columnsAliases = "<columnsAliases>"
# bigtableAppProfileId = "default"
v1/README_Cloud_Bigtable_to_Vector_Embeddings.md (16 changes: 8 additions & 8 deletions)
@@ -21,13 +21,13 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
* **bigtableProjectId** : The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.
* **bigtableInstanceId** : The ID of the Bigtable instance that contains the table.
* **bigtableTableId** : The ID of the Bigtable table to read from.
- * **filenamePrefix** : The prefix of the JSON filename. For example: "table1-". If no value is provided, defaults to "part".
+ * **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
* **idColumn** : The fully qualified column name where the ID is stored. In the format cf:col or _key.
* **embeddingColumn** : The fully qualified column name where the embeddings are stored. In the format cf:col or _key.

### Optional parameters

- * **outputDirectory** : The Cloud Storage path where the output JSON files are stored. (Example: gs://your-bucket/your-path/).
+ * **filenamePrefix** : The prefix of the JSON filename. For example: "table1-". If no value is provided, defaults to "part".
* **crowdingTagColumn** : The fully qualified column name where the crowding tag is stored. In the format cf:col or _key.
* **embeddingByteSize** : The byte size of each entry in the embeddings array. For float, use the value 4. For double, use the value 8. Defaults to 4.
* **allowRestrictsMappings** : The comma-separated, fully qualified column names for the columns to use as the allow restricts, with their aliases. In the format cf:col->alias.
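The `cf:col->alias` notation above is a comma-separated list of fully qualified columns mapped to output aliases. A hypothetical reading of the format, not the template's actual parsing code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrates the "cf:col->alias" mapping notation used by
// allowRestrictsMappings and its numeric siblings; column names are invented.
public class MappingNotationSketch {
  public static void main(String[] args) {
    String mappings = "cf:genre->genre,cf:year->year";
    Map<String, String> columnToAlias = new LinkedHashMap<>();
    for (String entry : mappings.split(",")) {
      String[] parts = entry.split("->", 2); // "cf:genre" maps to "genre"
      columnToAlias.put(parts[0], parts[1]);
    }
    System.out.println(columnToAlias); // {cf:genre=genre, cf:year=year}
  }
}
```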
@@ -119,12 +119,12 @@ export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/Cloud_Bigtable_to_Vect
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>
export ID_COLUMN=<idColumn>
export EMBEDDING_COLUMN=<embeddingColumn>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export CROWDING_TAG_COLUMN=<crowdingTagColumn>
export EMBEDDING_BYTE_SIZE=4
export ALLOW_RESTRICTS_MAPPINGS=<allowRestrictsMappings>
@@ -174,12 +174,12 @@ export REGION=us-central1
export BIGTABLE_PROJECT_ID=<bigtableProjectId>
export BIGTABLE_INSTANCE_ID=<bigtableInstanceId>
export BIGTABLE_TABLE_ID=<bigtableTableId>
-export FILENAME_PREFIX=part
+export OUTPUT_DIRECTORY=<outputDirectory>
export ID_COLUMN=<idColumn>
export EMBEDDING_COLUMN=<embeddingColumn>

### Optional
-export OUTPUT_DIRECTORY=<outputDirectory>
+export FILENAME_PREFIX=part
export CROWDING_TAG_COLUMN=<crowdingTagColumn>
export EMBEDDING_BYTE_SIZE=4
export ALLOW_RESTRICTS_MAPPINGS=<allowRestrictsMappings>
@@ -245,10 +245,10 @@ resource "google_dataflow_job" "cloud_bigtable_to_vector_embeddings" {
bigtableProjectId = "<bigtableProjectId>"
bigtableInstanceId = "<bigtableInstanceId>"
bigtableTableId = "<bigtableTableId>"
-filenamePrefix = "part"
+outputDirectory = "gs://your-bucket/your-path/"
idColumn = "<idColumn>"
embeddingColumn = "<embeddingColumn>"
-# outputDirectory = "gs://your-bucket/your-path/"
+# filenamePrefix = "part"
# crowdingTagColumn = "<crowdingTagColumn>"
# embeddingByteSize = "4"
# allowRestrictsMappings = "<allowRestrictsMappings>"
v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java

@@ -24,6 +24,8 @@
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
@@ -32,11 +34,14 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -108,10 +113,10 @@
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
-optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files are stored.",
example = "gs://your-bucket/your-path/")
+@Required
ValueProvider<String> getOutputDirectory();

@SuppressWarnings("unused")
@@ -120,6 +125,7 @@
@TemplateParameter.Text(
order = 5,
groupName = "Target",
+optional = true,
description = "JSON file prefix",
helpText =
"The prefix of the JSON file name. For example, \"table1-\". If no value is provided, defaults to `part`.")
@@ -208,28 +214,27 @@
read = read.withoutValidation();
}

// Concatenating cloud storage folder with file prefix to get complete path
-ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();
-
-ValueProvider<String> outputFilePathWithPrefix =
-    ValueProvider.NestedValueProvider.of(
+ValueProvider<String> filePathPrefix =
+    DualInputNestedValueProvider.of(
        options.getOutputDirectory(),
-       (SerializableFunction<String, String>)
-           folder -> {
-             if (!folder.endsWith("/")) {
-               // Appending the slash if not provided by user
-               folder = folder + "/";
-             }
-             return folder + outputFilePrefix.get();
-           });
+       options.getFilenamePrefix(),
+       new SerializableFunction<TranslatorInput<String, String>, String>() {
+         @Override
+         public String apply(TranslatorInput<String, String> input) {
+           return FileSystems.matchNewResource(input.getX(), true)
+               .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
+               .toString();
+         }
+       });

(Codecov / codecov/patch: added lines 217-218, 220-221, and 224-226 are not covered by tests.)
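For context on `DualInputNestedValueProvider`: it combines two deferred template parameters into one `ValueProvider`, the two-input analogue of the `ValueProvider.NestedValueProvider` used before. A minimal sketch of the pattern under assumed names; `TwoInputProviderSketch` and its `KV`-based input are illustrative, and the real class in `com.google.cloud.teleport.util` is the authoritative shape:

```java
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

// Assumed-shape sketch: combine two deferred ValueProviders into one.
// Nothing is read until get() is called, so both template parameters stay
// resolvable at launch time rather than at graph-construction time.
public class TwoInputProviderSketch<X, Y, T> implements ValueProvider<T> {
  private final ValueProvider<X> first;
  private final ValueProvider<Y> second;
  private final SerializableFunction<KV<X, Y>, T> translator;

  public TwoInputProviderSketch(
      ValueProvider<X> first,
      ValueProvider<Y> second,
      SerializableFunction<KV<X, Y>, T> translator) {
    this.first = first;
    this.second = second;
    this.translator = translator;
  }

  @Override
  public T get() {
    // Evaluate both inputs, then derive the combined value.
    return translator.apply(KV.of(first.get(), second.get()));
  }

  @Override
  public boolean isAccessible() {
    // The combined value is only accessible once both inputs are.
    return first.isAccessible() && second.isAccessible();
  }
}
```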

String userOption = options.getUserOption();
pipeline
.apply("Read from Bigtable", read)
.apply(
"Transform to JSON",
MapElements.via(
new BigtableToJsonFn(userOption.equals("FLATTEN"), options.getColumnsAliases())))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));
.apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

Check warning on line 237 in v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToJson.java#L237

Added line #L237 was not covered by tests

return pipeline.run();
}
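The switch from manual string concatenation to `FileSystems`/`ResourceId` is what makes the now-optional prefix safe to combine with any `outputDirectory` spelling: `resolve` normalizes the trailing slash that the old `endsWith("/")` branch patched by hand. A minimal, runnable sketch; local paths work with the core Beam SDK, while `gs://` paths additionally need the GCP filesystem registrar on the classpath:

```java
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Demonstrates the path resolution used above: a directory with or without
// a trailing slash resolves to the same file prefix.
public class ResolveSketch {
  public static void main(String[] args) {
    // Registers the available filesystems for the FileSystems static API.
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    for (String dir : new String[] {"/tmp/out", "/tmp/out/"}) {
      String prefix =
          FileSystems.matchNewResource(dir, true /* isDirectory */)
              .resolve("part", StandardResolveOptions.RESOLVE_FILE)
              .toString();
      System.out.println(prefix); // /tmp/out/part in both cases
    }
  }
}
```

`TextIO` then appends Beam's default shard template and the suffix, so with the default prefix the output files come out as, for example, `part-00000-of-00001.json`.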
v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToVectorEmbeddings.java

@@ -24,6 +24,8 @@
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider;
+import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -36,11 +38,14 @@
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -114,10 +119,10 @@
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
-optional = true,
description = "Cloud Storage directory for storing JSON files",
helpText = "The Cloud Storage path where the output JSON files are stored.",
example = "gs://your-bucket/your-path/")
+@Required
ValueProvider<String> getOutputDirectory();

@SuppressWarnings("unused")
@@ -126,6 +131,7 @@
@TemplateParameter.Text(
order = 5,
groupName = "Target",
+optional = true,
description = "JSON file prefix",
helpText =
"The prefix of the JSON filename. For example: \"table1-\". If no value is provided, defaults to \"part\".")
@@ -281,19 +287,18 @@
}

// Concatenating cloud storage folder with file prefix to get complete path
-ValueProvider<String> outputFilePrefix = options.getFilenamePrefix();
-
-ValueProvider<String> outputFilePathWithPrefix =
-    ValueProvider.NestedValueProvider.of(
+ValueProvider<String> filePathPrefix =
+    DualInputNestedValueProvider.of(
        options.getOutputDirectory(),
-       (SerializableFunction<String, String>)
-           folder -> {
-             if (!folder.endsWith("/")) {
-               // Appending the slash if not provided by user
-               folder = folder + "/";
-             }
-             return folder + outputFilePrefix.get();
-           });
+       options.getFilenamePrefix(),
+       new SerializableFunction<TranslatorInput<String, String>, String>() {
+         @Override
+         public String apply(TranslatorInput<String, String> input) {
+           return FileSystems.matchNewResource(input.getX(), true)
+               .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
+               .toString();
+         }
+       });

(Codecov / codecov/patch: added lines 290-291, 293-294, and 297-299 are not covered by tests.)
pipeline
.apply("Read from Bigtable", read)
.apply(
@@ -309,7 +314,7 @@
options.getIntNumericRestrictsMappings(),
options.getFloatNumericRestrictsMappings(),
options.getDoubleNumericRestrictsMappings())))
.apply("Write to storage", TextIO.write().to(outputFilePathWithPrefix).withSuffix(".json"));
.apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

Check warning on line 317 in v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToVectorEmbeddings.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/com/google/cloud/teleport/bigtable/BigtableToVectorEmbeddings.java#L317

Added line #L317 was not covered by tests

return pipeline.run();
}
@@ -86,7 +86,6 @@ public void testUnflattenedBigtableToJson() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("bigtableAppProfileId", appProfileId);

// Act
@@ -158,7 +158,6 @@ public void testBigtableToVectorEmbeddings() throws IOException {
.addParameter("bigtableInstanceId", bigtableResourceManager.getInstanceId())
.addParameter("bigtableTableId", tableId)
.addParameter("outputDirectory", getGcsPath("output/"))
.addParameter("filenamePrefix", "bigtable-to-json-output-")
.addParameter("idColumn", "_key")
.addParameter("embeddingColumn", "cf1:embedding")
.addParameter("crowdingTagColumn", "cf2:crowding")