Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ repos:
^providers/apache/hive/src/airflow/providers/apache/hive/transfers/vertica_to_hive\.py$|
^providers/apache/kafka/docs/connections/kafka\.rst$|
^providers/apache/spark/docs/decorators/pyspark\.rst$|
^providers/apache/spark/docs/connections/spark-submit.rst$|
^providers/apache/spark/src/airflow/providers/apache/spark/decorators/|
^providers/apache/spark/src/airflow/providers/apache/spark/hooks/|
^providers/apache/spark/src/airflow/providers/apache/spark/operators/|
Expand Down
18 changes: 11 additions & 7 deletions providers/apache/spark/docs/connections/spark-submit.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,21 @@ Spark binary (optional)
Kubernetes namespace (optional, only applies to spark on kubernetes applications)
Kubernetes namespace (``spark.kubernetes.namespace``) to divide cluster resources between multiple users (via resource quota).

When specifying the connection in environment variable you should specify
it using URI syntax.
.. note::

Note that all components of the URI should be URL-encoded. The URI and the mongo
connection string are not the same.
    When specifying the connection in an environment variable, you should
    specify it using URI syntax.
    You can provide a standard Spark master URI directly. The master URL is
    parsed correctly without requiring repeated scheme prefixes such as
    ``spark://spark://...``. Ensure all URI components are URL-encoded.

For example:
For example:

.. code-block:: bash

export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'

.. code-block:: bash

export AIRFLOW_CONN_SPARK_DEFAULT='spark://mysparkcluster.com:80?deploy-mode=cluster&spark_binary=command&namespace=kube+namespace'

.. warning::

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,20 @@ def _resolve_connection(self) -> dict[str, Any]:
# Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT and
# k8s://https://<HOST>:<PORT>
conn = self.get_connection(self._conn_id)
if conn.port:
conn_data["master"] = f"{conn.host}:{conn.port}"
else:

# When connection is created from URI, the scheme (spark://, k8s://, etc.)
# is stored in conn_type, and conn.host contains only the hostname.
# When created from UI, conn_type is typically "spark" and conn.host
# may contain the full master URL (e.g., k8s://https://host).
if conn.conn_type == "spark" and conn.host and ("://" in conn.host or not conn.port):
# UI-based spark connection where host contains the full master URL
conn_data["master"] = conn.host
else:
# Reconstruct URL with conn_type as protocol
conn_data["master"] = f"{conn.conn_type}://{conn.host or ''}"

# Append port if provided
conn_data["master"] = f"{conn_data['master']}:{conn.port}" if conn.port else conn_data["master"]

# Determine optional yarn queue from the extra field
extra = conn.extra_dejson
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ def setup_connections(self, create_connection_without_db):
extra='{"keytab": "privileged_user.keytab"}',
)
)
create_connection_without_db(
Connection(
conn_id="spark_uri_with_protocol",
uri="spark://spark-master:7077",
)
)
create_connection_without_db(
Connection(
conn_id="spark_uri_yarn",
uri="yarn://yarn-master",
)
)
create_connection_without_db(
Connection(
conn_id="mesos_uri",
uri="mesos://mesos-host:5050",
)
)
create_connection_without_db(
Connection(
conn_id="k8s_uri",
uri="k8s://https://k8s-host:443",
)
)

create_connection_without_db(
Connection(
conn_id="local_uri",
uri="spark://local",
)
)

@pytest.mark.db_test
@patch(
Expand Down Expand Up @@ -532,6 +563,31 @@ def test_resolve_connection_spark_binary_spark3_submit_set_connection(self):
assert connection == expected_spark_connection
assert cmd[0] == "spark3-submit"

def test_resolve_connection_spark_uri_with_protocol(self):
    """A URI-created connection keeps its spark:// scheme and port intact."""
    resolved = SparkSubmitHook(conn_id="spark_uri_with_protocol")._resolve_connection()
    assert resolved["master"] == "spark://spark-master:7077"

def test_resolve_connection_spark_uri_yarn(self):
    """A yarn:// URI without a port resolves to the scheme-qualified host only."""
    resolved = SparkSubmitHook(conn_id="spark_uri_yarn")._resolve_connection()
    assert resolved["master"] == "yarn://yarn-master"

def test_resolve_connection_mesos_uri(self):
    """A mesos:// URI resolves with both its scheme and port preserved."""
    resolved = SparkSubmitHook(conn_id="mesos_uri")._resolve_connection()
    assert resolved["master"] == "mesos://mesos-host:5050"

def test_resolve_connection_k8s_uri(self):
    """A k8s:// URI wrapping an https endpoint keeps the full nested URL."""
    resolved = SparkSubmitHook(conn_id="k8s_uri")._resolve_connection()
    assert resolved["master"] == "k8s://https://k8s-host:443"

def test_resolve_connection_local_uri(self):
    """``spark://local`` collapses to the bare ``local`` master string."""
    resolved = SparkSubmitHook(conn_id="local_uri")._resolve_connection()
    assert resolved["master"] == "local"

def test_resolve_connection_custom_spark_binary_allowed_in_hook(self):
SparkSubmitHook(conn_id="spark_binary_set", spark_binary="another-custom-spark-submit")

Expand Down