From 49554f5e8e75e1ffc83a0579c7a7ce1f8407176c Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 21 Oct 2024 15:15:57 +0200 Subject: [PATCH] Update translation IO --- .../beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java | 9 ++++++--- .../sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java | 4 ++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 4924845a573c..243a3a09d8ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -280,9 +280,8 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { } } - if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") < 0) { + if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.61.0") < 0) { // best effort migration - // if user was specifying a custom datum_reader_factory, that would fail byte[] formatBytes = configRow.getBytes("format"); DataFormat dataFormat = null; if (formatBytes != null) { @@ -290,7 +289,11 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { } byte[] parseFnBytes = configRow.getBytes("parse_fn"); - if (parseFnBytes != null) { + if (parseFnBytes == null) { + // parseFn is null only when creating IO with readWithDatumReader + throw new RuntimeException( + "Upgrading BigqueryIO readWithDatumReader transforms is not supported."); + } else { SerializableFunction parseFn = (SerializableFunction) fromByteArray(parseFnBytes); BigQueryReaderFactory readerFactory; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 6f92aacd1015..6699b115bcc1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -156,7 +156,7 @@ public void testReCreateReadTransformFromRowTable() { Row row = translator.toConfigRow(readTransform); PipelineOptions options = PipelineOptionsFactory.create(); - options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0"); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0"); BigQueryIO.TypedRead readTransformFromRow = (BigQueryIO.TypedRead) translator.fromConfigRow(row, options); assertNotNull(readTransformFromRow.getTable()); @@ -189,7 +189,7 @@ public void testReCreateReadTransformFromRowQuery() { Row row = translator.toConfigRow(readTransform); PipelineOptions options = PipelineOptionsFactory.create(); - options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.60.0"); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.61.0"); BigQueryIO.TypedRead readTransformFromRow = translator.fromConfigRow(row, options); assertEquals("dummyquery", readTransformFromRow.getQuery().get()); assertNotNull(readTransformFromRow.getBigQueryReaderFactory());