
Commit e2b3376

fix: separate create_stream() in pyarrow sample (#946)
* fix: separate create_stream() in pyarrow sample
* remove append_rows()
* lint
* add blank row
1 parent 95bace7 commit e2b3376

File tree

1 file changed: +23 −16 lines changed


bigquery_storage/pyarrow/append_rows_with_arrow.py

Lines changed: 23 additions & 16 deletions
@@ -174,19 +174,6 @@ def generate_write_requests(pyarrow_table):
         yield request
 
 
-def append_rows(bqstorage_write_client, table):
-    append_rows_stream = create_stream(bqstorage_write_client, table)
-    pyarrow_table = generate_pyarrow_table()
-    futures = []
-
-    for request in generate_write_requests(pyarrow_table):
-        response_future = append_rows_stream.send(request)
-        futures.append(response_future)
-        response_future.result()
-
-    return futures
-
-
 def verify_result(client, table, futures):
     bq_table = client.get_table(table)
 
@@ -196,6 +183,7 @@ def verify_result(client, table, futures):
     # Verify table size.
     query = client.query(f"SELECT COUNT(1) FROM `{bq_table}`;")
    query_result = query.result().to_dataframe()
+
     # There might be extra rows due to retries.
     assert query_result.iloc[0, 0] >= TABLE_LENGTH
 
@@ -204,9 +192,28 @@
 
 
 def main(project_id, dataset):
+    # Initialize clients.
     write_client = bqstorage_write_client()
     bq_client = bigquery.Client()
-    table = make_table(project_id, dataset.dataset_id, bq_client)
 
-    futures = append_rows(write_client, table)
-    verify_result(bq_client, table, futures)
+    # Create BigQuery table.
+    bq_table = make_table(project_id, dataset.dataset_id, bq_client)
+
+    # Generate local PyArrow table.
+    pa_table = generate_pyarrow_table()
+
+    # Convert PyArrow table to Protobuf requests.
+    requests = generate_write_requests(pa_table)
+
+    # Create writing stream to the BigQuery table.
+    stream = create_stream(write_client, bq_table)
+
+    # Send requests.
+    futures = []
+    for request in requests:
+        future = stream.send(request)
+        futures.append(future)
+        future.result()  # Optional, will block until writing is complete.
+
+    # Verify results.
+    verify_result(bq_client, bq_table, futures)
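
Note: the create_stream() helper that main() now calls directly is defined elsewhere in the sample and does not appear in this diff. As rough orientation only, here is a minimal sketch of what such a helper can look like, assuming it appends to the table's _default write stream via the AppendRowsStream wrapper in google.cloud.bigquery_storage_v1.writer; the sample's real helper presumably also attaches the Arrow writer schema to the first request, which this sketch omits.

# Minimal sketch only, not the sample's actual implementation. It assumes the
# helper appends to the table's _default write stream and that bq_table is a
# google.cloud.bigquery.Table; the real helper would also carry the Arrow
# writer schema in the request template, which is omitted here.
from google.cloud.bigquery_storage_v1 import types, writer


def create_stream(bqstorage_write_client, bq_table):
    stream_name = (
        f"projects/{bq_table.project}"
        f"/datasets/{bq_table.dataset_id}"
        f"/tables/{bq_table.table_id}/streams/_default"
    )

    # The template is sent as the first message on the connection and names
    # the stream that every subsequent AppendRowsRequest is appended to.
    request_template = types.AppendRowsRequest()
    request_template.write_stream = stream_name

    # AppendRowsStream manages the bidirectional gRPC connection; its send()
    # method returns a future, which is how main() above collects futures.
    return writer.AppendRowsStream(bqstorage_write_client, request_template)

Calling result() on each returned future, as main() now does inside the send loop, surfaces any append error immediately rather than only at verification time.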

0 commit comments
