
FEATURE: Refresh aws token  #54

Open
@duferdev

Description


Feature description

  • What is the functionality you are trying to add / what is the problem you are trying to solve?

So basically we are facing this issue in the k8s python client:

kubernetes-client/python#741

This has been open for more than a year. As far as I understand, once the client is instantiated, there is no way to refresh its credentials, since it caches them. Because of that, when the credentials expire, client calls respond with 401. Meanwhile, people have started to develop workarounds in their own solutions; the most popular seems to be creating a new client with fresh credentials every time we need to call the client.

So basically that's what I'm proposing. Because this workaround could be ugly for people using the operator who are not affected by this issue, we should consider making it some kind of feature toggle via operator params or something like that.

This is my naive proposal given my limited knowledge of the codebase, but any other proposal would be appreciated; a rough sketch of what I mean is below.
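
A minimal sketch (not operator code; the helper name and config path are just illustrative) that rebuilds the client from the kube config right before a call, so the exec plugin runs again and picks up a fresh token:

# Illustrative only: build a throwaway client with fresh credentials instead of
# reusing the cached one. kube_config_path would be the same file passed to the
# operator via config_file.
from kubernetes import client, config

def fresh_core_v1(kube_config_path):
    # new_client_from_config re-reads the kube config, which re-runs the exec
    # plugin (aws eks get-token) and so returns a client with a fresh token
    api_client = config.new_client_from_config(config_file=kube_config_path)
    return client.CoreV1Api(api_client=api_client)

# usage: create the client per call (or per short block of calls)
# fresh_core_v1(kube_config_yaml).list_namespaced_pod(namespace="fargate")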

  • What's the importance? What does it affect?

Because, in our case, we can't run any job that lasts more than 15 minutes: our pipelines crash, and our pods aren't deleted and go stale, increasing our infra costs since we are using AWS Fargate.

Describe alternatives you've considered
Try to catch 401 responses and refresh the credentials there somehow, along the lines of the sketch below.
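
Rough illustration only, assuming the existing client is kept around and its Configuration is refreshed in place when a call fails (I have not checked how well this fits the operator's internals; names are made up):

# Illustrative only: retry a call once after a 401 by reloading the kube config
# into the existing client's Configuration, which re-runs the exec plugin.
from kubernetes import client, config
from kubernetes.client.exceptions import ApiException

def call_with_refresh(api_client, kube_config_path, do_call):
    try:
        return do_call()
    except ApiException as err:
        if err.status != 401:
            raise
        # credentials likely expired: reload them into the same Configuration
        # object the client is using, then retry the call once
        config.load_kube_config(
            config_file=kube_config_path,
            client_configuration=api_client.configuration,
        )
        return do_call()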

Additional context

We are running everything on AWS. We have MWAA (Airflow 2.0) with DAGs running tasks like this:

KubernetesLegacyJobOperator(
    task_id="task1",
    namespace="fargate",
    config_file=kube_config_yaml,
    get_logs=True,
    startup_timeout_seconds=300,
    body_filepath="/usr/local/airflow/dags/config/task1.yaml",
    dag=pipeline,
    is_delete_operator_pod=True,
    delete_policy="Always",
    execution_timeout=timedelta(hours=1),
)

The YAML file looks like this:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  annotations:
    kubernetes_job_operator.main_container: "spark-kubernetes-driver"
  name: "foo"
  namespace: fargate
spec:
  arguments:
  ...
  deps:
    files:
    - "local:///etc/spark/conf/default-logstash-fields.properties"
  driver:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    env:
    ....
    initContainers:
    - command:
      - sh
      - "-c"
      - "echo hi"
      image: busybox
      name: "volume-mount-hack"
      volumeMounts:
      - mountPath: /tmp/committer
        name: "staging-vol"
        readOnly: false
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 4086M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  dynamicAllocation:
    enabled: true
    initialExecutors: 4
    maxExecutors: 4
    minExecutors: 2
  executor:
    coreLimit: 2000m
    coreRequest: 1995m
    cores: 2
    deleteOnTermination: true
    labels:
      metrics: prometheus
      version: '3.1.1'
    memory: 6134M
    podSecurityContext:
      fsGroup: 185
    serviceAccount: fargate
    sidecars:
    - command:
      - "/fluent-bit/bin/fluent-bit"
      - "-c"
      - "/tmp/fluent-bit/fluent-bit-custom.conf"
      image: "fluent/fluent-bit:1.7"
      name: "fluent-bit"
      resources:
        limits:
          cpu: 50m
          memory: 60Mi
        requests:
          cpu: 5m
          memory: 10Mi
      volumeMounts:
      - mountPath: "/tmp/spark-logs"
        name: "spark-logs"
        readOnly: false
      - mountPath: "/tmp/fluent-bit"
        name: "fluent-bit"
        readOnly: false
    volumeMounts:
    - mountPath: "/tmp/spark-logs"
      name: "spark-logs"
      readOnly: false
    - mountPath: "/tmp/fluent-bit"
      name: "fluent-bit"
      readOnly: false
    - mountPath: /tmp/committer
      name: "staging-vol"
      readOnly: false
  hadoopConf:
    fs.s3.maxRetries: '10'
    fs.s3a.aws.credentials.provider: ...
    fs.s3a.block.size: 64M
    fs.s3a.buffer.dir: /tmp/committer/buffer
    fs.s3a.committer.magic.enabled: 'false'
    fs.s3a.committer.name: partitioned
    fs.s3a.committer.staging.abort.pending.uploads: 'false'
    "fs.s3a.committer.staging.conflict-mode": replace
    fs.s3a.committer.staging.tmp.path: "file:///tmp/committer/staging"
    fs.s3a.connection.ssl.enabled: 'false'
    fs.s3a.experimental.fadvise: random
    fs.s3a.fast.upload.buffer: disk
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.multipart.purge: 'false'
    fs.s3a.retry.throttle.interval: 10000ms
  image: "spark:3.1.1-v1.0.7"
  imagePullPolicy: IfNotPresent
  mainApplicationFile: "s3a://foo.jar"
  mainClass: com.foo.FooApp
  mode: cluster
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      ...
  restartPolicy:
    type: Never
  sparkConf:
    ...
  sparkConfigMap: "spark-conf-map-foo"
  sparkVersion: '3.1.1'
  template:
    metadata:
      labels:
        app: "foo-pod"
  type: Scala
  volumes:
  - emptyDir: {}
    name: "spark-logs"
  - configMap:
      name: "fluent-bit-conf-map"
    name: "fluent-bit"
  - name: "staging-vol"
    persistentVolumeClaim:
      claimName: "data-staging-share"

As you can see, we are running Spark applications.

Here is the kube config file:

apiVersion: v1
clusters:
- cluster:
    certificate-authority-data: ***
  name: ****
contexts:
- context:
    cluster: ****
    user: ****
  name: aws
current-context: aws
kind: Config
preferences: {}
users:
- name: ****
  user:
    exec:
      apiVersion: ****
      args:
      - --region
      - ***
      - eks
      - get-token
      - --cluster-name
      - ****
      command: /usr/local/airflow/.local/bin/aws

Here is how we get the token, and the reason why it only lasts for 15 minutes:

https://awscli.amazonaws.com/v2/documentation/api/latest/reference/eks/get-token.html
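
In other words, the exec entry in the kube config above makes the client shell out to that command, and the returned token expires roughly 15 minutes later. Just to illustrate what it returns when run manually (cluster name and region are placeholders here):

# Illustrative only: this is the same command the exec plugin in the kube config
# runs; the returned ExecCredential carries the short-lived token and its expiry.
import json
import subprocess

out = subprocess.run(
    ["aws", "--region", "eu-west-1", "eks", "get-token", "--cluster-name", "my-cluster"],
    capture_output=True, text=True, check=True,
)
cred = json.loads(out.stdout)
print(cred["status"]["token"][:20])           # bearer token the client caches
print(cred["status"]["expirationTimestamp"])  # roughly 15 minutes from now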

And here is an example of what a failed client call log looks like:

[2021-09-15 10:31:15,682] {{client.py:483}} ERROR - Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 334, in query_loop
    collection_formats={},

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 420, in request
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 270, in DELETE
    body=body)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)

kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '49ea69bb-f188-4620-aa0e-e4e57ba77e95', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Wed, 15 Sep 2021 10:31:15 GMT', 'Content-Length': '129'})
HTTP response body: {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, 'status': 'Failure', 'message': 'Unauthorized', 'reason': 'Unauthorized', 'code': 401}



During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/zthreading/tasks.py", line 173, in _run_as_thread
    rslt = self.action(*args, **kwargs)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 230, in _exdcute_query
    self.query_loop(client)

  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow_kubernetes_job_operator/kube_api/client.py", line 375, in query_loop
    raise err

airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiClientException: airflow_kubernetes_job_operator.kube_api.operations.DeleteNamespaceResource, Unauthorized: Unauthorized
