Skip to content

Commit f7e85af

Browse files
refactor: Switch write API usage to use AppendRowsStream helper
1 parent 248c8ea commit f7e85af

File tree

1 file changed

+21
-22
lines changed

1 file changed

+21
-22
lines changed

bigframes/session/loader.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import google.cloud.bigquery as bigquery
5050
import google.cloud.bigquery.table
5151
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
52+
from google.cloud.bigquery_storage_v1 import writer as bq_storage_writer
5253
import pandas
5354
import pyarrow as pa
5455

@@ -488,31 +489,29 @@ def stream_worker(work: Iterator[pa.RecordBatch]) -> str:
488489
stream = self._write_client.create_write_stream(
489490
parent=parent, write_stream=requested_stream
490491
)
491-
stream_name = stream.name
492-
493-
def request_generator():
494-
current_offset = 0
495-
for batch in work:
496-
request = bq_storage_types.AppendRowsRequest(
497-
write_stream=stream.name, offset=current_offset
498-
)
492+
base_request = bq_storage_types.AppendRowsRequest(
493+
write_stream=stream.name,
494+
)
495+
base_request.arrow_rows.writer_schema.serialized_schema = serialized_schema
499496

500-
request.arrow_rows.writer_schema.serialized_schema = (
501-
serialized_schema
502-
)
503-
request.arrow_rows.rows.serialized_record_batch = (
504-
batch.serialize().to_pybytes()
505-
)
497+
stream_manager = bq_storage_writer.AppendRowsStream(
498+
client=self._write_client, initial_request_template=base_request
499+
)
500+
stream_name = stream.name
501+
current_offset = 0
502+
futures: list[bq_storage_writer.AppendRowsFuture] = []
503+
for batch in work:
504+
request = bq_storage_types.AppendRowsRequest(offset=current_offset)
505+
request.arrow_rows.rows.serialized_record_batch = (
506+
batch.serialize().to_pybytes()
507+
)
506508

507-
yield request
508-
current_offset += batch.num_rows
509+
futures.append(stream_manager.send(request))
510+
current_offset += batch.num_rows
511+
for future in futures:
512+
future.result()
509513

510-
responses = self._write_client.append_rows(requests=request_generator())
511-
for resp in responses:
512-
if resp.row_errors:
513-
raise ValueError(
514-
f"Errors in stream {stream_name}: {resp.row_errors}"
515-
)
514+
stream_manager.close()
516515
self._write_client.finalize_write_stream(name=stream_name)
517516
return stream_name
518517

0 commit comments

Comments (0)