Dataplex Tiering template: fix the handling of the DATE fields.
The CL updates the BigQuery to GCS transform to handle fields of type DATE. There is a misalignment between Apache Beam and the BigQuery API when translating
the BigQuery DATE type to an Avro type: Beam translates it as `string`, while BigQuery translates it as `int` with the `date` logical type.
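
For illustration, a minimal sketch (not part of the commit; the class name DateMismatchSketch is made up) of the conversion this change performs: a DATE value that Beam yields as a string, e.g. "1970-01-05", is rewritten as the epoch-day count that the Avro `date` logical type expects.

import java.time.LocalDate;

public class DateMismatchSketch {
  public static void main(String[] args) {
    // Beam's BigQueryIO hands a DATE column over as a plain string.
    String beamValue = "1970-01-05";
    // The Avro `date` logical type stores the value as an int counting days since 1970-01-01.
    int avroDays = (int) LocalDate.parse(beamValue).toEpochDay();
    System.out.println(avroDays); // prints 4
  }
}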

PiperOrigin-RevId: 427026994
oleg-semenov authored and cloud-teleport committed Feb 7, 2022
1 parent c360883 commit 51f613a
Showing 2 changed files with 50 additions and 16 deletions.
@@ -15,16 +15,19 @@
*/
package com.google.cloud.teleport.v2.transforms;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.teleport.v2.utils.BigQueryToGcsDirectoryNaming;
import com.google.cloud.teleport.v2.utils.BigQueryToGcsFileNaming;
import com.google.cloud.teleport.v2.utils.Schemas;
import com.google.cloud.teleport.v2.values.BigQueryTable;
import com.google.cloud.teleport.v2.values.BigQueryTablePartition;
import com.google.cloud.teleport.v2.values.DataplexCompression;
import com.google.common.annotations.VisibleForTesting;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
@@ -200,7 +203,7 @@ private PCollection<KV<BigQueryTablePartition, String>> transformPartition(

private TypedRead<GenericRecord> getDefaultRead() {
TypedRead<GenericRecord> read =
BigQueryIO.read(SchemaAndRecord::getRecord)
BigQueryIO.read(this::genericRecordWithFixedDates)
.withTemplateCompatibility()
// Performance hit due to validation is too big. When exporting a table with thousands
// of partitions launching the job takes more than 12 minutes (Flex template timeout).
@@ -215,6 +218,35 @@ private TypedRead<GenericRecord> getDefaultRead() {
return testServices == null ? read : read.withTestServices(testServices);
}

/**
* When Beam's BigQueryIO reads from BQ, it derives the Avro schema by itself and maps BQ's
* `DATE` type to Avro's `string` type, so the GenericRecords output by the BigQueryIO contain
* `string` fields for the `DATE` columns. The Avro schema obtained from BQ directly -- {@code
* table.getSchema()} -- has the `DATE` columns mapped to Avro's `int` type with logical type
* `date`. To fix this mismatch, this method converts the `string` date fields to `int` fields
* with logical type `date`.
*
* <p>Note that for the TIMESTAMP type both Beam's BigQueryIO and the BQ API map it to `long`,
* so there is no mismatch.
*/
private GenericRecord genericRecordWithFixedDates(SchemaAndRecord schemaAndRecord) {
GenericRecord input = schemaAndRecord.getRecord();
GenericRecord output = new GenericData.Record(table.getSchema());
for (TableFieldSchema fieldSchema : schemaAndRecord.getTableSchema().getFields()) {
if ("DATE".equals(fieldSchema.getType())) {
Object value = input.get(fieldSchema.getName());
if (!(value instanceof CharSequence)) {
throw new IllegalStateException(
"The class of input value of type DATE is " + value.getClass());
}
output.put(fieldSchema.getName(), (int) LocalDate.parse((CharSequence) value).toEpochDay());
} else {
output.put(fieldSchema.getName(), input.get(fieldSchema.getName()));
}
}
return output;
}

private Write<Void, GenericRecord> getDefaultWrite() {
return FileIO.<GenericRecord>write()
.withNumShards(1); // Must be 1 as we can only have 1 file per partition.
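
For reference, a minimal sketch (not part of this commit; the class name AvroDateSchemaSketch is made up) of the two Avro representations behind the mismatch described in the javadoc above: Beam's derived schema models a DATE column as a plain string, while the schema obtained from the BigQuery API via table.getSchema() models it as an int carrying the `date` logical type.

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class AvroDateSchemaSketch {
  public static void main(String[] args) {
    // What Beam's BigQueryIO derives for a DATE column: a plain string.
    Schema beamSide = Schema.create(Schema.Type.STRING);
    // What the BigQuery-provided schema contains for the same column:
    // an int annotated with the `date` logical type (days since 1970-01-01).
    Schema bqSide = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
    System.out.println(beamSide); // "string"
    System.out.println(bqSide);   // {"type":"int","logicalType":"date"}
  }
}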
@@ -148,6 +148,7 @@ public void setUp() throws InterruptedException, IOException {
ImmutableList.of(
new TableFieldSchema().setName("ts").setType("TIMESTAMP"),
new TableFieldSchema().setName("s1").setType("STRING"),
new TableFieldSchema().setName("d1").setType("DATE"),
new TableFieldSchema().setName("i1").setType("INTEGER")));

avroSchema =
@@ -156,6 +157,7 @@ public void setUp() throws InterruptedException, IOException {
"{\"type\":\"record\",\"name\":\"__root__\",\"fields\":"
+ "[{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}]},"
+ "{\"name\":\"s1\",\"type\":[\"null\",\"string\"]},"
+ "{\"name\":\"d1\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}]},"
+ "{\"name\":\"i1\",\"type\":[\"null\",\"long\"]}]}");

long modTime = System.currentTimeMillis() * 1000;
@@ -197,20 +199,20 @@ public void setUp() throws InterruptedException, IOException {

defaultRecords =
new TableRow[] {
new TableRow().set("ts", 1L).set("s1", "1001").set("i1", 2001L),
new TableRow().set("ts", 2L).set("s1", "1002").set("i1", 2002L),
new TableRow().set("ts", 3L).set("s1", "1003").set("i1", 2003L),
new TableRow().set("ts", 4L).set("s1", "1004").set("i1", null),
new TableRow().set("ts", 5L).set("s1", "1005").set("i1", 2005L)
new TableRow().set("ts", 1L).set("s1", "1001").set("d1", "1970-01-01").set("i1", 2001L),
new TableRow().set("ts", 2L).set("s1", "1002").set("d1", "1970-01-02").set("i1", 2002L),
new TableRow().set("ts", 3L).set("s1", "1003").set("d1", "1970-01-03").set("i1", 2003L),
new TableRow().set("ts", 4L).set("s1", "1004").set("d1", "1970-01-04").set("i1", null),
new TableRow().set("ts", 5L).set("s1", "1005").set("d1", "1970-01-05").set("i1", 2005L)
};

defaultExpectedRecords =
new String[] {
"{\"ts\": 1, \"s1\": \"1001\", \"i1\": 2001}",
"{\"ts\": 2, \"s1\": \"1002\", \"i1\": 2002}",
"{\"ts\": 3, \"s1\": \"1003\", \"i1\": 2003}",
"{\"ts\": 4, \"s1\": \"1004\", \"i1\": null}",
"{\"ts\": 5, \"s1\": \"1005\", \"i1\": 2005}"
"{\"ts\": 1, \"s1\": \"1001\", \"d1\": 0, \"i1\": 2001}",
"{\"ts\": 2, \"s1\": \"1002\", \"d1\": 1, \"i1\": 2002}",
"{\"ts\": 3, \"s1\": \"1003\", \"d1\": 2, \"i1\": 2003}",
"{\"ts\": 4, \"s1\": \"1004\", \"d1\": 3, \"i1\": null}",
"{\"ts\": 5, \"s1\": \"1005\", \"d1\": 4, \"i1\": 2005}"
};

FakeDatasetService.setUp();
@@ -406,14 +408,14 @@ public void testE2E_withEnforceSamePartitionKeyEnabled_producesRenamedColumns()

String[] expectedRecords1 =
new String[] {
"{\"ts_pkey\": 1, \"s1\": \"1001\", \"i1\": 2001}",
"{\"ts_pkey\": 2, \"s1\": \"1002\", \"i1\": 2002}"
"{\"ts_pkey\": 1, \"s1\": \"1001\", \"d1\": 0, \"i1\": 2001}",
"{\"ts_pkey\": 2, \"s1\": \"1002\", \"d1\": 1, \"i1\": 2002}"
};
String[] expectedRecords2 =
new String[] {
"{\"ts_pkey\": 3, \"s1\": \"1003\", \"i1\": 2003}",
"{\"ts_pkey\": 4, \"s1\": \"1004\", \"i1\": null}",
"{\"ts_pkey\": 5, \"s1\": \"1005\", \"i1\": 2005}"
"{\"ts_pkey\": 3, \"s1\": \"1003\", \"d1\": 2, \"i1\": 2003}",
"{\"ts_pkey\": 4, \"s1\": \"1004\", \"d1\": 3, \"i1\": null}",
"{\"ts_pkey\": 5, \"s1\": \"1005\", \"d1\": 4, \"i1\": 2005}"
};

PAssert.that(actualRecords1).containsInAnyOrder(expectedRecords1);
