[Feature Request]: Provide the Python equivalent to Java's BigQueryIO writeProtos #32781

Open
3 of 17 tasks
lazarillo opened this issue Oct 15, 2024 · 15 comments

Comments

@lazarillo
Contributor

What would you like to happen?

To my knowledge, there is no way to write protocol buffers to BigQuery using the Python SDK.

I am currently writing them by converting them to a dict (or to JSON if that were better, but using dict for now) and then writing the dict to BigQuery.

But the point of protocol buffers is that they reduce the data that we're sending over the wire by typically around 80%. So, converting back to a dict or JSON right before sending the data over the wire is counter-productive.
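
To make the size argument concrete, here is a minimal sketch that hand-encodes one record in the protobuf wire format (varint and length-delimited fields) and compares it with the JSON form of the same record. The field names, field numbers, and values are made up for illustration, and the actual savings depend entirely on the schema and data, so this is not meant to confirm any particular percentage.

```python
import json


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_int_field(field_number: int, value: int) -> bytes:
    """Tag (wire type 0 = varint) followed by the varint value."""
    return encode_varint((field_number << 3) | 0) + encode_varint(value)


def encode_string_field(field_number: int, value: str) -> bytes:
    """Tag (wire type 2 = length-delimited), byte length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    return encode_varint((field_number << 3) | 2) + encode_varint(len(data)) + data


# Hypothetical record; names and field numbers are illustrative only.
record = {
    "user_id": 123456,
    "event_name": "click",
    "event_time_micros": 1728950400000000,
}

proto_bytes = (
    encode_int_field(1, record["user_id"])
    + encode_string_field(2, record["event_name"])
    + encode_int_field(3, record["event_time_micros"])
)
json_bytes = json.dumps(record).encode("utf-8")

print(len(proto_bytes), "bytes as protobuf wire format")
print(len(json_bytes), "bytes as JSON")
```

The binary form omits field names and stores integers compactly, which is where most of the difference comes from in a record like this one.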

Is this feature already on the roadmap? Could it be placed on the roadmap? Is this a feature that I could help implement, or would it be far too complex with under-the-hood elements?

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad
Collaborator

@lazarillo Is this for all the BigQueryIO methods or just for StorageWrite? For the latter, it is quite easy since it is done through x-lang and Java already supports that.
cc @ahmedabu98

@ahmedabu98
Contributor

@liferoad Java does support it, but xlang introduces another bottleneck because it requires us to send Beam Rows from Python to Java.

We would have to convert protos in Python to Beam Rows and send them over to Java, where they would be converted back to protos and written to BigQuery.

@lazarillo
Contributor Author

@liferoad , I do not use StorageWrite because I have found some situations where working with the cross-language APIs gets confusing and/or weird. So I try to stick with just one SDK for the entire pipeline. (I cannot recall one at the moment, sorry.)

So yes, I understand that I can make cross-language calls, but I am trying to stay away from that.

I hadn't even thought of @ahmedabu98 's point, but yes, it sounds like that wouldn't help the performance issue, anyway.

@lazarillo
Contributor Author

lazarillo commented Oct 15, 2024

I actually have the following notes in my internal repo, which might give some idea as to why this sort of feature is useful.

As I'm sure anyone reading this thread is aware, the way that Google handles timestamps across all of its products is all over the place.

I actually have notes within our repo to make sure that neither I nor anyone else forgets all of these idiosyncrasies:


Working with timestamps in Dataflow (and BigQuery and Pub/Sub and protobufs)

Timestamp expectations are all over the place within the Googleverse.

  • When working with protobufs, the most common approach is to use a protobuf Timestamp message, which has the fields seconds and nanos. (Note: it is nanos, not nanoseconds.)
  • When working with the Pub/Sub-to-BigQuery direct subscription connector, you cannot use a protobuf Timestamp message, and you must instead provide an integer which is the number of microseconds since epoch (1970-01-01). Yes, microseconds.
  • When working within Dataflow itself (in Python), since there is no direct means to WriteToBigQuery with a protobuf message, you have to first convert to JSON or a dict or something similar. In this case (when working with JSON, or with a dict, which as best I can tell is converted to a JSON representation behind the scenes), the timestamp must be either an integer number of seconds since epoch or a float number of milliseconds since epoch. So the expected resolution depends upon the primitive type! Best to avoid using a numeric timestamp in this case.

Yes, you read that correctly: when working with timestamps in the Googleverse, a timestamp must be represented as nanoseconds, microseconds, milliseconds, or seconds, depending upon the resource you're talking to and the data type of the timestamp itself (and maybe the language you're using).
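
As a rough illustration of the first two representations in the notes above, the sketch below converts between a protobuf-style (seconds, nanos) pair and an integer count of microseconds since epoch. It uses plain integers rather than the real protobuf Timestamp class, and the function names are made up for this example.

```python
from typing import Tuple

NANOS_PER_MICRO = 1_000
MICROS_PER_SECOND = 1_000_000


def to_epoch_micros(seconds: int, nanos: int) -> int:
    """Convert a (seconds, nanos) pair to integer microseconds since epoch.

    Sub-microsecond precision is dropped, since microseconds cannot hold it.
    """
    return seconds * MICROS_PER_SECOND + nanos // NANOS_PER_MICRO


def from_epoch_micros(micros: int) -> Tuple[int, int]:
    """Convert integer microseconds since epoch to a (seconds, nanos) pair.

    divmod keeps nanos non-negative even for instants before 1970, which is
    the convention a protobuf Timestamp follows.
    """
    seconds, remainder_micros = divmod(micros, MICROS_PER_SECOND)
    return seconds, remainder_micros * NANOS_PER_MICRO


# 2024-10-15T00:00:00.5 UTC expressed both ways.
micros = to_epoch_micros(1728950400, 500_000_000)
pair = from_epoch_micros(micros)
print(micros, pair)
```

Keeping everything as integers avoids the float rounding that makes numeric timestamps easy to get wrong.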

Best practice to deal with all of this:

  • If the item will ever be written directly into BigQuery from Pub/Sub via the direct subscription, you have no choice: it must be an integer representing the number of microseconds since epoch.
  • If the item is not written directly to BigQuery (it is processed in Dataflow), represent it as a proper protobuf Timestamp because when this is converted to JSON or a dict, the protobuf conversion code manages this by converting it to a string representation using the proper scale.
    • This creates a larger message over the wire, but it prevents any accidental errors depending upon source data type being int or float.
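
The string representation mentioned above is an RFC 3339 timestamp in UTC. The following is a standard-library-only sketch of that formatting, written to mirror how the protobuf JSON mapping renders a Timestamp (0, 3, 6, or 9 fractional digits as needed); the function name is an assumption for this example, and in real code the protobuf library's own conversion should be used instead.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def seconds_nanos_to_rfc3339(seconds: int, nanos: int) -> str:
    """Format a (seconds, nanos) pair as an RFC 3339 UTC string."""
    if not 0 <= nanos < 1_000_000_000:
        raise ValueError("nanos must be in [0, 999999999]")
    base = (EPOCH + timedelta(seconds=seconds)).strftime("%Y-%m-%dT%H:%M:%S")
    if nanos == 0:
        return base + "Z"
    if nanos % 1_000_000 == 0:
        return f"{base}.{nanos // 1_000_000:03d}Z"  # millisecond precision
    if nanos % 1_000 == 0:
        return f"{base}.{nanos // 1_000:06d}Z"  # microsecond precision
    return f"{base}.{nanos:09d}Z"  # full nanosecond precision


print(seconds_nanos_to_rfc3339(1728950400, 500_000_000))
```

Because the unit is carried by the string format itself, the receiving side never has to guess whether a number means seconds, milliseconds, or microseconds.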

So the addition of this feature will drastically simplify our workflow of type checking, etc.

@liferoad
Collaborator

We do not have a plan now to improve the BigQueryIO methods except StorageWrite. This is why I asked whether you use StorageWrite.

@lazarillo
Contributor Author

OK. Bummer to hear that. I cannot be the only one to think that the challenges in working cross-language are not worth the hassle.

How viable is it to create a PR to implement this sort of thing?

  1. If I were to implement everything that I needed correctly and with proper unit tests, would it be merged into the repo?
  2. Is it feasible or imaginable to you that a senior level Python eng. could actually figure out what is happening under the hood here so that I might be able to correctly implement everything needed?

@liferoad
Collaborator

You are definitely welcome to make any PR to improve this experience.

I think we have two issues here if we plan to work on StorageWrite:

  • cross-language experience: what are your pain points? How could we improve them?
  • Beam Row: allow protos to be passed in. I think this one is not easy.

Another idea (only for your case if it is simple to use BigQuery as a sink) is you could create your own IO following this BigQuery example: https://cloud.google.com/bigquery/docs/samples/bigquerystorage-append-rows-raw-proto2#bigquerystorage_append_rows_raw_proto2-python
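
For anyone considering the custom-IO route, the batching part of such a sink might look roughly like the sketch below. It deliberately uses neither Beam nor the BigQuery client: `BufferedProtoSink` and `append_fn` are hypothetical names, and `append_fn` stands in for whatever call actually sends a batch of serialized proto rows to BigQuery. In a Beam pipeline, `add` would correspond to `DoFn.process` and `flush` to `DoFn.finish_bundle`.

```python
from typing import Callable, List


class BufferedProtoSink:
    """Buffers serialized proto rows and hands them off in batches.

    `append_fn` is a hypothetical stand-in for the real append call; it
    receives a list of serialized rows (bytes) and is not a real client API.
    """

    def __init__(
        self,
        append_fn: Callable[[List[bytes]], None],
        max_batch_size: int = 500,
    ) -> None:
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be at least 1")
        self._append_fn = append_fn
        self._max_batch_size = max_batch_size
        self._buffer: List[bytes] = []

    def add(self, serialized_row: bytes) -> None:
        """Queue one serialized row, flushing when the batch is full."""
        self._buffer.append(serialized_row)
        if len(self._buffer) >= self._max_batch_size:
            self.flush()

    def flush(self) -> None:
        """Send any buffered rows; does nothing if the buffer is empty."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._append_fn(batch)


# Usage: collect batches in a list instead of calling a real service.
sent_batches: List[List[bytes]] = []
sink = BufferedProtoSink(sent_batches.append, max_batch_size=2)
for row in [b"row-1", b"row-2", b"row-3"]:
    sink.add(row)
sink.flush()  # send the final partial batch
```

Injecting the append call keeps the batching logic testable without credentials, though a real sink would also need retries and error handling that this sketch leaves out.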

@ahmedabu98
Contributor

It could be pretty challenging if it's your first time delving into Beam code. Our BigQuery connectors are our most complex.

You also don't have a Python-native Storage Write API connector to work off of. And I think to work with protos, you'd need Storage Write API. From a cursory search, I don't think the legacy streaming API supports protos, but the Storage Write API Python client does.

With that said, it's definitely feasible and we'd welcome such an improvement. You could open a PR with the functionality and unit/integration tests and someone will review it. Just make sure it is well integrated with the existing WriteToBigQuery API

@ahmedabu98
Contributor

Side note - Beam is shifting towards leaning more on cross-language transforms, so if there's something we can improve (I know there's plenty) we'd love the feedback. Your comment on timestamps was pretty comprehensive

@lazarillo
Contributor Author

I will think more on my biggest pain points going cross-language, and provide a specific example when I remember / experience it.

But for now, I'll give one pain point: I am really bad at Java. I am a machine learning / data engineer. If I could, I'd work with modern languages like Go or Rust. But I need to use a language that is well supported for my career. So that means either Java, Python, or Scala. (Less so Scala these days, but it was quite big when Spark initially started.) I chose Python because it's easiest to work with, but also because I vehemently believe that code should be easy to read, and that object oriented is a choice that is only sometimes the right choice. So Python seemed better than Java. Lastly, the JVM was far more useful before Docker came onto the scene. So even that perk lost its luster.

I very much rely upon discovered code when documentation is insufficient. I can read just about any Python code, no matter how complex, and understand what it is doing and how I can interact with it (and whether I should or not). But with Java (or Go), I am too ignorant of the language: I need good docs in order to know what to do and how to interact with the SDK.

I think the Apache Beam docs are great. But Beam is such a complex beast that often I learn what I really need to know by going straight to the source code (which is linked to throughout the documentation, which is awesome).

Because of all this, I try to stay with the pure Python code as much as possible. It allows me more confidence to create my pipeline. Even a "simple" pipeline is quite complex, I've found in my experience.

@lazarillo
Contributor Author

Thanks, @ahmedabu98 . The timestamps thing is by no means Beam's fault... that's a Google thing, and specifically a GCP thing. I know there is strong overlap. But still, Beam should not need to compensate for GCP because Beam needs to be cross-runner to be successful, IMHO.

I know I just said I'm not a fan of cross-language. But I do like the cross-runner work!

@lazarillo
Contributor Author

I did try to use the StorageWrite API about 1 year ago, and I was having trouble getting it to work. That was in an Airflow job, not within Dataflow / Beam, and it was a while ago. I guess I'll try StorageWrite again, if it's where all the work is going.

Can I ask: why is there a strong push for cross-language, instead of creating separate SDKs for each language? Is it easier to maintain? I assume there is a loss in efficiency or bandwidth by going cross-language, right?

I ask because we were considering moving our Dataflow work to Go once Go is more fully supported. But if Go is supported via cross-language integrations, then I guess there are no speed or memory reasons to move to Go.

@liferoad
Collaborator

Yes, it is easier to maintain. In our own tests, efficiency is not a problem with cross-lang.
For Go, most IOs will be supported via cross-lang.
cc @lostluck @kennknowles @robertwb @chamikaramj they may have more comments.

@ahmedabu98
Contributor

> I try to stay with the pure Python code as much as possible

We're ideally aiming for a multi-language experience where the user is not even aware that they are using a cross-language transform. They might still need a JVM or Docker, but ideally they'd just worry about writing pure Python code like you say.

> I did try to use the StorageWrite API about 1 year ago, and I was having trouble getting it to work

Might be worth a shot trying it again -- there was quite a bit of work done around 6 months ago to fix multiple issues with Storage Write API

> why is there a strong push for cross-language

Yep, it's a lot easier to maintain, and it significantly lowers the threshold for new SDKs to become usable. Also, new features added to the main transform can almost instantly become available to foreign SDKs.
With that said, there's nothing stopping SDKs from still developing and building native transforms.

> I assume there is a loss in efficiency or bandwidth by going cross-language

There's some -- cross-language introduces a fusion break between SDKs, and elements need to get serialized to pass through.

> were considering moving our Dataflow work to Go once Go is more fully supported

@lostluck can speak more on Go's roadmap

@lazarillo
Contributor Author

OK, thanks everyone!

I'm fighting other issues at the moment, but once I get a chance, I'll revisit the StorageWriteAPI and see if it can work for me.

3 participants