Skip to content

Commit

Permalink
Merge pull request apache#1586 from criccomini/AIRFLOW-200
Browse files Browse the repository at this point in the history
  • Loading branch information
criccomini committed Jun 29, 2016
2 parents be6766a + dc84fde commit 7b382b4
Show file tree
Hide file tree
Showing 42 changed files with 258 additions and 204 deletions.
5 changes: 3 additions & 2 deletions airflow/contrib/example_dags/example_qubole_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
# limitations under the License.

from airflow import DAG
from airflow.operators import DummyOperator, PythonOperator, BranchPythonOperator
from airflow.contrib.operators import QuboleOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator
from datetime import datetime, timedelta
import filecmp
import random
Expand Down
3 changes: 2 additions & 1 deletion airflow/contrib/example_dags/example_twitter_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
# --------------------------------------------------------------------------------

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, date, timedelta

Expand Down
22 changes: 4 additions & 18 deletions airflow/contrib/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#


# Contrib hooks are not imported by default. They should be accessed
# directly: from airflow.contrib.hooks.hook_module import Hook





import sys


# ------------------------------------------------------------------------
Expand All @@ -33,11 +31,6 @@
# for compatibility.
#
# ------------------------------------------------------------------------

# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_hooks = {
'ftp_hook': ['FTPHook'],
'ftps_hook': ['FTPSHook'],
Expand All @@ -54,12 +47,5 @@

import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _hooks)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'contrib.hooks' has been "
"deprecated. Please import from "
"'contrib.hooks.[hook_module]' instead. Support for direct imports "
"will be dropped entirely in Airflow 2.0.".format(i=_i))
from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
22 changes: 7 additions & 15 deletions airflow/contrib/operators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# Contrib operators are not imported by default. They should be accessed
# directly: from airflow.contrib.operators.operator_module import Operator


import sys


# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
Expand All @@ -27,27 +31,15 @@
# for compatibility.
#
# ------------------------------------------------------------------------

# Imports the operators dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_operators = {
'ssh_execute_operator': ['SSHExecuteOperator'],
'vertica_operator': ['VerticaOperator'],
'vertica_to_hive': ['VerticaToHiveTransfer'],
'qubole_operator': ['QuboleOperator'],
'fs': ['FileSensor']
'fs_operator': ['FileSensor']
}

import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _operators)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'contrib.operators' has been "
"deprecated. Please import from "
"'contrib.operators.[operator_module]' instead. Support for direct "
"imports will be dropped entirely in Airflow 2.0.".format(i=_i))
from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_check_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.utils.decorators import apply_defaults


Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/qubole_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks import QuboleHook
from airflow.contrib.hooks.qubole_hook import QuboleHook


class QuboleOperator(BaseOperator):
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import logging

from airflow.contrib.hooks import VerticaHook
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_to_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tempfile import NamedTemporaryFile

from airflow.hooks.hive_hooks import HiveCliHook
from airflow.contrib.hooks import VerticaHook
from airflow.contrib.hooks.vertica_hook import VerticaHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_bash_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from builtins import range
from airflow.operators import BashOperator, DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import BranchPythonOperator, DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import random
Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_branch_python_dop_operator_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.
#

from airflow.operators import BranchPythonOperator, DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_http_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
### Example HTTP operator and sensor
"""
from airflow import DAG
from airflow.operators import SimpleHttpOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG("example_passing_params_via_test_command",
default_args={"owner" : "airflow",
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from __future__ import print_function
from builtins import range
from airflow.operators import PythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta

Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import ShortCircuitOperator, DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
import airflow.utils.helpers
from datetime import datetime, timedelta
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_skip_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from airflow.operators import DummyOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.exceptions import AirflowSkipException
Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from datetime import datetime

from airflow.models import DAG
from airflow.operators import DummyOperator, SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

from airflow.example_dags.subdags.subdag import subdag

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"""

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint
Expand Down
3 changes: 2 additions & 1 deletion airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators import BashOperator, PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

Expand Down
6 changes: 3 additions & 3 deletions airflow/example_dags/example_xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def puller(**kwargs):
v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning'])
assert (v1, v2) == (value_1, value_2)

push1 = airflow.operators.PythonOperator(
push1 = airflow.operators.python_operator.PythonOperator(
task_id='push', dag=dag, python_callable=push)

push2 = airflow.operators.PythonOperator(
push2 = airflow.operators.python_operator.PythonOperator(
task_id='push_by_returning', dag=dag, python_callable=push_by_returning)

pull = airflow.operators.PythonOperator(
pull = airflow.operators.python_operator.PythonOperator(
task_id='puller', dag=dag, python_callable=puller)

pull.set_upstream([push1, push2])
2 changes: 1 addition & 1 deletion airflow/example_dags/subdags/subdag.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

from airflow.models import DAG
from airflow.operators import DummyOperator
from airflow.operators.dummy_operator import DummyOperator


def subdag(parent_dag_name, child_dag_name, args):
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Used for unit tests"""
from airflow.operators import BashOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
from airflow import DAG
from airflow.operators import BashOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
Expand Down
31 changes: 9 additions & 22 deletions airflow/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import sys

# Only import Core Airflow Operators that don't have extra requirements.
# All other operators must be imported directly.
from .base_hook import BaseHook
from .dbapi_hook import DbApiHook
from .http_hook import HttpHook
from .sqlite_hook import SqliteHook

# ------------------------------------------------------------------------
#
Expand All @@ -34,10 +32,8 @@
# abstracting the underlying modules


from airflow.utils.helpers import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook # noqa to expose in package

_hooks = {
'base_hook': ['BaseHook'],
'hive_hooks': [
'HiveCliHook',
'HiveMetastoreHook',
Expand All @@ -50,29 +46,20 @@
'postgres_hook': ['PostgresHook'],
'presto_hook': ['PrestoHook'],
'samba_hook': ['SambaHook'],
# 'sqlite_hook': ['SqliteHook'],
'sqlite_hook': ['SqliteHook'],
'S3_hook': ['S3Hook'],
# 'http_hook': ['HttpHook'],
'http_hook': ['HttpHook'],
'druid_hook': ['DruidHook'],
'jdbc_hook': ['JdbcHook'],
'dbapi_hook': ['DbApiHook'],
'mssql_hook': ['MsSqlHook'],
'oracle_hook': ['OracleHook'],
}


import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _hooks)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'airflow.hooks' has been "
"deprecated. Please import from "
"'airflow.hooks.[hook_module]' instead. Support for direct imports "
"will be dropped entirely in Airflow 2.0.".format(i=_i))

from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)

def _integrate_plugins():
"""Integrate plugins to the context"""
Expand Down
Loading

0 comments on commit 7b382b4

Please sign in to comment.