diff --git a/samples/snippets/PubSubToGCS.py b/samples/snippets/PubSubToGCS.py
index f003e3362..a3c8179b2 100644
--- a/samples/snippets/PubSubToGCS.py
+++ b/samples/snippets/PubSubToGCS.py
@@ -34,24 +34,25 @@ def __init__(self, window_size):
         self.window_size = int(window_size * 60)
 
     def expand(self, pcoll):
-        return (pcoll
-                # Assigns window info to each Pub/Sub message based on its
-                # publish timestamp.
-                | 'Window into Fixed Intervals' >> beam.WindowInto(
-                    window.FixedWindows(self.window_size))
-                | 'Add timestamps to messages' >> (beam.ParDo(AddTimestamps()))
-                # Use a dummy key to group the elements in the same window.
-                # Note that all the elements in one window must fit into memory
-                # for this. If the windowed elements do not fit into memory,
-                # please consider using `beam.util.BatchElements`.
-                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
-                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
-                | 'Groupby' >> beam.GroupByKey()
-                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))
+        return (
+            pcoll
+            # Assigns window info to each Pub/Sub message based on its
+            # publish timestamp.
+            | "Window into Fixed Intervals"
+            >> beam.WindowInto(window.FixedWindows(self.window_size))
+            | "Add timestamps to messages" >> beam.ParDo(AddTimestamps())
+            # Use a dummy key to group the elements in the same window.
+            # Note that all the elements in one window must fit into memory
+            # for this. If the windowed elements do not fit into memory,
+            # please consider using `beam.util.BatchElements`.
+            # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
+            | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
+            | "Groupby" >> beam.GroupByKey()
+            | "Abandon Dummy Key" >> beam.MapTuple(lambda _, val: val)
+        )
 
 
 class AddTimestamps(beam.DoFn):
-
     def process(self, element, publish_time=beam.DoFn.TimestampParam):
         """Processes each incoming windowed element by extracting the Pub/Sub
         message and its publish timestamp into a dictionary. `publish_time`
@@ -60,61 +61,72 @@ def process(self, element, publish_time=beam.DoFn.TimestampParam):
         """
         yield {
-            'message_body': element.decode('utf-8'),
-            'publish_time': datetime.datetime.utcfromtimestamp(
-                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
+            "message_body": element.decode("utf-8"),
+            "publish_time": datetime.datetime.utcfromtimestamp(
+                float(publish_time)
+            ).strftime("%Y-%m-%d %H:%M:%S.%f"),
         }
 
 
 class WriteBatchesToGCS(beam.DoFn):
-
     def __init__(self, output_path):
         self.output_path = output_path
 
     def process(self, batch, window=beam.DoFn.WindowParam):
         """Write one batch per file to a Google Cloud Storage bucket. """
-        ts_format = '%H:%M'
+        ts_format = "%H:%M"
         window_start = window.start.to_utc_datetime().strftime(ts_format)
         window_end = window.end.to_utc_datetime().strftime(ts_format)
-        filename = '-'.join([self.output_path, window_start, window_end])
+        filename = "-".join([self.output_path, window_start, window_end])
 
-        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
+        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode="w") as f:
             for element in batch:
-                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))
+                f.write("{}\n".format(json.dumps(element)).encode("utf-8"))
 
 
 def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
     # `save_main_session` is set to true because some DoFn's rely on
     # globally imported modules.
     pipeline_options = PipelineOptions(
-        pipeline_args, streaming=True, save_main_session=True)
+        pipeline_args, streaming=True, save_main_session=True
+    )
 
     with beam.Pipeline(options=pipeline_options) as pipeline:
-        (pipeline
-         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
-         | 'Window into' >> GroupWindowsIntoBatches(window_size)
-         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))
+        (
+            pipeline
+            | "Read PubSub Messages"
+            >> beam.io.ReadFromPubSub(topic=input_topic)
+            | "Window into" >> GroupWindowsIntoBatches(window_size)
+            | "Write to GCS" >> beam.ParDo(WriteBatchesToGCS(output_path))
+        )
 
 
-if __name__ == '__main__':  # noqa
+if __name__ == "__main__":  # noqa
     logging.getLogger().setLevel(logging.INFO)
 
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        '--input_topic',
-        help='The Cloud Pub/Sub topic to read from.\n'
-             '"projects//topics/".')
+        "--input_topic",
+        help="The Cloud Pub/Sub topic to read from.\n"
+        '"projects//topics/".',
+    )
     parser.add_argument(
-        '--window_size',
+        "--window_size",
         type=float,
         default=1.0,
-        help='Output file\'s window size in number of minutes.')
+        help="Output file's window size in number of minutes.",
+    )
     parser.add_argument(
-        '--output_path',
-        help='GCS Path of the output file including filename prefix.')
+        "--output_path",
+        help="GCS Path of the output file including filename prefix.",
+    )
     known_args, pipeline_args = parser.parse_known_args()
 
-    run(known_args.input_topic, known_args.output_path, known_args.window_size,
-        pipeline_args)
+    run(
+        known_args.input_topic,
+        known_args.output_path,
+        known_args.window_size,
+        pipeline_args,
+    )
 # [END pubsub_to_gcs]
diff --git a/samples/snippets/PubSubToGCS_test.py b/samples/snippets/PubSubToGCS_test.py
index 644cf0865..d39fb5681 100644
--- a/samples/snippets/PubSubToGCS_test.py
+++ b/samples/snippets/PubSubToGCS_test.py
@@ -12,89 +12,55 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import multiprocessing as mp
+import mock
 import os
-import pytest
-import subprocess as sp
-import tempfile
-import time
 import uuid
 
 import apache_beam as beam
-from google.cloud import pubsub_v1
-
-
-PROJECT = os.environ['GCLOUD_PROJECT']
-BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
-TOPIC = 'test-topic'
-UUID = uuid.uuid4().hex
-
-
-@pytest.fixture
-def publisher_client():
-    yield pubsub_v1.PublisherClient()
-
-
-@pytest.fixture
-def topic_path(publisher_client):
-    topic_path = publisher_client.topic_path(PROJECT, TOPIC)
-
-    try:
-        publisher_client.delete_topic(topic_path)
-    except Exception:
-        pass
-
-    response = publisher_client.create_topic(topic_path)
-    yield response.name
-
-
-def _infinite_publish_job(publisher_client, topic_path):
-    while True:
-        future = publisher_client.publish(
-            topic_path, data='Hello World!'.encode('utf-8'))
-        future.result()
-        time.sleep(10)
-
-
-def test_run(publisher_client, topic_path):
-    """This is an integration test that runs `PubSubToGCS.py` in its entirety.
-    It checks for output files on GCS.
-    """
-
-    # Use one process to publish messages to a topic.
-    publish_process = mp.Process(
-        target=lambda: _infinite_publish_job(publisher_client, topic_path))
-
-    # Use another process to run the streaming pipeline that should write one
-    # file to GCS every minute (according to the default window size).
-    pipeline_process = mp.Process(
-        target=lambda: sp.call([
-            'python', 'PubSubToGCS.py',
-            '--project', PROJECT,
-            '--runner', 'DirectRunner',
-            '--temp_location', tempfile.mkdtemp(),
-            '--input_topic', topic_path,
-            '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID),
-        ])
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
+from apache_beam.testing.test_utils import TempDir
+from apache_beam.transforms.window import TimestampedValue
+
+import PubSubToGCS
+
+PROJECT = os.environ["GCLOUD_PROJECT"]
+BUCKET = os.environ["CLOUD_STORAGE_BUCKET"]
+UUID = uuid.uuid1().hex
+
+
+@mock.patch("apache_beam.Pipeline", TestPipeline)
+@mock.patch(
+    "apache_beam.io.ReadFromPubSub",
+    lambda topic: (
+        TestStream()
+        .advance_watermark_to(0)
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"a", 1575937195)])
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"b", 1575937225)])
+        .advance_processing_time(30)
+        .add_elements([TimestampedValue(b"c", 1575937255)])
+        .advance_watermark_to_infinity()
+    ),
+)
+def test_pubsub_to_gcs():
+    PubSubToGCS.run(
+        input_topic="unused",  # mocked by TestStream
+        output_path="gs://{}/pubsub/{}/output".format(BUCKET, UUID),
+        window_size=1,  # 1 minute
+        pipeline_args=[
+            "--project",
+            PROJECT,
+            "--temp_location",
+            TempDir().get_path(),
+        ],
     )
 
-    publish_process.start()
-    pipeline_process.start()
-
-    # Times out the streaming pipeline after 90 seconds.
-    pipeline_process.join(timeout=90)
-    # Immediately kills the publish process after the pipeline shuts down.
-    publish_process.join(timeout=0)
-
-    pipeline_process.terminate()
-    publish_process.terminate()
-
     # Check for output files on GCS.
     gcs_client = beam.io.gcp.gcsio.GcsIO()
-    # This returns a dictionary.
-    files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID))
+    files = gcs_client.list_prefix("gs://{}/pubsub/{}".format(BUCKET, UUID))
    assert len(files) > 0
 
-    # Clean up. Delete topic. Delete files.
-    publisher_client.delete_topic(topic_path)
+    # Clean up.
     gcs_client.delete_batch(list(files))
diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt
index d1ce1adeb..7ffca5118 100644
--- a/samples/snippets/requirements.txt
+++ b/samples/snippets/requirements.txt
@@ -1 +1 @@
-apache-beam[gcp]==2.15.0
+apache-beam[gcp,test]==2.16.0
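
Note on the test rewrite above: instead of spawning a real publisher process and a real pipeline process, the new test patches apache_beam.Pipeline with TestPipeline and apache_beam.io.ReadFromPubSub with a lambda returning a TestStream, so the streaming pipeline replays a fixed sequence of timestamped elements hermetically. The sketch below isolates that pattern under the pinned apache-beam[gcp,test]==2.16.0 dependency; the element values, timestamps, and print-based sink are invented for illustration and are not taken from the patch.

    # Minimal sketch: drive a streaming pipeline from a deterministic
    # TestStream instead of a live Pub/Sub subscription. Values and
    # timestamps below are made up for the example.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.test_stream import TestStream
    from apache_beam.transforms.window import TimestampedValue


    def main():
        # Elements carry explicit event-time timestamps; watermark and
        # processing-time advances are scripted, so windowing behaves
        # identically on every run.
        stream = (
            TestStream()
            .advance_watermark_to(0)
            .add_elements([TimestampedValue(b"first", 0)])
            .advance_processing_time(30)
            .add_elements([TimestampedValue(b"second", 30)])
            .advance_watermark_to_infinity()
        )

        options = PipelineOptions(streaming=True)
        with TestPipeline(options=options) as pipeline:
            (
                pipeline
                | stream
                | "Decode" >> beam.Map(lambda msg: msg.decode("utf-8"))
                | "Print" >> beam.Map(print)
            )


    if __name__ == "__main__":
        main()

The patched test itself would presumably be run with pytest (e.g. `pytest PubSubToGCS_test.py`) with GCLOUD_PROJECT and CLOUD_STORAGE_BUCKET set; it still writes to and cleans up a real GCS bucket, and only the Pub/Sub side is faked.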