BigQueryIO uniformize direct and export reads #32360
base: master
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.
Force-pushed from 5dbe389 to 989ccdd
assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`. R: @robertwb for label java.
The PR bot will only process comments in the main thread (not review comments).
Some BQ integration tests are failing. I don't know the schema & data of the following:
```java
// read table schema and infer coder if possible
Coder<T> c;
if (getCoder() == null) {
  tableSchema = requestTableSchema(sourceDef, bqOptions, getSelectedFields());
```
Is it fine to access the BQ table at graph creation time? (It was already doing that when the Beam schema was requested.)
Yeah, this is a valid concern. I've heard of use cases where the pipeline submission machine has no (or incomplete) permissions on the resource, and inferring the schema at graph creation time can cause issues. The general guideline is that use cases that used to work should still work (and vice versa).
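For illustration only (not code from this PR): one way to avoid touching BigQuery at graph construction is to defer the schema lookup to worker setup, so the submitting machine needs no read access to the table. `LazySchemaFn` and the in-setup fetch below are hypothetical stand-ins.

```java
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: fetch the table schema lazily on the worker instead of
// at graph construction time.
class LazySchemaFn extends DoFn<String, String> {
  private final String tableSpec;
  private transient TableSchema schema;

  LazySchemaFn(String tableSpec) {
    this.tableSpec = tableSpec;
  }

  @Setup
  public void setup() {
    // Stand-in for a real BigQuery API call made at execution time,
    // e.g. bqServices.getTable(tableSpec).getSchema().
    schema = new TableSchema();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element() + " / fields: " + schema.getFields());
  }
}
```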
```diff
@@ -1731,7 +1870,7 @@ public void processElement(ProcessContext c) throws Exception {
         .setTable(
             BigQueryHelpers.toTableResourceName(
                 queryResultTable.getTableReference()))
         .setDataFormat(DataFormat.AVRO))
```
Was Arrow even supported?
```java
// TODO make file source params generic (useAvroLogicalTypes)
abstract BoundedSource<T> getSource(
    MatchResult.Metadata metadata,
    TableSchema tableSchema,
    Boolean useAvroLogicalTypes,
```
Any proposal here?
If there is a plan to support CSV export, for instance, we'd have to pass the chosen `field_delimiter`.
> If there is a plan to support CSV export, for instance

No major change to EXPORT mode is planned AFAIK. Current efforts are focused on improving the DIRECT_READ mode.
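Not something proposed in the thread, but one hypothetical shape for making the file-source params generic: bundle format-specific export options into small value classes, so `getSource(...)` takes a single holder instead of a growing list of flags. All names below are illustrative.

```java
import java.io.Serializable;

// Hypothetical sketch: one value class per export format instead of threading
// individual flags (useAvroLogicalTypes, field_delimiter, ...) through getSource.
abstract class FileSourceParams implements Serializable {}

class AvroParams extends FileSourceParams {
  final boolean useAvroLogicalTypes;

  AvroParams(boolean useAvroLogicalTypes) {
    this.useAvroLogicalTypes = useAvroLogicalTypes;
  }
}

class CsvParams extends FileSourceParams {
  final String fieldDelimiter;

  CsvParams(String fieldDelimiter) {
    this.fieldDelimiter = fieldDelimiter;
  }
}
```

`getSource` would then accept a single `FileSourceParams` argument, and adding a format means adding a subclass rather than changing the signature.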
```java
 * @throws BigQuerySchemaRetrievalException if schema retrieval fails
 */
Schema getBeamSchema(BigQueryOptions bqOptions);
TableSchema getTableSchema(BigQueryOptions bqOptions);
```
As this is in the BQ realm, it makes more sense to return the unaltered `TableSchema`.
There was a proposal to make Beam Schema uniform in future Beam versions (3.x); unfortunately, BigQueryIO is the biggest IO that does not follow this (and it has its own `TableSchema` that does not implement `Serializable`). I would suggest keeping `getBeamSchema`; we can have `getTableSchema` as an addition.
Main changes here:

- Generating the avro schema from the table schema has 2 options:
  - with logical types, as done in BQ direct read, and in BQ export with `use_logical_type`
  - without logical types, as done in BQ export. This conversion is destructive as many types fall back to `string`
- Converting `GenericRecord` to `TableRow` changed. It now expects the logical-type schema and thus can drop the need for the `TableSchema` during conversion (see the sketch below).
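A minimal sketch of that second point, assuming the record carries the logical-type schema (names here are illustrative, not the PR's actual code): a `DATE` value can be rendered from the Avro schema alone, with no `TableSchema` lookup.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.time.LocalDate;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Illustrative only: convert one field using the writer's logical-type schema,
// without consulting the BigQuery TableSchema.
class RecordToRow {
  static TableRow convertDateField(GenericRecord record, String fieldName) {
    Schema fieldSchema = record.getSchema().getField(fieldName).schema();
    Object value = record.get(fieldName);
    TableRow row = new TableRow();
    if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
      // Avro `date` logical type: int = days since the epoch.
      row.set(fieldName, LocalDate.ofEpochDay((Integer) value).toString());
    } else {
      // Export without logical types already ships the string representation.
      row.set(fieldName, value.toString());
    }
    return row;
  }
}
```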
```java
byte[] formatBytes = configRow.getBytes("format");
if (methodBytes != null) {
```
Small bug here: `formatBytes` is read, but `methodBytes` is checked.
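Presumably the intended check is on the value just read; a guess at the fix, not the PR's final code:

```java
byte[] formatBytes = configRow.getBytes("format");
if (formatBytes != null) {
  // decode the format; the original condition tested methodBytes by mistake
}
```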
```diff
@@ -89,8 +95,8 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<Type
         .addNullableBooleanField("use_legacy_sql")
         .addNullableBooleanField("with_template_compatibility")
         .addNullableByteArrayField("bigquery_services")
         .addNullableByteArrayField("parse_fn")
         .addNullableByteArrayField("datum_reader_factory")
         .addNullableByteArrayField("bigquery_reader_factory")
```
This is a complex object to serialize; it is subject to serialization errors if there are changes between versions.
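One version-tolerant alternative (a sketch, not what the PR does): persist only a stable discriminator for the factory and rebuild it during translation, instead of Java-serializing the object. All names below are hypothetical.

```java
import java.io.Serializable;

// Hypothetical sketch: store a stable string instead of serialized bytes,
// and reconstruct the factory when translating back.
enum ReaderFormat { AVRO, ARROW }

interface BigQueryReaderFactory extends Serializable {}

class AvroReaderFactory implements BigQueryReaderFactory {}

class ArrowReaderFactory implements BigQueryReaderFactory {}

class ReaderFactoryCodec {
  static String encode(ReaderFormat format) {
    return format.name(); // stable across Beam versions
  }

  static BigQueryReaderFactory decode(String name) {
    switch (ReaderFormat.valueOf(name)) {
      case AVRO:
        return new AvroReaderFactory();
      case ARROW:
        return new ArrowReaderFactory();
      default:
        throw new IllegalArgumentException("Unknown format: " + name);
    }
  }
}
```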
.put("TIMESTAMP", Type.LONG) | ||
.put("RECORD", Type.RECORD) | ||
.put("STRUCT", Type.RECORD) | ||
.put("DATE", Type.STRING) |
This was probably the cause of #20677. Schema conversion was only taking the 1st occurrence. When writing a date we want an `int` with the `date` logical type. We only want the `string` representation when reading an exported table without logical types enabled.
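To make the two representations concrete, a minimal sketch using the plain Avro API (not Beam's converter) of the two Avro shapes a BQ `DATE` column can take:

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DateSchemas {
  public static void main(String[] args) {
    // Direct read / export with logical types: int carrying a `date` logical type.
    Schema withLogicalType =
        LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));

    // Export without logical types: the value falls back to a plain string.
    Schema withoutLogicalType = Schema.create(Schema.Type.STRING);

    System.out.println(withLogicalType);    // {"type":"int","logicalType":"date"}
    System.out.println(withoutLogicalType); // "string"
  }
}
```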
Reminder, please take a look at this pr: @robertwb @ahmedabu98
Reminder, please take a look at this pr: @damondouglas @Abacn
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`. R: @kennknowles for label java.
Reminder, please take a look at this pr: @kennknowles @ahmedabu98
Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`. R: @Abacn for label java.
waiting on author
Force-pushed from 448caf1 to 5bb43c5
Force-pushed from 5bb43c5 to 929d028
- It should be possible to read BQ avro data using a provided compatible avro schema for both file and direct read.
- Add readRows api
- Improve coder inference
- Self review
- Fix concurrency issue
- spotless
- checkstyle
- Ignore BigQueryIOTranslationTest
- Add missing project option to execute test
- Call table schema only if required
- Fix avro export without logical type
- checkstyle
- Add back float support
- Fix write test
- Add arrow support in translation
Force-pushed from 834277f to 49554f5
Refers to #26329; also fixes #20100 and #21076.
When using `readWithDatumReader` and the `DIRECT_READ` method, the transform would fail because the `parseFn` is expected. Refactor the IO so the avro `datumReader` can be used in both cases.

In some cases, it is required to get the data with a desired schema. Currently, BQ IO always uses the writer schema (or table schema). Create new APIs to set the reader schema.
This refactoring contains some breaking changes:

- `withFormat` is not exposed anymore. Indeed, it is not possible to configure a `TypedRead` with a `DatumReaderFactory` and change the format later. The data format MUST be chosen when creating the transform.
- In the `TypedRead.Builder`, replace the `DatumReaderFactory` with the `BigQueryReaderFactory`, allowing both avro and arrow to be handled in a uniform fashion. This alters the `BigQueryIOTranslation`. I need some help on that point to handle it in a better way.
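For context, a sketch of the combination this PR fixes: `readWithDatumReader` together with `DIRECT_READ`, which previously failed because a `parseFn` was expected. The table name and the datum-reader lambda are placeholders, and depending on the Beam version you may also need to set a coder explicitly.

```java
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.values.PCollection;

public class DirectReadExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Placeholder table; the factory builds a plain GenericDatumReader.
    PCollection<GenericRecord> rows =
        p.apply(
            BigQueryIO.readWithDatumReader(
                    (AvroSource.DatumReaderFactory<GenericRecord>)
                        (writer, reader) -> new GenericDatumReader<>(writer, reader))
                .from("my-project:my_dataset.my_table")
                .withMethod(TypedRead.Method.DIRECT_READ));
    p.run().waitUntilFinish();
  }
}
```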