
Commit 8a0c08b

Support configuring flush_count and max_row_bytes of WriteToBigTable (#34761)
1 parent: 893beb1

File tree: 2 files changed (+54 −10)

sdks/python/apache_beam/io/gcp/bigtableio.py

Lines changed: 31 additions & 9 deletions
@@ -55,15 +55,14 @@
 from apache_beam.typehints.row_type import RowTypeConstraint
 
 _LOGGER = logging.getLogger(__name__)
+FLUSH_COUNT = 1000
+MAX_ROW_BYTES = 5242880  # 5MB
 
 try:
   from google.cloud.bigtable import Client
   from google.cloud.bigtable.row import Cell, PartialRowData
   from google.cloud.bigtable.batcher import MutationsBatcher
 
-  FLUSH_COUNT = 1000
-  MAX_ROW_BYTES = 5242880  # 5MB
-
 except ImportError:
   _LOGGER.warning(
       'ImportError: from google.cloud.bigtable import Client', exc_info=True)

@@ -78,20 +77,27 @@ class _BigTableWriteFn(beam.DoFn):
     project_id(str): GCP Project ID
     instance_id(str): GCP Instance ID
     table_id(str): GCP Table ID
+    flush_count(int): Max number of rows to flush
+    max_row_bytes(int) Max number of row mutations size to flush
 
   """
-  def __init__(self, project_id, instance_id, table_id):
+  def __init__(
+      self, project_id, instance_id, table_id, flush_count, max_row_bytes):
     """ Constructor of the Write connector of Bigtable
     Args:
       project_id(str): GCP Project of to write the Rows
       instance_id(str): GCP Instance to write the Rows
       table_id(str): GCP Table to write the `DirectRows`
+      flush_count(int): Max number of rows to flush
+      max_row_bytes(int) Max number of row mutations size to flush
     """
     super().__init__()
     self.beam_options = {
         'project_id': project_id,
         'instance_id': instance_id,
-        'table_id': table_id
+        'table_id': table_id,
+        'flush_count': flush_count,
+        'max_row_bytes': max_row_bytes,
     }
     self.table = None
     self.batcher = None

@@ -144,8 +150,8 @@ def start_bundle(self):
     self.batcher = MutationsBatcher(
         self.table,
         batch_completed_callback=self.write_mutate_metrics,
-        flush_count=FLUSH_COUNT,
-        max_row_bytes=MAX_ROW_BYTES)
+        flush_count=self.beam_options['flush_count'],
+        max_row_bytes=self.beam_options['max_row_bytes'])
 
   def process(self, row):
     self.written.inc()

@@ -200,7 +206,10 @@ def __init__(
       instance_id,
       table_id,
       use_cross_language=False,
-      expansion_service=None):
+      expansion_service=None,
+      flush_count=FLUSH_COUNT,
+      max_row_bytes=MAX_ROW_BYTES,
+  ):
     """Initialize an WriteToBigTable transform.
 
     :param table_id:

@@ -215,6 +224,12 @@ def __init__(
       The address of the expansion service in the case of using cross-language.
       If no expansion service is provided, will attempt to run the default GCP
       expansion service.
+    :type flush_count: int
+    :param flush_count: (Optional) Max number of rows to flush.
+      Default is FLUSH_COUNT (1000 rows).
+    :type max_row_bytes: int
+    :param max_row_bytes: (Optional) Max number of row mutations size to flush.
+      Default is MAX_ROW_BYTES (5 MB).
     """
     super().__init__()
     self._table_id = table_id

@@ -229,6 +244,9 @@ def __init__(
           SchemaAwareExternalTransform.discover_config(
               self._expansion_service, self.URN))
 
+    self._flush_count = flush_count
+    self._max_row_bytes = max_row_bytes
+
   def expand(self, input):
     if self._use_cross_language:
       external_write = SchemaAwareExternalTransform(

@@ -250,7 +268,11 @@ def expand(self, input):
           input
          | beam.ParDo(
              _BigTableWriteFn(
-                  self._project_id, self._instance_id, self._table_id)))
+                  self._project_id,
+                  self._instance_id,
+                  self._table_id,
+                  flush_count=self._flush_count,
+                  max_row_bytes=self._max_row_bytes)))
 
 class _DirectRowMutationsToBeamRow(beam.DoFn):
   def process(self, direct_row):
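
For reference, a minimal usage sketch of the new knobs after this change (not part of the commit): the project, instance, and table IDs are placeholders, the row construction is illustrative, and running it against a real table requires GCP credentials and an existing column family.

  import datetime

  import apache_beam as beam
  from apache_beam.io.gcp import bigtableio
  from google.cloud.bigtable import row as bt_row


  def make_row(i):
    # Illustrative DirectRow with a single cell; 'cf1' is a placeholder
    # column family that must already exist on the target table.
    direct_row = bt_row.DirectRow(row_key=('key-%d' % i).encode())
    direct_row.set_cell(
        'cf1', b'field1', ('value-%d' % i).encode(),
        timestamp=datetime.datetime.now())
    return direct_row


  with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([make_row(i) for i in range(3)])
        | bigtableio.WriteToBigTable(
            project_id='my-project',    # placeholder
            instance_id='my-instance',  # placeholder
            table_id='my-table',        # placeholder
            # New in this commit: override the defaults FLUSH_COUNT (1000
            # rows) and MAX_ROW_BYTES (5 MB) passed to MutationsBatcher.
            flush_count=500,
            max_row_bytes=1 << 20))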

sdks/python/apache_beam/io/gcp/bigtableio_test.py

Lines changed: 23 additions & 1 deletion
@@ -288,7 +288,11 @@ def test_write(self):
   def test_write_metrics(self):
     MetricsEnvironment.process_wide_container().reset()
     write_fn = bigtableio._BigTableWriteFn(
-        self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
+        self._PROJECT_ID,
+        self._INSTANCE_ID,
+        self._TABLE_ID,
+        flush_count=1000,
+        max_row_bytes=5242880)
     write_fn.table = self.table
     write_fn.start_bundle()
     number_of_rows = 2

@@ -367,6 +371,24 @@ def verify_write_call_metric(
     self.assertTrue(
         found, "Did not find write call metric with status: %s" % status)
 
+  def test_custom_flush_config(self):
+    direct_rows = [self.generate_row(0)]
+    with patch.object(
+        MutationsBatcher, '__init__', return_value=None) as mock_init, \
+        patch.object(MutationsBatcher, 'mutate'), \
+        patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
+      _ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
+          self._PROJECT_ID,
+          self._INSTANCE_ID,
+          self._TABLE_ID,
+          flush_count=1001,
+          max_row_bytes=5000001)
+
+    mock_init.assert_called_once()
+    call_args = mock_init.call_args.kwargs
+    assert call_args['flush_count'] == 1001
+    assert call_args['max_row_bytes'] == 5000001
+
 
 if __name__ == '__main__':
   unittest.main()
