BigQueryIO: allow use of an Avro schema when reading
It should be possible to read BigQuery Avro data using a provided compatible Avro schema, for both file-based and direct reads.

Add readRows api

Improve coder inference

Self review

Fix concurrency issue

spotless

checkstyle

Ignore BigQueryIOTranslationTest

Add missing project option to execute test

Call table schema only if required

Fix avro export without logical type

checkstyle

Add back float support

Fix write test

Add arrow support in translation
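
Below, a hedged sketch of the read path this commit describes, modeled on the integration-test change further down; the table name and field are hypothetical, and `readAvro` is assumed to take a `SerializableFunction<GenericRecord, T>`, as the test diff shows:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class ReadAvroSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // The parse function now receives the Avro GenericRecord directly,
    // with no SchemaAndRecord wrapper and no withFormat(DataFormat.AVRO) call.
    SerializableFunction<GenericRecord, String> parseFn =
        record -> String.valueOf(record.get("name")); // hypothetical field

    PCollection<String> names =
        p.apply(
            BigQueryIO.readAvro(parseFn)
                .from("my-project:my_dataset.my_table") // hypothetical table
                .withMethod(Method.DIRECT_READ));

    p.run().waitUntilFinish();
  }
}
```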
RustedBones committed Oct 16, 2024
1 parent 7d0bfd0 commit 5bb43c5
Showing 33 changed files with 1,052 additions and 701 deletions.
@@ -252,6 +252,16 @@ public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
       InputStream inputStream,
       RootAllocator allocator)
       throws IOException {
+    return rowsFromSerializedRecordBatch(
+        arrowSchema, ArrowSchemaTranslator.toBeamSchema(arrowSchema), inputStream, allocator);
+  }
+
+  public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
+      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
+      Schema schema,
+      InputStream inputStream,
+      RootAllocator allocator)
+      throws IOException {
     VectorSchemaRoot vectorRoot = VectorSchemaRoot.create(arrowSchema, allocator);
     VectorLoader vectorLoader = new VectorLoader(vectorRoot);
     vectorRoot.clear();
@@ -261,7 +271,7 @@ public static RecordBatchRowIterator rowsFromSerializedRecordBatch(
         vectorLoader.load(arrowMessage);
       }
     }
-    return rowsFromRecordBatch(ArrowSchemaTranslator.toBeamSchema(arrowSchema), vectorRoot);
+    return rowsFromRecordBatch(schema, vectorRoot);
   }

   public static org.apache.arrow.vector.types.pojo.Schema arrowSchemaFromInput(InputStream input)
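
For context, a hedged sketch of how a caller might use the new overload; the enclosing `ArrowConversion` class name, the nested `ArrowSchemaTranslator`, and the iterator being `AutoCloseable` are assumptions based on the surrounding Beam Arrow extension:

```java
import java.io.InputStream;
import java.util.List;
import org.apache.arrow.memory.RootAllocator;
import org.apache.beam.sdk.extensions.arrow.ArrowConversion;
import org.apache.beam.sdk.extensions.arrow.ArrowConversion.RecordBatchRowIterator;
import org.apache.beam.sdk.schemas.Schema;

public class ArrowBatchReader {
  // Translate the Arrow schema to a Beam schema once and reuse it for every
  // serialized batch, instead of re-deriving it inside each call.
  static void printRows(
      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
      List<InputStream> serializedBatches,
      RootAllocator allocator)
      throws Exception {
    Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(arrowSchema);
    for (InputStream batch : serializedBatches) {
      try (RecordBatchRowIterator rows =
          ArrowConversion.rowsFromSerializedRecordBatch(
              arrowSchema, beamSchema, batch, allocator)) {
        rows.forEachRemaining(System.out::println);
      }
    }
  }
}
```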
@@ -142,11 +142,11 @@ public interface DatumReaderFactory<T> extends Serializable {
   // Use cases of AvroSource are:
   // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.
   // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.
-  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema
+  // 3) AvroSource<T> Reading GenericRecord records with an (un)specified schema
   //    and converting them to type T.
   //                     | Case 1        | Case 2   | Case 3        |
   // type                | GenericRecord | Foo      | GenericRecord |
-  // readerSchemaString  | non-null      | non-null | null          |
+  // readerSchemaString  | non-null      | non-null | either        |
   // parseFn             | null          | null     | non-null      |
   // outputCoder         | either        | either   | non-null      |
   // readerFactory       | either        | either   | either        |
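
A hedged sketch of the newly permitted combination in case 3, assuming AvroSource's existing `withSchema` and `withParseFn` builders; the path, reader schema, and parse function are hypothetical:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;

public class AvroSourceCase3 {
  static AvroSource<String> build() {
    // Hypothetical reader schema, compatible with the files' writer schema.
    String readerSchemaJson =
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"}]}";

    // Case 3 with a *specified* reader schema: before this change,
    // readerSchemaString had to be null whenever a parseFn was supplied.
    return AvroSource.from("gs://my-bucket/path/*.avro") // hypothetical path
        .withSchema(readerSchemaJson)
        .withParseFn(
            (GenericRecord record) -> String.valueOf(record.get("name")),
            StringUtf8Coder.of());
  }
}
```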
@@ -26,20 +26,19 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
-import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.testing.PAssert;
@@ -179,20 +178,18 @@ private void readSketchFromBigQuery(String tableId, Long expectedCount) {
             "SELECT HLL_COUNT.INIT(%s) AS %s FROM %s",
             DATA_FIELD_NAME, QUERY_RESULT_FIELD_NAME, tableSpec);

-    SerializableFunction<SchemaAndRecord, byte[]> parseQueryResultToByteArray =
-        input ->
+    SerializableFunction<GenericRecord, byte[]> parseQueryResultToByteArray =
+        record ->
             // BigQuery BYTES type corresponds to Java java.nio.ByteBuffer type
-            HllCount.getSketchFromByteBuffer(
-                (ByteBuffer) input.getRecord().get(QUERY_RESULT_FIELD_NAME));
+            HllCount.getSketchFromByteBuffer((ByteBuffer) record.get(QUERY_RESULT_FIELD_NAME));

     TestPipelineOptions options =
         TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);

     Pipeline p = Pipeline.create(options);
     PCollection<Long> result =
         p.apply(
-            BigQueryIO.read(parseQueryResultToByteArray)
-                .withFormat(DataFormat.AVRO)
+            BigQueryIO.readAvro(parseQueryResultToByteArray)
                 .fromQuery(query)
                 .usingStandardSql()
                 .withMethod(Method.DIRECT_READ)
@@ -20,7 +20,7 @@
 import java.io.IOException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
+import org.apache.beam.sdk.extensions.avro.io.AvroSink;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.MimeTypes;

@@ -36,7 +36,7 @@ class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
       String basename,
       Schema schema,
       SerializableFunction<AvroWriteRequest<T>, AvroT> toAvroRecord,
-      SerializableFunction<Schema, DatumWriter<AvroT>> writerFactory)
+      AvroSink.DatumWriterFactory<AvroT> writerFactory)
       throws Exception {
     super(basename, MimeTypes.BINARY);
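
Since the writer-factory parameter changes from a raw `SerializableFunction<Schema, DatumWriter<AvroT>>` to `AvroSink.DatumWriterFactory<AvroT>`, a factory can presumably still be supplied as a lambda, assuming `DatumWriterFactory` is a functional interface over the writer schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.io.AvroSink;

public class WriterFactorySketch {
  // Assumed shape: DatumWriterFactory<T> maps a writer Schema to a DatumWriter<T>.
  static AvroSink.DatumWriterFactory<GenericRecord> genericFactory() {
    return (Schema schema) -> new GenericDatumWriter<>(schema);
  }
}
```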
