Skip to content

Commit

Permalink
Migrate Influx example DAGs to new design apache#22449
Browse files Browse the repository at this point in the history
  • Loading branch information
chethanuk committed Jun 2, 2022
1 parent f294a26 commit 643feb6
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 30 deletions.
16 changes: 0 additions & 16 deletions airflow/providers/influxdb/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-influxdb/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources

Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/influxdb/example_dags>
Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/influxdb>

.. toctree::
:maxdepth: 1
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-influxdb/operators/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ SQL commands in a `InfluxDB <https://www.influxdata.com/>`__ database.

An example of running the query using the operator:

.. exampleinclude:: /../../airflow/providers/influxdb/example_dags/example_influxdb_query.py
.. exampleinclude:: /../../tests/system/providers/influxdb/example_influxdb_query.py
:language: python
:start-after: [START howto_operator_influxdb]
:end-before: [END howto_operator_influxdb]
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import os
from datetime import datetime

from airflow.decorators import task
Expand Down Expand Up @@ -47,11 +48,25 @@ def test_influxdb_hook():
influxdb_hook.delete_bucket(bucket_name)


ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "influxdb_example_dag"

with DAG(
dag_id='influxdb_example_dag',
dag_id=DAG_ID,
schedule_interval=None,
start_date=datetime(2021, 1, 1),
max_active_runs=1,
tags=['example'],
) as dag:
test_influxdb_hook()

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,35 @@
# specific language governing permissions and limitations
# under the License.

import os
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.influxdb.operators.influxdb import InfluxDBOperator

dag = DAG(
'example_influxdb_operator',

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_influxdb_operator"

with DAG(
DAG_ID,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
)
) as dag:

# [START howto_operator_influxdb]

query_influxdb_task = InfluxDBOperator(
influxdb_conn_id='influxdb_conn_id',
task_id='query_influxdb',
sql='from(bucket:"test-influx") |> range(start: -10m, stop: {{ds}})',
dag=dag,
)

# [START howto_operator_influxdb]
# [END howto_operator_influxdb]

query_influxdb_task = InfluxDBOperator(
influxdb_conn_id='influxdb_conn_id',
task_id='query_influxdb',
sql='from(bucket:"test-influx") |> range(start: -10m, stop: {{ds}})',
dag=dag,
)
from tests.system.utils import get_test_run # noqa: E402

# [END howto_operator_influxdb]
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit 643feb6

Please sign in to comment.