
[Bug]: ignoreUnknownValues not working when using CreateDisposition.CREATE_IF_NEEDED #27892

Open
1 of 15 tasks
sarfarazahammed opened this issue Aug 8, 2023 · 11 comments

sarfarazahammed commented Aug 8, 2023

What happened?

We are running a Dataflow pipeline that transfers data from Kafka to BigQuery. The pipeline uses a predetermined BigQuery schema, which we also use to create new tables. When a Kafka event arrives with an updated schema, our intention is to keep writing to the existing tables while ignoring the newly introduced fields.

In our initial implementation using STREAMING_INSERTS, we handled this scenario by setting ignoreUnknownValues to true. However, after transitioning to STORAGE_WRITE_API to get exactly-once inserts, that approach is no longer effective, resulting in potential data loss.

The error we are encountering is as follows:

com.google.cloud.bigquery.storage.v1.ConnectionWorker doneCallback

Connection finished with error com.google.api.gax.rpc.InvalidArgumentException: 
io.grpc.StatusRuntimeException: 
INVALID_ARGUMENT: 
Input schema has more fields than BigQuery schema, extra fields: 'testfield' Entity: projects/{projectId}/{dataset}/tables/{tableName}/streams/{streamId} for stream projects/{projectId}/datasets/{staging_events_visitor}/tables/{tableId}/streams/{streamId} with write id: {writeId}

Steps to reproduce:

  1. Create a TableSchema.
  2. Use .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withSchema(schema).withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) when writing to BQ.
  3. Run the Beam pipeline to transfer some data with the current schema.
  4. Add a new field to the TableSchema.
  5. Run a pipeline that consumes an event containing the new field and writes it to the existing table.
  6. Observe the error above.
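For reference, the write configuration in step 2 looks roughly like this (a minimal sketch; `rows`, `schema`, and the table spec are placeholders, not the actual pipeline code):

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

// PCollection<TableRow> rows = ...;  // events read from Kafka (elided)
// TableSchema schema = ...;          // the predetermined TableSchema (elided)

rows.apply("WriteToBQ",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")  // placeholder table spec
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .ignoreUnknownValues());  // does not prevent the schema-mismatch error above
```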

The table below provides more information; assume ignoreUnknownValues is set in all cases.

| withSchema set? | field exists in TableSchema | field exists in BQ | CreateDisposition | Remarks (Storage Write API) | Remarks (Streaming Inserts) |
| --- | --- | --- | --- | --- | --- |
| yes | no | no | ANY* | No issue | No issue |
| yes | no | yes | ANY* | value of the field is set as null in BQ | No issue (value of the field is set) |
| yes | yes | yes | ANY* | No issue | No issue |
| yes | yes | no | ANY* | INVALID_ARGUMENT: Input schema has more fields than BigQuery schema | No issue |
| no | — | no | CREATE_NEVER | No issue | No issue |
| no | — | yes | CREATE_NEVER | No issue (value of the field is set in BQ) | No issue (value of the field is set in BQ) |

*ANY: same behavior with either disposition (CREATE_NEVER / CREATE_IF_NEEDED)

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@sarfarazahammed
Author

.add-labels bigquery, io

@github-actions
Contributor

github-actions bot commented Aug 8, 2023

Label cannot be managed because it does not exist in the repo. Please check your spelling.

@sarfarazahammed sarfarazahammed changed the title [Bug]: ignoreUnknownValues not working when using withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withSchema(successVisitorTableSchema) [Bug]: ignoreUnknownValues not working when using CreateDisposition.CREATE_IF_NEEDED Aug 8, 2023
@sarfarazahammed
Author

.add-labels bigquery

@sarfarazahammed
Author

.add-labels io

@github-actions github-actions bot added the io label Aug 9, 2023
@kennknowles
Member

@Abacn @reuvenlax what are your thoughts on this change in options for the new API?

@reuvenlax
Contributor

Trying to understand the actual issue. Is the problem that the table is precreated but the schema passed into BigQueryIO contains fields not in the actual table? ignoreUnknownValues is meant for the case when the "records" contain unknown values - the assumption was that the value passed into withSchema matched the actual table schema.

@kennknowles
Member

Is the problem that the table is precreated but the schema passed into BigQueryIO contains fields not in the actual table?

That's what this reads like to me, yes. So I guess it would be a different option, if we wanted to add it. Specifically, it could allow someone's cron job to keep functioning, though dropping data, if they update the schema of the table it targets?

@reuvenlax
Contributor

So this is not an expected use of the sink. It wasn't really an expected use of STREAMING_INSERTS either; however, since STREAMING_INSERTS does absolutely no schema validation, it just happened to work.

@sarfarazahammed
Author

sarfarazahammed commented Sep 13, 2023

So this is not an expected use of the sink. It wasn't really an expected use of STREAMING_INSERTS either; however, since STREAMING_INSERTS does absolutely no schema validation, it just happened to work.

I agree with you on this, but the way things are currently set up, the system is double-checking the schema before actually writing data. So if a field exists in the BQ table and the JSON we're sending has data for that field, but the schema provided in the request doesn't include that field, the value is stored as null. I'm curious: is this the intended behavior? Is there a reason behind it?

Regarding the schema (withSchema): per the documentation it shouldn't apply when CREATE_NEVER is used, yet it was doing the job with streaming inserts. Now the schema is even being validated on every write. Why?

@reuvenlax
Contributor

The Storage Write API requires us to pass in a schema when we open a connection to it, and that schema must match the actual table schema. This is the schema you pass into withSchema. ignoreUnknownValues allows input elements to have fields that don't match the schema, in which case Beam will simply ignore those extra fields. This scenario worked with STREAMING_INSERTS because the old STREAMING_INSERTS API didn't require us to pass in a schema.
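The distinction can be sketched as follows (field names are illustrative, not from the actual pipeline): ignoreUnknownValues applies to extra fields on individual elements, while withSchema describes what Beam tells the Storage Write API the table looks like.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

// Schema passed to withSchema: must match the actual BQ table schema.
TableSchema schema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("id").setType("STRING")));

// Element with an extra field: with ignoreUnknownValues(), "testfield"
// is silently dropped and the row is written.
TableRow row = new TableRow()
    .set("id", "abc-123")
    .set("testfield", "dropped");

// By contrast, if "testfield" were added to the TableSchema above while the
// actual BQ table lacks it, the Storage Write API would reject the stream
// with INVALID_ARGUMENT, regardless of ignoreUnknownValues.
```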

@codertimu
Contributor

@reuvenlax What's the purpose of using withAutoSchemaUpdate(true) with STORAGE_API methods? According to the documentation:
withAutoSchemaUpdate(boolean autoSchemaUpdate)
If set to true, it enables automatic detection of schema updates in BigQuery tables. These updates are typically recognized within a few minutes. This feature is only supported when using one of the STORAGE_API insert methods.

But it doesn't seem to function as described. Maybe the documentation needs to be updated.
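For context, the setting being discussed would be enabled roughly like this (a minimal sketch with placeholder names; see the BigQueryIO javadoc for the authoritative behavior):

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// rows is a PCollection<TableRow> (elided); table spec is a placeholder.
rows.apply("WriteToBQ",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .ignoreUnknownValues()           // typically combined with auto schema update
        .withAutoSchemaUpdate(true));    // detect table schema changes, usually within minutes
```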
