
BigQueryIO uniformize direct and export reads #32360

Status: Draft · wants to merge 2 commits into base: master
Conversation

@RustedBones (Contributor) commented Aug 29, 2024

Refers to #26329; also fixes #20100 and #21076.

When using readWithDatumReader with the DIRECT_READ method, the transform would fail because a parseFn is expected. This PR refactors the IO so the Avro DatumReader can be used in both cases.

In some cases it is required to read the data with a desired schema. Currently, BigQueryIO always uses the writer schema (or table schema). This PR adds new APIs to set the reader schema; see the sketch below.
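For context, a minimal sketch of the combination that used to fail, built only from existing Beam APIs; the table name and reader schema are made up. Forcing a reader schema through the DatumReaderFactory illustrates the use case the new reader-schema APIs are meant to cover:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;

public class DirectReadWithDatumReader {
  // Reader schema: a compatible projection of the table's writer schema,
  // kept as a JSON string so the serializable lambda below captures no
  // non-serializable state.
  private static final String READER_SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
          + "[{\"name\":\"name\",\"type\":\"string\"}]}";

  public static void main(String[] args) {
    Schema readerSchema = new Schema.Parser().parse(READER_SCHEMA_JSON);
    Pipeline p = Pipeline.create();

    p.apply(
        BigQueryIO.readWithDatumReader(
                (AvroSource.DatumReaderFactory<GenericRecord>)
                    (writer, reader) ->
                        new GenericDatumReader<>(
                            writer, new Schema.Parser().parse(READER_SCHEMA_JSON)))
            .from("my-project:my_dataset.my_table") // hypothetical table
            // This combination failed before this PR, because a parseFn was expected:
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withCoder(AvroCoder.of(readerSchema)));

    p.run().waitUntilFinish();
  }
}
```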

This refactoring contains some breaking changes:

withFormat is not exposed anymore. It is not possible to configure a TypedRead with a DatumReaderFactory and change the format afterwards, so the data format MUST be chosen when the transform is created.

In TypedRead.Builder, the DatumReaderFactory is replaced with a BigQueryReaderFactory that handles both Avro and Arrow in a uniform fashion. This alters BigQueryIOTranslation; I need some help to handle that point in a better way.

github-actions bot:

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@RustedBones (Author):

assign set of reviewers

github-actions bot:

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@RustedBones (Author):

Some BQ integration tests are failing.

I don't know the schema and data of the big_query_import_export.parallel_read_table_row_xxx tables, so I cannot recreate the setup in a personal GCP project. Can someone give me a hand?

@RustedBones RustedBones marked this pull request as draft September 3, 2024 07:18
@github-actions github-actions bot added examples and removed examples labels Sep 3, 2024
@RustedBones RustedBones marked this pull request as ready for review September 3, 2024 19:48
@github-actions github-actions bot added examples and removed examples labels Sep 4, 2024
```java
// read table schema and infer coder if possible
Coder<T> c;
if (getCoder() == null) {
  tableSchema = requestTableSchema(sourceDef, bqOptions, getSelectedFields());
```
@RustedBones (Author):

Is it fine to access the BQ table at graph creation time? (It was already doing that when the Beam schema was requested.)

Contributor:

Yeah, this is a valid concern. I've heard of use cases where the pipeline submission machine has no or incomplete permissions on the resource, and inferring the schema at graph creation time can cause issues. The general guideline is that use cases that used to work should still work (and vice versa).

```diff
@@ -1731,7 +1870,7 @@ public void processElement(ProcessContext c) throws Exception {
         .setTable(
             BigQueryHelpers.toTableResourceName(
                 queryResultTable.getTableReference()))
         .setDataFormat(DataFormat.AVRO))
```
@RustedBones (Author):

Was Arrow even supported?

Comment on lines 40 to 43

```java
// TODO make file source params generic (useAvroLogicalTypes)
abstract BoundedSource<T> getSource(
    MatchResult.Metadata metadata,
    TableSchema tableSchema,
    Boolean useAvroLogicalTypes,
```
@RustedBones (Author):

Any proposal here? If there is a plan to support CSV export, for instance, we'd have to pass the chosen field_delimiter. One possible shape is sketched below.
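A sketch with hypothetical names: bundle the format-specific options in a small value class instead of widening the getSource signature for every new export format.

```java
/**
 * Hypothetical value class bundling format-specific file-source parameters,
 * so that getSource(...) does not grow one extra argument per export format.
 */
final class FileSourceParams {
  private final boolean useAvroLogicalTypes; // Avro export
  private final String fieldDelimiter; // CSV export; null for other formats

  FileSourceParams(boolean useAvroLogicalTypes, String fieldDelimiter) {
    this.useAvroLogicalTypes = useAvroLogicalTypes;
    this.fieldDelimiter = fieldDelimiter;
  }

  boolean useAvroLogicalTypes() {
    return useAvroLogicalTypes;
  }

  String fieldDelimiter() {
    return fieldDelimiter;
  }
}
```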

Contributor:

> If there is a plan to support CSV export, for instance

No major change to EXPORT mode is planned, AFAIK. Current efforts are focused on improving the DIRECT_READ mode.

```java
 * @throws BigQuerySchemaRetrievalException if schema retrieval fails
 */
Schema getBeamSchema(BigQueryOptions bqOptions);
TableSchema getTableSchema(BigQueryOptions bqOptions);
```
@RustedBones (Author):

As this is in the BQ realm, it makes more sense to return the unaltered TableSchema.

Contributor:

There was a proposal to make the Beam Schema uniform in future Beam versions (3.x); unfortunately, BigQueryIO is the biggest IO that does not follow this (and has its own TableSchema, which does not implement Serializable). I would suggest keeping getBeamSchema; we can have getTableSchema as an addition.
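Concretely, that would leave the source definition with both accessors, roughly as follows (a sketch only; BigQuerySourceDef and getBeamSchema exist today, getTableSchema is the addition from this PR):

```java
import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.schemas.Schema;

interface BigQuerySourceDef extends Serializable {
  /** Beam schema, kept for uniformity with other IOs. */
  Schema getBeamSchema(BigQueryOptions bqOptions);

  /** Unaltered BigQuery TableSchema, for use within the BQ realm. */
  TableSchema getTableSchema(BigQueryOptions bqOptions);
}
```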

@RustedBones (Author):

Many changes here:

  • Generating the Avro schema from the table schema has two options:

    • with logical types, as done in BQ direct read and in BQ export with use_avro_logical_types;
    • without logical types, as done in plain BQ export. This conversion is destructive, as many types fall back to String (sketched below).
  • Converting GenericRecord to TableRow changed. It now expects the logical-type schema and thus can drop the need for the TableSchema during the conversion.
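For illustration, the two options for a BigQuery DATE field, expressed with plain Avro APIs (the conversion entry points in the PR itself are not shown here):

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class DateFieldConversion {
  // With logical types (direct read, or export with use_avro_logical_types):
  // DATE maps to an int carrying the "date" logical type.
  static Schema dateWithLogicalType() {
    return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
  }

  // Without logical types (plain export): DATE falls back to a string,
  // which is destructive since the date semantics are lost.
  static Schema dateWithoutLogicalType() {
    return Schema.create(Schema.Type.STRING);
  }
}
```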

Comment on lines 329 to 292
```java
byte[] formatBytes = configRow.getBytes("format");
if (methodBytes != null) {
```
@RustedBones (Author):

Small bug here: formatBytes is read, but methodBytes is checked. A fix could look like this:
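```java
byte[] formatBytes = configRow.getBytes("format");
if (formatBytes != null) { // was mistakenly checking methodBytes
  // ... decode and restore the data format from formatBytes
}
```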

```diff
@@ -89,8 +95,8 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<Type
         .addNullableBooleanField("use_legacy_sql")
         .addNullableBooleanField("with_template_compatibility")
         .addNullableByteArrayField("bigquery_services")
         .addNullableByteArrayField("parse_fn")
         .addNullableByteArrayField("datum_reader_factory")
         .addNullableByteArrayField("bigquery_reader_factory")
```
@RustedBones (Author):

This is a complex object to serialize; it is subject to serialization errors if anything changes between versions.

.put("TIMESTAMP", Type.LONG)
.put("RECORD", Type.RECORD)
.put("STRUCT", Type.RECORD)
.put("DATE", Type.STRING)
@RustedBones (Author):

This was probably the cause of #20677: the schema conversion was only taking the first occurrence. When writing a date, we want an int with the date logical type; we only want the string representation when reading an exported table without logical types enabled.

github-actions bot:

Reminder, please take a look at this PR: @robertwb @ahmedabu98

github-actions bot:

Reminder, please take a look at this PR: @damondouglas @Abacn


github-actions bot commented Oct 2, 2024

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


github-actions bot commented Oct 9, 2024

Reminder, please take a look at this PR: @kennknowles @ahmedabu98

github-actions bot:

Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@ahmedabu98 (Contributor):

waiting on author

Commits:

  • It should be possible to read BQ avro data using a provided compatible avro schema, for both file and direct read
  • Add readRows api
  • Improve coder inference
  • Self review
  • Fix concurrency issue
  • spotless
  • checkstyle
  • Ignore BigQueryIOTranslationTest
  • Add missing project option to execute test
  • Call table schema only if required
  • Fix avro export without logical type
  • checkstyle
  • Add back float support
  • Fix write test
  • Add arrow support in translation

Successfully merging this pull request may close these issues.

BigQueryIO TableRowParser should support Arrow and Avro data formats