Skip to content

Conversation

@lukesneeringer
Copy link
Contributor

This PR adds some functionality to the Batch object:

  • The ability to specify max_messages and have the batch
    automatically call commit when the number of messages
    gets that high.
  • The ability to specify max_interval and have the batch
    automatically commit when a publish occurs and the batch
    is at least as old as the specified interval.

This is one of two changes requested by the PubSub team.

This PR adds some functionality to the Batch object:

  * The ability to specify `max_messages` and have the batch
    automatically call `commit` when the number of messages
    gets that high.
  * The ability to specify `max_interval` and have the batch
    automatically commit when a publish occurs and the batch
    is at least as old as the specified interval.

This is one of two changes requested by the PubSub team.
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Jan 26, 2017
def __init__(self, topic, client):
INFINITY = float('inf')

def __init__(self, topic, client, max_interval=INFINITY,

This comment was marked as spam.

This comment was marked as spam.

Copy link
Contributor

@dhermes dhermes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly LG, ping me once addressed / we discuss?

:type max_messages: float
"""
def __init__(self, topic, client):
INFINITY = float('inf')

This comment was marked as spam.

self._max_messages = max_messages

# Set the initial starting timestamp (used against the interval).
self._start_timestamp = float(time.time())

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

:type attrs: dict (string -> string)
:param attrs: key-value pairs to send as message attributes
:rtype: None

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

if self._max_interval < self.INFINITY:
if float(time.time()) - self._start_timestamp > self._max_interval:
self._start_timestamp = float(time.time())
return self.commit()

This comment was marked as spam.

This comment was marked as spam.

# If the number of messages on the list is greater than the
# maximum allowed, autocommit (with the batch's client).
if len(self.messages) >= self._max_messages:
return self.commit()

This comment was marked as spam.

This comment was marked as spam.

def test_message_count_autocommit(self):
"""Establish that if the batch is assigned to take a maximum
number of messages, that it commits when it reaches that maximum.
"""

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

for i in range(0, 4):
batch.publish({
'attributes': {},
'data': 'Batch message %d.' % i,

This comment was marked as spam.

Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
with self._make_one(topic, client=client, max_messages=5) as batch:
self.assertIsInstance(batch, self._get_target_class())

This comment was marked as spam.

This comment was marked as spam.

# Track commits, but do not perform them.
Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
mock_time.return_value = 0.0

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


# If too much time has elapsed since the first message
# was added, autocommit.
if self._max_interval < self.INFINITY:

This comment was marked as spam.


# If the number of messages on the list is greater than the
# maximum allowed, autocommit (with the batch's client).
if len(self.messages) >= self._max_messages:

This comment was marked as spam.

# maximum allowed, autocommit (with the batch's client).
if len(self.messages) >= self._max_messages:
self.commit()
return

This comment was marked as spam.

topic = _Topic(name='TOPIC')

# Track commits, but do not perform them.
Batch = self._get_target_class()

This comment was marked as spam.

# Track commits, but do not perform them.
Batch = self._get_target_class()
with mock.patch.object(Batch, 'commit') as commit:
mock_time.return_value = 0.0

This comment was marked as spam.

Copy link
Contributor

@dhermes dhermes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM once CI goes green

@lukesneeringer lukesneeringer merged commit 50c8e88 into googleapis:master Jan 27, 2017
@lukesneeringer lukesneeringer deleted the pubsub-batching branch January 27, 2017 19:11
richkadel pushed a commit to richkadel/google-cloud-python that referenced this pull request May 6, 2017
* Pubsub batch autocommitting.

This PR adds some functionality to the Batch object:

  * The ability to specify `max_messages` and have the batch
    automatically call `commit` when the number of messages
    gets that high.
  * The ability to specify `max_interval` and have the batch
    automatically commit when a publish occurs and the batch
    is at least as old as the specified interval.

This is one of two changes requested by the PubSub team.

* Addressing comments from @dhermes.

* Remove unneeded -lt check @dhermes.

* Make INFINITY have a leading underscore. @dhermes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla: yes This human has signed the Contributor License Agreement.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants