Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airflow/providers/apache/pig/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@
Changelog
---------

4.0.0
.....

Breaking changes
~~~~~~~~~~~~~~~~

You can no longer use ``pig_properties`` as connection extras. If you want to add extra parameters
to the ``pig`` command, you need to do it via ``pig_properties`` (string list) of the PigCliHook (new parameter)
or via ``pig_opts`` (string with options separated by spaces) or ``pig_properties`` (string list) in
the PigOperator. Any use of ``pig_properties`` extras in a connection will raise an exception,
informing you that you need to remove them and pass them as parameters.

Both ``pig_properties`` and ``pig_opts`` are now templated fields in the PigOperator.

3.0.0
.....

Expand Down
29 changes: 18 additions & 11 deletions airflow/providers/apache/pig/hooks/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,38 @@


class PigCliHook(BaseHook):
"""
Simple wrapper around the pig CLI.

Note that you can also set default pig CLI properties using the
``pig_properties`` to be used in your connection as in
``{"pig_properties": "-Dpig.tmpfilecompression=true"}``
"""Simple wrapper around the pig CLI.

:param pig_cli_conn_id: Connection id used by the hook
:param pig_properties: additional properties added after pig cli command as list of strings.
"""

conn_name_attr = "pig_cli_conn_id"
default_conn_name = "pig_cli_default"
conn_type = "pig_cli"
hook_name = "Pig Client Wrapper"

def __init__(self, pig_cli_conn_id: str = default_conn_name) -> None:
def __init__(
self, pig_cli_conn_id: str = default_conn_name, pig_properties: list[str] | None = None
) -> None:
super().__init__()
conn = self.get_connection(pig_cli_conn_id)
self.pig_properties = conn.extra_dejson.get("pig_properties", "")
conn_pig_properties = conn.extra_dejson.get("pig_properties")
if conn_pig_properties:
raise RuntimeError(
"The PigCliHook used to have possibility of passing `pig_properties` to the Hook,"
" however with the 4.0.0 version of `apache-pig` provider it has been removed. You should"
" use ``pig_opts`` (space separated string) or ``pig_properties`` (string list) in the"
" PigOperator. You can also pass ``pig-properties`` in the PigCliHook `init`. Currently,"
f" the {pig_cli_conn_id} connection has those extras: `{conn_pig_properties}`."
)
self.pig_properties = pig_properties if pig_properties else []
self.conn = conn
self.sub_process = None

def run_cli(self, pig: str, pig_opts: str | None = None, verbose: bool = True) -> Any:
"""
Run an pig script using the pig cli
Run a pig script using the pig cli

>>> ph = PigCliHook()
>>> result = ph.run_cli("ls /;", pig_opts="-x mapreduce")
Expand All @@ -67,8 +75,7 @@ def run_cli(self, pig: str, pig_opts: str | None = None, verbose: bool = True) -
pig_cmd = [pig_bin]

if self.pig_properties:
pig_properties_list = self.pig_properties.split()
pig_cmd.extend(pig_properties_list)
pig_cmd.extend(self.pig_properties)
if pig_opts:
pig_opts_list = pig_opts.split()
pig_cmd.extend(pig_opts_list)
Expand Down
9 changes: 6 additions & 3 deletions airflow/providers/apache/pig/operators/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ class PigOperator(BaseOperator):
you may want to use this along with the
``DAG(user_defined_macros=myargs)`` parameter. View the DAG
object documentation for more details.
:param pig_opts: pig options, such as: -x tez, -useHCatalog, ...
:param pig_opts: pig options, such as: -x tez, -useHCatalog, ... - space separated list
:param pig_properties: pig properties, additional pig properties passed as list
"""

template_fields: Sequence[str] = ("pig",)
template_fields: Sequence[str] = ("pig", "pig_opts", "pig_properties")
template_ext: Sequence[str] = (
".pig",
".piglatin",
Expand All @@ -55,6 +56,7 @@ def __init__(
pig_cli_conn_id: str = "pig_cli_default",
pigparams_jinja_translate: bool = False,
pig_opts: str | None = None,
pig_properties: list[str] | None = None,
**kwargs: Any,
) -> None:

Expand All @@ -63,6 +65,7 @@ def __init__(
self.pig = pig
self.pig_cli_conn_id = pig_cli_conn_id
self.pig_opts = pig_opts
self.pig_properties = pig_properties
self.hook: PigCliHook | None = None

def prepare_template(self):
Expand All @@ -71,7 +74,7 @@ def prepare_template(self):

def execute(self, context: Context):
self.log.info("Executing: %s", self.pig)
self.hook = PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id)
self.hook = PigCliHook(pig_cli_conn_id=self.pig_cli_conn_id, pig_properties=self.pig_properties)
self.hook.run_cli(pig=self.pig, pig_opts=self.pig_opts)

def on_kill(self):
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/apache/pig/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ description: |
`Apache Pig <https://pig.apache.org/>`__

versions:
- 4.0.0
- 3.0.0
- 2.0.4
- 2.0.3
Expand Down
13 changes: 10 additions & 3 deletions tests/providers/apache/pig/hooks/test_pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def get_connection(self, unused_id):

def test_init(self):
self.pig_hook()
self.extra_dejson.get.assert_called_once_with("pig_properties", "")

@mock.patch("subprocess.Popen")
def test_run_cli_success(self, popen_mock):
Expand Down Expand Up @@ -80,8 +79,7 @@ def test_run_cli_with_properties(self, popen_mock):
proc_mock.stdout.readline.return_value = b""
popen_mock.return_value = proc_mock

hook = self.pig_hook()
hook.pig_properties = test_properties
hook = self.pig_hook(pig_properties=["one", "two"])

stdout = hook.run_cli("")
assert stdout == ""
Expand All @@ -90,6 +88,15 @@ def test_run_cli_with_properties(self, popen_mock):
for pig_prop in test_properties.split():
assert pig_prop in popen_first_arg

def test_runtime_exception_not_raised_by_default(self):
PigCliHook()

@mock.patch("airflow.providers.apache.pig.hooks.pig.PigCliHook.get_connection")
def test_runtime_exception_when_properties_passed_by_connection(self, mock_get_connection):
mock_get_connection.return_value.extra_dejson = {"pig_properties": "one two three"}
with pytest.raises(RuntimeError):
PigCliHook()

@mock.patch("subprocess.Popen")
def test_run_cli_verbose(self, popen_mock):
test_stdout_lines = [b"one", b"two", b""]
Expand Down