
Commit

[AIRFLOW-200] Make hook/operator imports lazy, and print proper exceptions

criccomini committed Jun 27, 2016
1 parent d15f8ca commit dce633e
Showing 5 changed files with 155 additions and 133 deletions.
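The change in each of the four package files below is the same: the eager import_module_attrs call and the zope.deprecation loop are dropped in favour of a single AirflowImporter(sys.modules[__name__], ...) line, so submodules are only imported when one of their names is first accessed. The importer itself lives in airflow/utils/helpers.py (the fifth changed file, not shown on this page). As a rough illustration only, not the actual helpers.py code, here is a minimal sketch of how an importer with that constructor signature can provide lazy attribute access by installing itself in sys.modules:

import importlib
import sys


class LazyImporter(object):
    """Illustrative stand-in for AirflowImporter (name and details assumed)."""

    def __init__(self, parent_module, module_attributes):
        # Invert the mapping: {'ftp_hook': ['FTPHook']} -> {'FTPHook': 'ftp_hook'}.
        self._parent_module = parent_module
        self._parent_name = parent_module.__name__
        self._attr_to_module = {
            attr: module
            for module, attrs in module_attributes.items()
            for attr in attrs
        }
        # Replace the package in sys.modules so attribute lookups on it
        # (including "from package import Name") go through __getattr__.
        sys.modules[self._parent_name] = self

    def __getattr__(self, attribute):
        if attribute in self._attr_to_module:
            # Import the submodule only on first access. If that import fails,
            # the original ImportError propagates with its real message instead
            # of being swallowed at package import time.
            submodule = importlib.import_module(
                '{}.{}'.format(self._parent_name, self._attr_to_module[attribute]))
            return getattr(submodule, attribute)
        # Anything not in the mapping is looked up on the real module.
        return getattr(self._parent_module, attribute)

The sketch glosses over details a real implementation has to handle, but it shows why an exception from a heavyweight hook or operator now surfaces at first use rather than at package import.
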
22 changes: 4 additions & 18 deletions airflow/contrib/hooks/__init__.py
@@ -11,16 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#


# Contrib hooks are not imported by default. They should be accessed
# directly: from airflow.contrib.hooks.hook_module import Hook





import sys


# ------------------------------------------------------------------------
@@ -33,11 +31,6 @@
# for compatibility.
#
# ------------------------------------------------------------------------

# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_hooks = {
'ftp_hook': ['FTPHook'],
'ftps_hook': ['FTPSHook'],
@@ -54,12 +47,5 @@

import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _hooks)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'contrib.hooks' has been "
"deprecated. Please import from "
"'contrib.hooks.[hook_module]' instead. Support for direct imports "
"will be dropped entirely in Airflow 2.0.".format(i=_i))
from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)
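
Per the updated docstring in this file, contrib hooks are no longer pulled in eagerly and should be imported from their own modules. Using the first two entries of the _hooks mapping above, that looks like:

# Direct imports, the style the contrib.hooks docstring now recommends.
from airflow.contrib.hooks.ftp_hook import FTPHook
from airflow.contrib.hooks.ftps_hook import FTPSHook
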
22 changes: 7 additions & 15 deletions airflow/contrib/operators/__init__.py
@@ -11,12 +11,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# Contrib operators are not imported by default. They should be accessed
# directly: from airflow.contrib.operators.operator_module import Operator


import sys


# ------------------------------------------------------------------------
#
# #TODO #FIXME Airflow 2.0
@@ -27,27 +31,15 @@
# for compatibility.
#
# ------------------------------------------------------------------------

# Imports the operators dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_operators = {
'ssh_execute_operator': ['SSHExecuteOperator'],
'vertica_operator': ['VerticaOperator'],
'vertica_to_hive': ['VerticaToHiveTransfer'],
'qubole_operator': ['QuboleOperator'],
'fs': ['FileSensor']
'fs_operator': ['FileSensor']
}

import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _operators)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'contrib.operators' has been "
"deprecated. Please import from "
"'contrib.operators.[operator_module]' instead. Support for direct "
"imports will be dropped entirely in Airflow 2.0.".format(i=_i))
from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _operators)
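
The same direct-import style applies to contrib operators (this hunk also renames the mapping key 'fs' to 'fs_operator', matching the module that actually defines FileSensor). For example:

# Direct imports from the modules named in the _operators mapping above.
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.contrib.operators.fs_operator import FileSensor
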
31 changes: 9 additions & 22 deletions airflow/hooks/__init__.py
@@ -11,13 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import sys

# Only import Core Airflow Operators that don't have extra requirements.
# All other operators must be imported directly.
from .base_hook import BaseHook
from .dbapi_hook import DbApiHook
from .http_hook import HttpHook
from .sqlite_hook import SqliteHook

# ------------------------------------------------------------------------
#
@@ -34,10 +32,8 @@
# abstracting the underlying modules


from airflow.utils.helpers import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook # noqa to expose in package

_hooks = {
'base_hook': ['BaseHook'],
'hive_hooks': [
'HiveCliHook',
'HiveMetastoreHook',
@@ -50,29 +46,20 @@
'postgres_hook': ['PostgresHook'],
'presto_hook': ['PrestoHook'],
'samba_hook': ['SambaHook'],
# 'sqlite_hook': ['SqliteHook'],
'sqlite_hook': ['SqliteHook'],
'S3_hook': ['S3Hook'],
# 'http_hook': ['HttpHook'],
'http_hook': ['HttpHook'],
'druid_hook': ['DruidHook'],
'jdbc_hook': ['JdbcHook'],
'dbapi_hook': ['DbApiHook'],
'mssql_hook': ['MsSqlHook'],
'oracle_hook': ['OracleHook'],
}


import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _hooks)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'airflow.hooks' has been "
"deprecated. Please import from "
"'airflow.hooks.[hook_module]' instead. Support for direct imports "
"will be dropped entirely in Airflow 2.0.".format(i=_i))

from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _hooks)

def _integrate_plugins():
"""Integrate plugins to the context"""
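
For the core airflow.hooks package the flat import path keeps working, but the underlying module is now loaded only when the name is first touched through airflow_importer. A small usage sketch, assuming the lazy resolution described above:

# Both forms resolve to the same class; the first is served lazily by the
# importer installed above, the second imports the submodule explicitly.
from airflow.hooks import S3Hook
from airflow.hooks.S3_hook import S3Hook
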
70 changes: 21 additions & 49 deletions airflow/operators/__init__.py
@@ -11,28 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


# Only import Core Airflow Operators that don't have extra requirements.
# All other operators must be imported directly.
import sys
from airflow.models import BaseOperator
from .bash_operator import BashOperator
from .python_operator import (
BranchPythonOperator,
PythonOperator,
ShortCircuitOperator)
from .check_operator import (
CheckOperator,
ValueCheckOperator,
IntervalCheckOperator)
from .dagrun_operator import TriggerDagRunOperator
from .dummy_operator import DummyOperator
from .email_operator import EmailOperator
from .http_operator import SimpleHttpOperator
import airflow.operators.sensors
from .subdag_operator import SubDagOperator




# ------------------------------------------------------------------------
@@ -46,36 +29,33 @@
#
# ------------------------------------------------------------------------


# Imports operators dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

# These need to be integrated first as other operators depend on them
# _import_module_attrs(globals(), {
# 'check_operator': [
# 'CheckOperator',
# 'ValueCheckOperator',
# 'IntervalCheckOperator',
# ],
# })

_operators = {
# 'bash_operator': ['BashOperator'],
# 'python_operator': [
# 'PythonOperator',
# 'BranchPythonOperator',
# 'ShortCircuitOperator',
# ],
'bash_operator': ['BashOperator'],
'check_operator': [
'CheckOperator',
'ValueCheckOperator',
'IntervalCheckOperator',
],
'python_operator': [
'PythonOperator',
'BranchPythonOperator',
'ShortCircuitOperator',
],
'hive_operator': ['HiveOperator'],
'pig_operator': ['PigOperator'],
'presto_check_operator': [
'PrestoCheckOperator',
'PrestoValueCheckOperator',
'PrestoIntervalCheckOperator',
],
# 'dagrun_operator': ['TriggerDagRunOperator'],
# 'dummy_operator': ['DummyOperator'],
# 'email_operator': ['EmailOperator'],
'dagrun_operator': ['TriggerDagRunOperator'],
'dummy_operator': ['DummyOperator'],
'email_operator': ['EmailOperator'],
'hive_to_samba_operator': ['Hive2SambaOperator'],
'mysql_operator': ['MySqlOperator'],
'sqlite_operator': ['SqliteOperator'],
@@ -95,13 +75,13 @@
'TimeSensor',
'WebHdfsSensor',
],
# 'subdag_operator': ['SubDagOperator'],
'subdag_operator': ['SubDagOperator'],
'hive_stats_operator': ['HiveStatsCollectionOperator'],
's3_to_hive_operator': ['S3ToHiveTransfer'],
'hive_to_mysql': ['HiveToMySqlTransfer'],
'presto_to_mysql': ['PrestoToMySqlTransfer'],
's3_file_transform_operator': ['S3FileTransformOperator'],
# 'http_operator': ['SimpleHttpOperator'],
'http_operator': ['SimpleHttpOperator'],
'hive_to_druid': ['HiveToDruidTransfer'],
'jdbc_operator': ['JdbcOperator'],
'mssql_operator': ['MsSqlOperator'],
@@ -113,16 +93,8 @@

import os as _os
if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
from zope.deprecation import deprecated as _deprecated
_imported = _import_module_attrs(globals(), _operators)
for _i in _imported:
_deprecated(
_i,
"Importing {i} directly from 'airflow.operators' has been "
"deprecated. Please import from "
"'airflow.operators.[operator_module]' instead. Support for direct "
"imports will be dropped entirely in Airflow 2.0.".format(i=_i))

from airflow.utils.helpers import AirflowImporter
airflow_importer = AirflowImporter(sys.modules[__name__], _operators)

def _integrate_plugins():
"""Integrate plugins to the context"""
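
This is also where the "print proper exceptions" half of the commit message shows up: importing airflow.operators no longer imports (or silently drops) operators whose dependencies are missing. A hedged sketch of the resulting behaviour, assuming the importer lets the underlying error propagate on first access:

import airflow.operators  # cheap: no hive/presto/etc. modules are imported here

try:
    # First access triggers the real import of airflow.operators.hive_operator.
    from airflow.operators import HiveOperator
except ImportError as exc:
    # With a missing Hive client library, the original error message is
    # visible here instead of HiveOperator quietly vanishing from the package.
    print("HiveOperator unavailable: {}".format(exc))
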
