Pub/Sub: update how to test with mock [(#2555)](GoogleCloudPlatform/python-docs-samples#2555)

* Update test with mock
* Clean up resources after tests
* Use unique resource names to avoid test failures (see the sketch after this list)
* Delete subscriptions in cleanup phase
* Ensure unique topic name
* Update assert to remove bytestring notation
* Rewrite PubSubToGCS test using dataflow testing module
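In practice, the unique-name fix boils down to suffixing every shared resource with a per-run id. A minimal sketch of the pattern (the bucket name here is hypothetical; the real paths appear in the test diff below):

```python
import uuid

# A fresh suffix per test run, so concurrent CI runs don't collide
# on the same GCS output paths.
UUID = uuid.uuid1().hex
output_path = "gs://{}/pubsub/{}/output".format("my-test-bucket", UUID)
```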
anguillanneuf authored and busunkim96 committed Dec 11, 2019
1 parent 6507cfc commit fe3f1e5
Showing 3 changed files with 92 additions and 114 deletions.
88 changes: 50 additions & 38 deletions samples/snippets/PubSubToGCS.py
@@ -34,24 +34,25 @@ def __init__(self, window_size):
         self.window_size = int(window_size * 60)
 
     def expand(self, pcoll):
-        return (pcoll
-                # Assigns window info to each Pub/Sub message based on its
-                # publish timestamp.
-                | 'Window into Fixed Intervals' >> beam.WindowInto(
-                    window.FixedWindows(self.window_size))
-                | 'Add timestamps to messages' >> (beam.ParDo(AddTimestamps()))
-                # Use a dummy key to group the elements in the same window.
-                # Note that all the elements in one window must fit into memory
-                # for this. If the windowed elements do not fit into memory,
-                # please consider using `beam.util.BatchElements`.
-                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
-                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
-                | 'Groupby' >> beam.GroupByKey()
-                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))
+        return (
+            pcoll
+            # Assigns window info to each Pub/Sub message based on its
+            # publish timestamp.
+            | "Window into Fixed Intervals"
+            >> beam.WindowInto(window.FixedWindows(self.window_size))
+            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
+            # Use a dummy key to group the elements in the same window.
+            # Note that all the elements in one window must fit into memory
+            # for this. If the windowed elements do not fit into memory,
+            # please consider using `beam.util.BatchElements`.
+            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
+            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
+            | "Groupby" >> beam.GroupByKey()
+            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
+        )
 
 
 class AddTimestamps(beam.DoFn):
-
     def process(self, element, publish_time=beam.DoFn.TimestampParam):
         """Processes each incoming windowed element by extracting the Pub/Sub
         message and its publish timestamp into a dictionary. `publish_time`
@@ -60,61 +61,72 @@ def process(self, element, publish_time=beam.DoFn.TimestampParam):
         """
 
         yield {
-            'message_body': element.decode('utf-8'),
-            'publish_time': datetime.datetime.utcfromtimestamp(
-                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
+            "message_body": element.decode("utf-8"),
+            "publish_time": datetime.datetime.utcfromtimestamp(
+                float(publish_time)
+            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
         }
 
 
 class WriteBatchesToGCS(beam.DoFn):
-
     def __init__(self, output_path):
         self.output_path = output_path
 
     def process(self, batch, window=beam.DoFn.WindowParam):
         """Write one batch per file to a Google Cloud Storage bucket. """
 
-        ts_format = '%H:%M'
+        ts_format = "%H:%M"
         window_start = window.start.to_utc_datetime().strftime(ts_format)
         window_end = window.end.to_utc_datetime().strftime(ts_format)
-        filename = '-'.join([self.output_path, window_start, window_end])
+        filename = "-".join([self.output_path, window_start, window_end])
 
-        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
+        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
             for element in batch:
-                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))
+                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
 
 
 def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
     # `save_main_session` is set to true because some DoFn's rely on
     # globally imported modules.
     pipeline_options = PipelineOptions(
-        pipeline_args, streaming=True, save_main_session=True)
+        pipeline_args, streaming=True, save_main_session=True
+    )
 
     with beam.Pipeline(options=pipeline_options) as pipeline:
-        (pipeline
-         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
-         | 'Window into' >> GroupWindowsIntoBatches(window_size)
-         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))
+        (
+            pipeline
+            | "Read PubSub Messages"
+            >> beam.io.ReadFromPubSub(topic=input_topic)
+            | "Window into" >> GroupWindowsIntoBatches(window_size)
+            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
+        )
 
 
-if __name__ == '__main__':  # noqa
+if __name__ == "__main__":  # noqa
     logging.getLogger().setLevel(logging.INFO)
 
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        '--input_topic',
-        help='The Cloud Pub/Sub topic to read from.\n'
-        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".')
+        "--input_topic",
+        help="The Cloud Pub/Sub topic to read from.\n"
+        '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".',
+    )
     parser.add_argument(
-        '--window_size',
+        "--window_size",
         type=float,
         default=1.0,
-        help='Output file\'s window size in number of minutes.')
+        help="Output file's window size in number of minutes.",
+    )
     parser.add_argument(
-        '--output_path',
-        help='GCS Path of the output file including filename prefix.')
+        "--output_path",
+        help="GCS Path of the output file including filename prefix.",
+    )
     known_args, pipeline_args = parser.parse_known_args()
 
-    run(known_args.input_topic, known_args.output_path, known_args.window_size,
-        pipeline_args)
+    run(
+        known_args.input_topic,
+        known_args.output_path,
+        known_args.window_size,
+        pipeline_args,
+    )
# [END pubsub_to_gcs]
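The pipeline comment above points to `beam.util.BatchElements` for windows whose elements do not fit in memory. A rough sketch of that alternative, not part of this commit (the bounded `Create` source and batch sizes are illustrative only):

```python
import apache_beam as beam
from apache_beam.transforms.util import BatchElements

# Emits elements in lists of 2-5 items as they arrive, instead of
# collecting an entire window behind a single dummy key.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Create" >> beam.Create(range(10))
        | "Batch" >> BatchElements(min_batch_size=2, max_batch_size=5)
        | "Print" >> beam.Map(print)
    )
```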
116 changes: 41 additions & 75 deletions samples/snippets/PubSubToGCS_test.py
@@ -12,89 +12,55 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import multiprocessing as mp
+import mock
 import os
-import pytest
-import subprocess as sp
-import tempfile
-import time
 import uuid
 
 import apache_beam as beam
-from google.cloud import pubsub_v1
-
-
-PROJECT = os.environ['GCLOUD_PROJECT']
-BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
-TOPIC = 'test-topic'
-UUID = uuid.uuid4().hex
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_utils import TempDir
+from apache_beam.transforms.window import TimestampedValue
+
+import PubSubToGCS
+
+PROJECT = os.environ["GCLOUD_PROJECT"]
+BUCKET = os.environ["CLOUD_STORAGE_BUCKET"]
+UUID = uuid.uuid1().hex
 
 
-@pytest.fixture
-def publisher_client():
-    yield pubsub_v1.PublisherClient()
-
-
-@pytest.fixture
-def topic_path(publisher_client):
-    topic_path = publisher_client.topic_path(PROJECT, TOPIC)
-
-    try:
-        publisher_client.delete_topic(topic_path)
-    except Exception:
-        pass
-
-    response = publisher_client.create_topic(topic_path)
-    yield response.name
-
-
-def _infinite_publish_job(publisher_client, topic_path):
-    while True:
-        future = publisher_client.publish(
-            topic_path, data='Hello World!'.encode('utf-8'))
-        future.result()
-        time.sleep(10)
-
-
-def test_run(publisher_client, topic_path):
-    """This is an integration test that runs `PubSubToGCS.py` in its entirety.
-    It checks for output files on GCS.
-    """
-
-    # Use one process to publish messages to a topic.
-    publish_process = mp.Process(
-        target=lambda: _infinite_publish_job(publisher_client, topic_path))
-
-    # Use another process to run the streaming pipeline that should write one
-    # file to GCS every minute (according to the default window size).
-    pipeline_process = mp.Process(
-        target=lambda: sp.call([
-            'python', 'PubSubToGCS.py',
-            '--project', PROJECT,
-            '--runner', 'DirectRunner',
-            '--temp_location', tempfile.mkdtemp(),
-            '--input_topic', topic_path,
-            '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID),
-        ])
+@mock.patch("apache_beam.Pipeline", TestPipeline)
+@mock.patch(
+    "apache_beam.io.ReadFromPubSub",
+    lambda topic: (
+        TestStream()
+        .advance_watermark_to(0)
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"a", 1575937195)])
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"b", 1575937225)])
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"c", 1575937255)])
+        .advance_watermark_to_infinity()
+    ),
+)
+def test_pubsub_to_gcs():
+    PubSubToGCS.run(
+        input_topic="unused",  # mocked by TestStream
+        output_path="gs://{}/pubsub/{}/output".format(BUCKET, UUID),
+        window_size=1,  # 1 minute
+        pipeline_args=[
+            "--project",
+            PROJECT,
+            "--temp_location",
+            TempDir().get_path(),
+        ],
     )
 
-    publish_process.start()
-    pipeline_process.start()
-
-    # Times out the streaming pipeline after 90 seconds.
-    pipeline_process.join(timeout=90)
-    # Immediately kills the publish process after the pipeline shuts down.
-    publish_process.join(timeout=0)
-
-    pipeline_process.terminate()
-    publish_process.terminate()
-
     # Check for output files on GCS.
     gcs_client = beam.io.gcp.gcsio.GcsIO()
-    # This returns a dictionary.
-    files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID))
+    files = gcs_client.list_prefix("gs://{}/pubsub/{}".format(BUCKET, UUID))
     assert len(files) > 0
 
-    # Clean up. Delete topic. Delete files.
-    publisher_client.delete_topic(topic_path)
+    # Clean up.
     gcs_client.delete_batch(list(files))
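For readers new to `TestStream`: it replays timestamped elements and advances the watermark deterministically, which is what lets the rewritten test exercise the windowing logic without a live Pub/Sub topic. A standalone sketch under simplified assumptions (DirectRunner, illustrative timestamps; not part of this commit):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampedValue

# Two elements 70 seconds apart land in different 60-second windows.
stream = (
    TestStream()
    .advance_watermark_to(0)
    .add_elements([TimestampedValue(b"a", 10)])
    .add_elements([TimestampedValue(b"b", 80)])
    .advance_watermark_to_infinity()
)

with TestPipeline(options=PipelineOptions(streaming=True)) as pipeline:
    (
        pipeline
        | stream
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Group" >> beam.GroupByKey()  # fires once per window
        | "Print" >> beam.Map(print)
    )
```

Advancing the watermark past each window's end is what triggers the `GroupByKey` output; the `advance_processing_time` calls in the real test above serve the same pacing role.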
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
@@ -1 +1 @@
-apache-beam[gcp]==2.15.0
+apache-beam[gcp,test]==2.16.0
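A quick, illustrative smoke check that the modules the rewritten test imports resolve under the new pin (assuming the 2.16.0 install above):

```python
# Illustrative only: these imports should succeed after installing
# apache-beam[gcp,test]==2.16.0.
from apache_beam.testing.test_pipeline import TestPipeline  # noqa: F401
from apache_beam.testing.test_stream import TestStream  # noqa: F401
from apache_beam.testing.test_utils import TempDir  # noqa: F401
```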
