Description
Feature description
- what is the functionality you are trying to add/what is the problem you are trying to solve?
Basically, we are facing this issue in the Kubernetes Python client:
It has been open for more than a year. As far as I understand, once the client is instantiated there is no way to refresh its credentials, because it caches them. As a result, when the credentials expire, client calls start responding with 401. In the meantime, people have been building workarounds into their own solutions; the most popular one seems to be creating a new client with fresh credentials every time the client needs to be called.
That is essentially what I'm proposing here. Since this workaround could be ugly for operator users who are not affected by the issue, we should consider making it a feature toggle via the operator params or something similar.
This is my naive proposal given my limited knowledge of the codebase, but any other proposal would be appreciated.
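For reference, a minimal sketch of that workaround with the plain Kubernetes Python client could look like the following (the kubeconfig path and namespace are placeholders, not our real config): a fresh ApiClient is built from the kubeconfig right before each call, so the exec credential plugin runs again and a non-expired token is used.

from kubernetes import client, config

def fresh_core_v1(config_file: str) -> client.CoreV1Api:
    # new_client_from_config re-runs the kubeconfig loader (including its
    # exec credential plugin), so the returned client carries a fresh token.
    api_client = config.new_client_from_config(config_file=config_file)
    return client.CoreV1Api(api_client=api_client)

# Instead of reusing one cached client for the whole task, build a new one
# right before each API call (placeholder path and namespace).
v1 = fresh_core_v1("/usr/local/airflow/dags/kube_config.yaml")
pods = v1.list_namespaced_pod(namespace="fargate")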
- what's the importance? what does it affect?
In our case, we can't run any job that lasts more than 15 minutes: our pipelines crash and our pods are not deleted, so they go stale (increasing our infra costs, since we are using AWS Fargate).
Describe alternatives you've considered
Try to catch 401 responses and refresh the credentials there somehow.
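Something along these lines (only a rough, untested sketch; the helper name and arguments are invented for illustration): the call is retried once after reloading the kubeconfig, which re-runs the exec plugin and refreshes the token in the default client configuration.

from kubernetes import client, config
from kubernetes.client.exceptions import ApiException

def call_with_refresh(make_call, config_file: str):
    # make_call should build its client from the default configuration,
    # e.g. lambda: client.CoreV1Api().list_namespaced_pod(namespace="fargate")
    try:
        return make_call()
    except ApiException as err:
        if err.status != 401:
            raise
        # 401 means the cached token expired: reload the kubeconfig so the
        # exec plugin fetches a new token, then retry the call once.
        config.load_kube_config(config_file=config_file)
        return make_call()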
Additional context
We are running everything on AWS. We have MWAA (Airflow 2.0) with DAGs running tasks like this:
KubernetesLegacyJobOperator(
    task_id="task1",
    namespace="fargate",
    config_file=kube_config_yaml,
    get_logs=True,
    startup_timeout_seconds=300,
    body_filepath="/usr/local/airflow/dags/config/task1.yaml",
    dag=pipeline,
    is_delete_operator_pod=True,
    delete_policy="Always",
    execution_timeout=timedelta(hours=1),
)
The YAML file looks like this:
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  annotations:
    kubernetes_job_operator.main_container: "spark-kubernetes-driver"
  name: "foo"
  namespace: fargate
spec:
  arguments:
    ...
  deps:
    files:
      - "local:///etc/spark/conf/default-logstash-fields.properties"
  driver:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    env:
      ....
    initContainers:
      - command:
          - sh
          - "-c"
          - "echo hi"
        image: busybox
        name: "volume-mount-hack"
        volumeMounts:
          - mountPath: /tmp/committer
            name: "staging-vol"
            readOnly: false
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 4086M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
      - command:
          - "/fluent-bit/bin/fluent-bit"
          - "-c"
          - "/tmp/fluent-bit/fluent-bit-custom.conf"
        image: "fluent/fluent-bit:1.7"
        name: "fluent-bit"
        resources:
          limits:
            cpu: 50m
            memory: 60Mi
          requests:
            cpu: 5m
            memory: 10Mi
        volumeMounts:
          - mountPath: "/tmp/spark-logs"
            name: "spark-logs"
            readOnly: false
          - mountPath: "/tmp/fluent-bit"
            name: "fluent-bit"
            readOnly: false
    volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
  dynamicAllocation:
    enabled: true
    initialExecutors: 4
    maxExecutors: 4
    minExecutors: 2
  executor:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    deleteOnTermination: true
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 6134M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
      - command:
          - "/fluent-bit/bin/fluent-bit"
          - "-c"
          - "/tmp/fluent-bit/fluent-bit-custom.conf"
        image: "fluent/fluent-bit:1.7"
        name: "fluent-bit"
        resources:
          limits:
            cpu: 50m
            memory: 60Mi
          requests:
            cpu: 5m
            memory: 10Mi
        volumeMounts:
          - mountPath: "/tmp/spark-logs"
            name: "spark-logs"
            readOnly: false
          - mountPath: "/tmp/fluent-bit"
            name: "fluent-bit"
            readOnly: false
    volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
  hadoopConf:
    fs.s3.maxRetries: '10'
    fs.s3a.aws.credentials.provider: ...
    fs.s3a.block.size: 64M
    fs.s3a.buffer.dir: /tmp/committer/buffer
    fs.s3a.committer.magic.enabled: 'false'
    fs.s3a.committer.name: partitioned
    fs.s3a.committer.staging.abort.pending.uploads: 'false'
    "fs.s3a.committer.staging.conflict-mode": replace
    fs.s3a.committer.staging.tmp.path: "file:///tmp/committer/staging"
    fs.s3a.connection.ssl.enabled: 'false'
    fs.s3a.experimental.fadvise: random
    fs.s3a.fast.upload.buffer: disk
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.multipart.purge: 'false'
    fs.s3a.retry.throttle.interval: 10000ms
  image: "spark:3.1.1-v1.0.7"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "s3a://foo.jar"
  mainClass: com.foo.FooApp
  mode: cluster
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      ...
  restartPolicy:
    type: Never
  sparkConf:
    ...
  sparkConfigMap: "spark-conf-map-foo"
  sparkVersion: '3.1.1'
  template:
    metadata:
      labels:
        app: "foo-pod"
  type: Scala
  volumes:
    - emptyDir: {}
      name: "spark-logs"
    - configMap:
        name: "fluent-bit-conf-map"
      name: "fluent-bit"
    - name: "staging-vol"
      persistentVolumeClaim:
        claimName: "data-staging-share"
As you can see, we are running Spark applications.
Here is the kubeconfig file:
apiVersion: v1
clusters:
  - cluster:
      certificate-authority-data: ***
    name: ****
contexts:
  - context:
      cluster: ****
      user: ****
    name: aws
current-context: aws
kind: Config
preferences: {}
users:
  - name: ****
    user:
      exec:
        apiVersion: ****
        args:
          - --region
          - ***
          - eks
          - get-token
          - --cluster-name
          - ****
        command: /usr/local/airflow/.local/bin/aws
Here is how we get the token, and the reason it only lasts 15 minutes:
https://awscli.amazonaws.com/v2/documentation/api/latest/reference/eks/get-token.html
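To illustrate (a standalone sketch, not part of our DAGs; cluster name and region are placeholders): the exec plugin in the kubeconfig above shells out to aws eks get-token, and the ExecCredential it prints carries an expirationTimestamp roughly 15 minutes in the future, which is why a token cached at client creation time stops working for long jobs.

import json
import subprocess

out = subprocess.run(
    ["aws", "eks", "get-token", "--cluster-name", "my-cluster", "--region", "eu-west-1"],
    check=True,
    capture_output=True,
    text=True,
)
cred = json.loads(out.stdout)
# The token is only valid until this timestamp (about 15 minutes from now).
print(cred["status"]["expirationTimestamp"])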
And here is an example of what a failed client call log looks like:
[2021-09-15 10:31:15,682] {{client.py:483}} ERROR - Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 334, in query_loop
    collection_formats={},
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 420, in request
    body=body)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 270, in DELETE
    body=body)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '49ea69bb-f188-4620-aa0e-e4e57ba77e95', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 15 Sep 2021 10:31:15 GMT', 'Content-Length': '129'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'Unauthorized', 'reason': 'Unauthorized', 'code': 401}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/zthreading/tasks.py", line 173, in _run_as_thread
    rslt = self.action(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _exdcute_query
    self.query_loop(client)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 375, in query_loop
    raise err
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.DeleteNamespaceResource, Unauthorized: Unauthorized