**Description**
It appears that SparkSubmitHook's Kerberos support is orthogonal to Airflow's own Kerberos support: it requires specifying a keytab on the task, which will not work for us because our worker container intentionally has no volume mount for the keytab. This means we would have to modify SparkSubmitHook/SparkSubmitOperator to use the ticket cache instead.
**Use case / motivation**
I want to use the SparkSubmitOperator natively with the Kerberos ticket cache (ccache).
**Proposed Changes**
I think this should be the addition of a `use_krb5ccache` argument to the hook and operator, defaulting to `False` for backwards compatibility.
When it is `True`, we should add the following to the spark-submit command construction:
```python
if self.use_krb5ccache:
    if not os.getenv("KRB5CCNAME"):
        raise AirflowException(
            "KRB5CCNAME environment variable not set while trying to use a ticket from the ccache."
        )
    connection_cmd += [
        "--conf",
        "spark.kerberos.renewal.credentials=ccache",
    ]
```

We should also fall back to the principal from the `[kerberos]` section of the Airflow config when it is not specified in the task:
```python
self._principal = principal if principal else conf.get('kerberos', 'principal')
```
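To illustrate, here is a sketch of what a task definition could look like once the flag exists. Note that `use_krb5ccache` is only proposed at this point, and the application path and task id are made up:

```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

submit = SparkSubmitOperator(
    task_id="submit_job",
    conn_id="spark_default",
    application="/opt/jobs/my_job.py",  # hypothetical application
    # Proposed flag: resolve credentials from the ticket cache pointed to by
    # KRB5CCNAME instead of requiring a keytab on the worker.
    use_krb5ccache=True,
)
```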
Note that I've tested something similar via a cluster policy as a workaround in my current project:
```python
from airflow.configuration import conf
from airflow.models.baseoperator import BaseOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


def spark_tasks_use_ccache(task: BaseOperator):
    """Configure SparkSubmitOperator tasks to use the kerberos ticket cache."""
    if isinstance(task, SparkSubmitOperator):
        # pylint: disable=protected-access
        if task._conf:  # noqa
            task._conf["spark.kerberos.renewal.credentials"] = "ccache"  # noqa
        task._principal = conf.get('kerberos', 'principal')  # noqa
```

cc: @mik-laj @potiuk WDYT about contributing this sort of thing back to Airflow? Should we change this in the hook? Would it be interesting to contribute useful example cluster policies to Airflow?
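For completeness, a minimal sketch of how such a policy could be wired in, assuming the Airflow 2.x cluster-policy hook name `task_policy` in `airflow_local_settings.py` (older 1.10 releases used `policy` instead):

```python
# airflow_local_settings.py
from airflow.models.baseoperator import BaseOperator

# spark_tasks_use_ccache is the policy function defined above


def task_policy(task: BaseOperator) -> None:
    # Invoked for every task when DAG files are parsed, so the ccache
    # settings are applied without touching individual DAG definitions.
    spark_tasks_use_ccache(task)
```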