@@ -444,20 +444,19 @@ class Batch(object):
444
444
def __init__ (self , topic , client , max_interval = _INFINITY ,
445
445
max_messages = _INFINITY , max_size = 1024 * 1024 * 9 ):
446
446
self .topic = topic
447
+ self .client = client
447
448
self .messages = []
448
449
self .message_ids = []
449
- self .client = client
450
450
451
451
# Set the autocommit rules. If the interval or number of messages
452
452
# is exceeded, then the .publish() method will imply a commit.
453
453
self ._max_interval = max_interval
454
454
self ._max_messages = max_messages
455
455
self ._max_size = max_size
456
456
457
- # Set the initial starting timestamp (used against the interval)
458
- # and initial size.
459
- self ._start_timestamp = time .time ()
460
- self ._current_size = 0
457
+ # Set up the initial state, initializing messages, the starting
458
+ # timestamp, etc.
459
+ self ._reset_state ()
461
460
462
461
def __enter__ (self ):
463
462
return self
@@ -469,6 +468,13 @@ def __exit__(self, exc_type, exc_val, exc_tb):
469
468
def __iter__ (self ):
470
469
return iter (self .message_ids )
471
470
471
+ def _reset_state (self ):
472
+ """Reset the state of this batch."""
473
+
474
+ del self .messages [:]
475
+ self ._start_timestamp = time .time ()
476
+ self ._current_size = 0
477
+
472
478
def publish (self , message , ** attrs ):
473
479
"""Emulate publishing a message, but save it.
474
480
@@ -526,6 +532,4 @@ def commit(self, client=None):
526
532
api = client .publisher_api
527
533
message_ids = api .topic_publish (self .topic .full_name , self .messages [:])
528
534
self .message_ids .extend (message_ids )
529
- del self .messages [:]
530
- self ._start_timestamp = time .time ()
531
- self ._current_size = 0
535
+ self ._reset_state ()
0 commit comments