Skip to content

Commit

Permalink
Update translation IO
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Oct 21, 2024
1 parent 7fe0491 commit 49554f5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,17 +280,20 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
}
}

if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") < 0) {
if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.61.0") < 0) {
// best effort migration
// if user was specifying a custom datum_reader_factory, that would fail
byte[] formatBytes = configRow.getBytes("format");
DataFormat dataFormat = null;
if (formatBytes != null) {
dataFormat = (DataFormat) fromByteArray(formatBytes);
}

byte[] parseFnBytes = configRow.getBytes("parse_fn");
if (parseFnBytes != null) {
if (parseFnBytes == null) {
// parseFn is null only when creating IO with readWithDatumReader
throw new RuntimeException(
"Upgrading BigqueryIO readWithDatumReader transforms is not supported.");
} else {
SerializableFunction<SchemaAndRecord, ?> parseFn =
(SerializableFunction<SchemaAndRecord, ?>) fromByteArray(parseFnBytes);
BigQueryReaderFactory<?> readerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testReCreateReadTransformFromRowTable() {
Row row = translator.toConfigRow(readTransform);

PipelineOptions options = PipelineOptionsFactory.create();
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0");
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0");
BigQueryIO.TypedRead<TableRow> readTransformFromRow =
(BigQueryIO.TypedRead<TableRow>) translator.fromConfigRow(row, options);
assertNotNull(readTransformFromRow.getTable());
Expand Down Expand Up @@ -189,7 +189,7 @@ public void testReCreateReadTransformFromRowQuery() {
Row row = translator.toConfigRow(readTransform);

PipelineOptions options = PipelineOptionsFactory.create();
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0");
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0");
BigQueryIO.TypedRead<?> readTransformFromRow = translator.fromConfigRow(row, options);
assertEquals("dummyquery", readTransformFromRow.getQuery().get());
assertNotNull(readTransformFromRow.getBigQueryReaderFactory());
Expand Down

0 comments on commit 49554f5

Please sign in to comment.