Skip to content

Commit

Permalink
Set gRPC message options and keepalive. (#4269)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukesneeringer authored Oct 30, 2017
1 parent b53731d commit 743ade4
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 0 deletions.
37 changes: 37 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
from __future__ import absolute_import

import copy
import os
import pkg_resources
import threading

import grpc
import six

from google.api_core import grpc_helpers
from google.cloud.gapic.pubsub.v1 import publisher_client

from google.cloud.pubsub_v1 import _gapic
Expand Down Expand Up @@ -53,6 +56,28 @@ class Client(object):
Generally, you should not need to set additional keyword arguments.
"""
def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
# Sanity check: Is our goal to use the emulator?
# If so, create a grpc insecure channel with the emulator host
# as the target.
if os.environ.get('PUBSUB_EMULATOR_HOST'):
kwargs['channel'] = grpc.insecure_channel(
target=os.environ.get('PUBSUB_EMULATOR_HOST'),
)

# Use a custom channel.
# We need this in order to set appropriate default message size and
# keepalive options.
if 'channel' not in kwargs:
kwargs['channel'] = grpc_helpers.create_channel(
credentials=kwargs.get('credentials', None),
target=self.target,
scopes=publisher_client.PublisherClient._ALL_SCOPES,
options={
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': -1,
}.items(),
)

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
kwargs['lib_name'] = 'gccl'
Expand All @@ -66,6 +91,18 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
self._batch_lock = threading.Lock()
self._batches = {}

@property
def target(self):
"""Return the target (where the API is).
Returns:
str: The location of the API.
"""
return '{host}:{port}'.format(
host=publisher_client.PublisherClient.SERVICE_ADDRESS,
port=publisher_client.PublisherClient.DEFAULT_SERVICE_PORT,
)

def batch(self, topic, message, create=True, autocommit=True):
"""Return the current batch for the provided topic.
Expand Down
39 changes: 39 additions & 0 deletions pubsub/google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
from __future__ import absolute_import

import pkg_resources
import os

import grpc

from google.api_core import grpc_helpers
from google.cloud.gapic.pubsub.v1 import subscriber_client

from google.cloud.pubsub_v1 import _gapic
Expand Down Expand Up @@ -49,6 +53,29 @@ class in order to define your own consumer. This is primarily
arguments.
"""
def __init__(self, policy_class=thread.Policy, **kwargs):
# Sanity check: Is our goal to use the emulator?
# If so, create a grpc insecure channel with the emulator host
# as the target.
if os.environ.get('PUBSUB_EMULATOR_HOST'):
kwargs['channel'] = grpc.insecure_channel(
target=os.environ.get('PUBSUB_EMULATOR_HOST'),
)

# Use a custom channel.
# We need this in order to set appropriate default message size and
# keepalive options.
if 'channel' not in kwargs:
kwargs['channel'] = grpc_helpers.create_channel(
credentials=kwargs.get('credentials', None),
target=self.target,
scopes=subscriber_client.SubscriberClient._ALL_SCOPES,
options={
'grpc.max_send_message_length': -1,
'grpc.max_receive_message_length': -1,
'grpc.keepalive_time_ms': 30000,
}.items(),
)

# Add the metrics headers, and instantiate the underlying GAPIC
# client.
kwargs['lib_name'] = 'gccl'
Expand All @@ -59,6 +86,18 @@ def __init__(self, policy_class=thread.Policy, **kwargs):
# messages.
self._policy_class = policy_class

@property
def target(self):
"""Return the target (where the API is).
Returns:
str: The location of the API.
"""
return '{host}:{port}'.format(
host=subscriber_client.SubscriberClient.SERVICE_ADDRESS,
port=subscriber_client.SubscriberClient.DEFAULT_SERVICE_PORT,
)

def subscribe(self, subscription, callback=None, flow_control=()):
"""Return a representation of an individual subscription.
Expand Down
15 changes: 15 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
import os

import mock

import pytest
Expand All @@ -38,6 +41,18 @@ def test_init():
assert client.batch_settings.max_messages == 1000


def test_init_emulator(monkeypatch):
monkeypatch.setenv('PUBSUB_EMULATOR_HOST', '/foo/bar/')
client = create_client()

# Establish that a gRPC request would attempt to hit the emulator host.
#
# Sadly, there seems to be no good way to do this without poking at
# the private API of gRPC.
channel = client.api.publisher_stub.Publish._channel
assert channel.target().decode('utf8') == '/foo/bar/'


def test_batch_accepting():
"""Establish that an existing batch is returned if it accepts messages."""
client = create_client()
Expand Down
12 changes: 12 additions & 0 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ def test_init():
assert client._policy_class is thread.Policy


def test_init_emulator(monkeypatch):
monkeypatch.setenv('PUBSUB_EMULATOR_HOST', '/baz/bacon/')
client = create_client()

# Establish that a gRPC request would attempt to hit the emulator host.
#
# Sadly, there seems to be no good way to do this without poking at
# the private API of gRPC.
channel = client.api.subscriber_stub.Pull._channel
assert channel.target().decode('utf8') == '/baz/bacon/'


def test_subscribe():
client = create_client()
subscription = client.subscribe('sub_name_a')
Expand Down

0 comments on commit 743ade4

Please sign in to comment.