@shnapz shnapz commented Jan 30, 2024

writeTypedRows is not always suitable because it uses GCP's insertAll (streaming inserts). Some GCP APIs do not immediately reflect rows written this way, for example:
table.getNumRows() returns 0
Old post.
Data loaded by uploading a file does not have this problem.


codecov bot commented Jan 30, 2024

Codecov Report

Attention: 20 lines in your changes are missing coverage. Please review.

Comparison is base (5c112ff) 62.63% compared to head (cb0cb05) 62.53%.
Report is 9 commits behind head on main.

Files Patch % Lines
...ala/com/spotify/scio/bigquery/client/LoadOps.scala 0.00% 20 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5218      +/-   ##
==========================================
- Coverage   62.63%   62.53%   -0.10%     
==========================================
  Files         301      301              
  Lines       10845    10867      +22     
  Branches      768      744      -24     
==========================================
+ Hits         6793     6796       +3     
- Misses       4052     4071      +19     


build.sbt Outdated
  val googleCloudPubSubVersion = "1.107.13"
  val googleCloudSpannerVersion = "6.55.0"
- val googleCloudStorageVersion = "2.30.1"
+ val googleCloudStorageVersion = "2.26.0"
Contributor Author

in sync with Beam

Contributor

Your link points to GCP libraries-bom 26.22.0, but Beam 2.53 uses 26.28.0 here.

Contributor Author

Right, reverting this

@shnapz shnapz requested a review from clairemcginty January 30, 2024 23:32
def uploadTypedRows[T <: HasAnnotation: TypeTag](
tableSpec: String,
rows: List[T],
tempLocation: String,
Contributor

Since this is temp, shouldn't we clean it up afterward?

Contributor Author

I was thinking about a bucket retention policy, but yes, deleting it would be better.

Contributor Author

On the other hand, if I do:

avro(
  List(blobId.toGsUtilUri),
  tableSpec,
  schema = Some(bqt.schema),
  createDisposition = createDisposition,
  writeDisposition = writeDisposition
)

storage.delete(blobId)

I am not confident that avro is fully synchronous and that BQ does not still read the file in the background.

Contributor

execute contains jobService.waitForJobs(loadJob) so I think this is fine.

I'm also wondering whether we should create a SaveOps to allow saving some file formats (Avro/JSON).
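
For illustration, the upload, load, then delete sequence discussed in this thread could be wrapped so that the temporary file is always removed. This is only a minimal sketch, not the code in this PR: `TempBlobCleanup` and `withTempBlob` are hypothetical names, and it assumes the `load` step blocks until the BigQuery load job has finished (as `jobService.waitForJobs(loadJob)` suggests). In real use, `create` would write the Avro file to `tempLocation`, `load` would call `avro(...)`, and `delete` would call `storage.delete(blobId)`.

```scala
import scala.util.control.NonFatal

// Hypothetical helper, not part of scio: a loan pattern for a temporary blob.
object TempBlobCleanup {

  /**
   * Creates a temporary resource, runs `load` on it, and always attempts `delete`
   * afterwards, whether `load` succeeded or threw.
   *
   * Assumption: `load` is synchronous, i.e. it returns only after the load job is done,
   * so nothing still reads the resource when `delete` runs.
   */
  def withTempBlob[B, A](create: () => B)(load: B => A)(delete: B => Unit): A = {
    val blob = create()
    try load(blob)
    finally {
      // A cleanup failure is reported but must not hide the result or error of `load`.
      try delete(blob)
      catch {
        case NonFatal(e) =>
          Console.err.println(s"Could not delete temporary blob $blob: ${e.getMessage}")
      }
    }
  }
}

object TempBlobCleanupExample {
  def main(args: Array[String]): Unit = {
    // Stand-ins for the real upload, load and delete calls.
    val loaded = TempBlobCleanup.withTempBlob(() => "gs://some-bucket/tmp/rows.avro") { uri =>
      println(s"loading $uri")
      uri.length
    } { uri =>
      println(s"deleting $uri")
    }
    println(s"load returned $loaded")
  }
}
```

Swallowing the cleanup error is a design choice in this sketch: a leftover temporary file is usually less harmful than masking a failed load, and a bucket retention policy can still act as a backstop.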

@shnapz shnapz requested a review from RustedBones February 8, 2024 23:03
* Upload List of rows to Cloud Storage as Avro file and load to BigQuery table. Note that element
* type `T` must be annotated with [[BigQueryType]].
*/
def uploadTypedRows[T <: HasAnnotation: TypeTag](
Contributor

IMHO this naming can be simplified. Other APIs here do not have the upload prefix.
Usage will be from BigQuery with bq.load.uploadTypedRows.

I think this should be named

Suggested change
- def uploadTypedRows[T <: HasAnnotation: TypeTag](
+ def typedRows[T <: HasAnnotation: TypeTag](
