Skip to content

Commit abcb13a

Browse files
Merge pull request GoogleCloudPlatform#1 from akvelon/dev-t1
add python code
2 parents b3cd3e2 + dc0f37b commit abcb13a

File tree

6 files changed

+290
-0
lines changed

6 files changed

+290
-0
lines changed
1.61 KB
Binary file not shown.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Copyright 2023 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
""" This Cloud Function example creates Pub/Sub messages.
15+
16+
Usage: Replace <PROJECT_ID> with the project ID of your project.
17+
"""
18+
19+
from google.cloud import pubsub_v1
20+
21+
project = "<PROJECT_ID>"
22+
topic = "dag-topic-trigger"
23+
24+
25+
def pubsub_publisher(request):
26+
"""Publish message from HTTP request to Pub/Sub topic.
27+
Args:
28+
request (flask.Request): HTTP request object.
29+
Returns:
30+
The response text with message published into Pub/Sub topic
31+
Response object using
32+
`make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
33+
"""
34+
request_json = request.get_json()
35+
print(request_json)
36+
if request.args and 'message' in request.args:
37+
data_str = request.args.get('message')
38+
elif request_json and 'message' in request_json:
39+
data_str = request_json['message']
40+
else:
41+
return "Message content not found! Use 'message' key to specify"
42+
43+
publisher = pubsub_v1.PublisherClient()
44+
# The `topic_path` method creates a fully qualified identifier
45+
# in the form `projects/{project_id}/topics/{topic_id}`
46+
topic_path = publisher.topic_path(project, topic)
47+
48+
# The required data format is a bytestring
49+
data = data_str.encode("utf-8")
50+
# When you publish a message, the client returns a future.
51+
message_length = len(data_str)
52+
future = publisher.publish(topic_path,
53+
data,
54+
message_length=str(message_length))
55+
print(future.result())
56+
57+
return f"Message {data} with message_length {message_length} published to {topic_path}."
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
# Copyright 2023 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import mock
16+
import pytest
17+
18+
import pubsub_publisher
19+
20+
21+
@pytest.fixture()
22+
def dump_request_args():
23+
class Request:
24+
args = {"message": "test with args"}
25+
26+
def get_json(self):
27+
return self.args
28+
29+
return Request()
30+
31+
32+
@pytest.fixture()
33+
def dump_request():
34+
class Request:
35+
args = None
36+
37+
def get_json(self):
38+
return {"message": "test with no args"}
39+
40+
return Request()
41+
42+
43+
@pytest.fixture()
44+
def dump_request_no_message():
45+
class Request:
46+
args = None
47+
48+
def get_json(self):
49+
return {"no_message": "test with no message key"}
50+
51+
return Request()
52+
53+
54+
# Pass None, an input that is not valid request
55+
def test_request_with_none():
56+
request = None
57+
with pytest.raises(Exception):
58+
pubsub_publisher.pubsub_publisher(request)
59+
60+
61+
def test_content_not_found(dump_request_no_message):
62+
output = "Message content not found! Use 'message' key to specify"
63+
assert pubsub_publisher.pubsub_publisher(dump_request_no_message) == output, f"The function didn't return '{output}'"
64+
65+
66+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.publish")
67+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.topic_path")
68+
def test_topic_path_args(topic_path, _, dump_request_args):
69+
pubsub_publisher.pubsub_publisher(dump_request_args)
70+
71+
topic_path.assert_called_once_with(
72+
"<PROJECT_ID>",
73+
"dag-topic-trigger",
74+
)
75+
76+
77+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.publish")
78+
def test_publish_args(publish, dump_request_args):
79+
pubsub_publisher.pubsub_publisher(dump_request_args)
80+
81+
publish.assert_called_once_with(
82+
"projects/<PROJECT_ID>/topics/dag-topic-trigger",
83+
dump_request_args.args.get("message").encode("utf-8"),
84+
message_length=str(len(dump_request_args.args.get("message"))),
85+
)
86+
87+
88+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.publish")
89+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.topic_path")
90+
def test_topic_path(topic_path, _, dump_request):
91+
pubsub_publisher.pubsub_publisher(dump_request)
92+
93+
topic_path.assert_called_once_with(
94+
"<PROJECT_ID>",
95+
"dag-topic-trigger",
96+
)
97+
98+
99+
@mock.patch("pubsub_publisher.pubsub_v1.PublisherClient.publish")
100+
def test_publish(publish, dump_request):
101+
pubsub_publisher.pubsub_publisher(dump_request)
102+
103+
publish.assert_called_once_with(
104+
"projects/<PROJECT_ID>/topics/dag-topic-trigger",
105+
dump_request.get_json().get("message").encode("utf-8"),
106+
message_length=str(len(dump_request.get_json().get("message"))),
107+
)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
requests-toolbelt==0.10.0
22
google-auth==2.6.2
3+
google-cloud-pubsub==2.13.11
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# Copyright 2023 Google LLC.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
""" Two Airflow DAGs that demonstrate the mechanism of triggering DAGs with Pub/Sub messages
15+
16+
Usage: Replace <PROJECT_ID> with the project ID of your project
17+
"""
18+
19+
from __future__ import annotations
20+
21+
from datetime import datetime
22+
import time
23+
24+
from airflow import DAG
25+
from airflow import XComArg
26+
from airflow.operators.python import PythonOperator
27+
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
28+
from airflow.providers.google.cloud.operators.pubsub import (
29+
PubSubCreateSubscriptionOperator,
30+
PubSubPullOperator,
31+
)
32+
33+
PROJECT_ID = "<PROJECT_ID>"
34+
TOPIC_ID = "dag-topic-trigger"
35+
SUBSCRIPTION = "trigger_dag_subscription"
36+
37+
38+
def handle_messages(pulled_messages, context):
39+
dag_ids = list()
40+
for idx, m in enumerate(pulled_messages):
41+
data = m.message.data.decode('utf-8')
42+
print(f'message {idx} data is {data}')
43+
dag_ids.append(data)
44+
return dag_ids
45+
46+
47+
# This DAG will run minutely and handle pub/sub messages by triggering target DAG
48+
with DAG('trigger_dag',
49+
start_date=datetime(2021, 1, 1),
50+
schedule_interval="* * * * *",
51+
max_active_runs=1,
52+
catchup=False) as trigger_dag:
53+
54+
# If subscription exists, we will use it. If not - create new one
55+
subscribe_task = PubSubCreateSubscriptionOperator(task_id="subscribe_task",
56+
project_id=PROJECT_ID,
57+
topic=TOPIC_ID,
58+
subscription=SUBSCRIPTION)
59+
60+
subscription = subscribe_task.output
61+
62+
# Proceed maximum 50 messages in callback function handle_messages
63+
# Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
64+
# https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
65+
pull_messages_operator = PubSubPullOperator(
66+
task_id="pull_messages_operator",
67+
project_id=PROJECT_ID,
68+
ack_messages=True,
69+
messages_callback=handle_messages,
70+
subscription=subscription,
71+
max_messages=50,
72+
)
73+
74+
# Here we use Dynamic Task Mapping to trigger DAGs according to messages content
75+
# https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
76+
trigger_target_dag = TriggerDagRunOperator\
77+
.partial(task_id='trigger_target')\
78+
.expand(trigger_dag_id=XComArg(pull_messages_operator))
79+
80+
(subscribe_task >> pull_messages_operator >> trigger_target_dag)
81+
82+
83+
def _some_heavy_task():
84+
print('Do some operation...')
85+
time.sleep(1)
86+
print('Done!')
87+
88+
89+
# Simple target DAG
90+
with DAG(
91+
'target_dag',
92+
start_date=datetime(2022, 1, 1),
93+
# Not scheduled, trigger only
94+
schedule_interval=None,
95+
catchup=False) as target_dag:
96+
97+
some_heavy_task = PythonOperator(task_id='some_heavy_task',
98+
python_callable=_some_heavy_task)
99+
100+
(some_heavy_task)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import internal_unit_testing
16+
17+
18+
def test_dag_import():
19+
"""Test that the DAG file can be successfully imported.
20+
This tests that the DAG can be parsed, but does not run it in an Airflow
21+
environment. This is a recommended confidence check by the official Airflow
22+
docs: https://airflow.incubator.apache.org/tutorial.html#testing
23+
"""
24+
from . import pubsub_trigger_response_dag as module
25+
internal_unit_testing.assert_has_valid_dag(module)

0 commit comments

Comments
 (0)