Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,13 @@ core:
type: integer
example: ~
default: "4096"
execution_api_server_url:
description: |
The url of the execution api server.
version_added: 3.0.0
type: string
example: ~
default: "http://localhost:9091/execution/"
database:
description: ~
options:
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _execute_work(log: logging.Logger, workload: workloads.ExecuteTask) -> None:
dag_rel_path=workload.dag_rel_path,
bundle_info=workload.bundle_info,
token=workload.token,
server=conf.get("workers", "execution_api_server_url", fallback="http://localhost:9091/execution/"),
server=conf.get("core", "execution_api_server_url"),
log_path=workload.log_path,
)

Expand Down
8 changes: 4 additions & 4 deletions chart/templates/configmaps/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ metadata:
{{- end }}
{{- $Global := . }}
data:
{{/*- Set a default for workers.execution_api_server_url pointing to the api-server service if it's not set -*/}}
{{/*- Set a default for core.execution_api_server_url pointing to the api-server service if it's not set -*/}}
{{- if semverCompare ">=3.0.0" .Values.airflowVersion -}}
{{- $config := merge .Values.config ( dict "workers" dict )}}
{{- if not (hasKey $config.workers "execution_api_server_url") -}}
{{- $_ := set $config.workers "execution_api_server_url" (printf "http://%s-api-server:%d/execution/" (include "airflow.fullname" .) (int .Values.ports._apiServer)) -}}
{{- $config := merge .Values.config ( dict "core" dict )}}
{{- if not (hasKey $config.core "execution_api_server_url") -}}
{{- $_ := set $config.core "execution_api_server_url" (printf "http://%s-api-server:%d/execution/" (include "airflow.fullname" .) (int .Values.ports._apiServer)) -}}
{{- end -}}
{{- end -}}
# These are system-specified config overrides.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def execute_workload(input: str) -> None:
dag_rel_path=workload.dag_rel_path,
bundle_info=workload.bundle_info,
token=workload.token,
server=conf.get("workers", "execution_api_server_url", fallback="http://localhost:9091/execution/"),
server=conf.get("core", "execution_api_server_url"),
log_path=workload.log_path,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,7 @@ def _run_job_via_supervisor(
dag_rel_path=workload.dag_rel_path,
bundle_info=workload.bundle_info,
token=workload.token,
server=conf.get(
"workers", "execution_api_server_url", fallback="http://localhost:9091/execution/"
),
server=conf.get("core", "execution_api_server_url"),
log_path=workload.log_path,
)
return 0
Expand Down
7 changes: 1 addition & 6 deletions task_sdk/src/airflow/sdk/execution_time/execute_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ def execute_workload(input: str) -> None:
dag_rel_path=workload.dag_rel_path,
bundle_info=workload.bundle_info,
token=workload.token,
# fallback to internal cluster service for api server
server=conf.get(
"workers",
"execution_api_server_url",
fallback="http://airflow-api-server.airflow.svc.cluster.local:9091/execution/",
),
server=conf.get("core", "execution_api_server_url"),
log_path=workload.log_path,
)

Expand Down