307 - Modular DAG log levels #315

Merged: 19 commits, Feb 25, 2025
20 changes: 16 additions & 4 deletions airflow/dags/cwl_dag.py
@@ -20,21 +20,19 @@
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import (
DEFAULT_LOG_LEVEL,
EC2_TYPES,
NODE_POOL_DEFAULT,
NODE_POOL_HIGH_WORKLOAD,
POD_LABEL,
POD_NAMESPACE,
SPS_DOCKER_CWL_IMAGE,
build_ec2_type_label,
get_affinity,
)

from airflow import DAG

# Note: each Pod is assigned the same label to assure that (via the anti-affinity requirements)
# two Pods with the same label cannot run on the same Node
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.5.3"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner).
# This is fixed to the EFS /scratch directory in this DAG.
@@ -52,6 +50,7 @@
}
)

LOG_LEVEL_TYPE = {10: "DEBUG", 20: "INFO"}

# Default DAG configuration
dag_default_args = {
@@ -81,6 +80,14 @@
title="CWL workflow parameters",
description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
),
"log_level": Param(
DEFAULT_LOG_LEVEL,
type="integer",
enum=list(LOG_LEVEL_TYPE.keys()),
values_display={key: f"{key} ({value})" for key, value in LOG_LEVEL_TYPE.items()},
title="Processing log levels",
description=("Log level for DAG processing"),
),
"request_instance_type": Param(
"t3.medium",
type="string",
@@ -136,6 +143,9 @@ def setup(ti=None, **context):
ti.xcom_push(key="ecr_login", value=ecr_login)
logging.info("ECR login: %s", ecr_login)

# log the log level selected via the DAG parameters
logging.info(f"Selecting log level: {context['params']['log_level']}.")


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

Expand All @@ -154,6 +164,8 @@ def setup(ti=None, **context):
"{{ params.cwl_workflow }}",
"-j",
"{{ params.cwl_args }}",
"-l",
"{{ params.log_level }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
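
A minimal standalone sketch (not part of this diff) showing that the integer enum values of the new log_level parameter reuse Python's standard logging constants, and how the values_display entries built in the DAG above would render; the names below simply mirror those used in cwl_dag.py.

import logging

# The integer enum values match Python's standard logging levels.
assert logging.DEBUG == 10 and logging.INFO == 20

# Mirror of the mapping defined in cwl_dag.py above.
LOG_LEVEL_TYPE = {10: "DEBUG", 20: "INFO"}

# The Param's values_display dictionary, as built in the diff, renders as:
values_display = {key: f"{key} ({value})" for key, value in LOG_LEVEL_TYPE.items()}
print(values_display)  # {10: '10 (DEBUG)', 20: '20 (INFO)'}
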
156 changes: 52 additions & 104 deletions airflow/dags/cwl_dag_modular.py
@@ -18,24 +18,37 @@
from datetime import datetime

import boto3
import unity_sps_utils
from airflow.models.baseoperator import chain
from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import (
DEFAULT_LOG_LEVEL,
DS_S3_BUCKET_PARAM,
EC2_TYPES,
LOG_LEVEL_TYPE,
NODE_POOL_DEFAULT,
NODE_POOL_HIGH_WORKLOAD,
POD_LABEL,
POD_NAMESPACE,
SPS_DOCKER_CWL_IMAGE,
build_ec2_type_label,
get_affinity,
)

from airflow import DAG

# Task constants
STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-daac/stage-in.cwl"
STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-out-stac-catalog/stage-out.cwl"
STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/307-log-levels/demos/stage_in_log_level.cwl"
STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/307-log-levels/demos/stage_out_cwl_log_level.cwl"
LOCAL_DIR = "/shared-task-data"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner).
# This is fixed to the EFS /scratch directory in this DAG.
WORKING_DIR = "/scratch"
# WORKING_DIR = "/scratch"

# Default parameters
DEFAULT_STAC_JSON = "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json"
@@ -50,92 +63,13 @@
}
)

EC2_TYPES = {
"t3.micro": {
"desc": "General Purpose",
"cpu": 1,
"memory": 1,
},
"t3.small": {
"desc": "General Purpose",
"cpu": 2,
"memory": 2,
},
"t3.medium": {
"desc": "General Purpose",
"cpu": 2,
"memory": 4,
},
"t3.large": {
"desc": "General Purpose",
"cpu": 2,
"memory": 8,
},
"t3.xlarge": {
"desc": "General Purpose",
"cpu": 4,
"memory": 16,
},
"t3.2xlarge": {
"desc": "General Purpose",
"cpu": 8,
"memory": 32,
},
"r7i.xlarge": {
"desc": "Memory Optimized",
"cpu": 4,
"memory": 32,
},
"r7i.2xlarge": {
"desc": "Memory Optimized",
"cpu": 8,
"memory": 64,
},
"r7i.4xlarge": {
"desc": "Memory Optimized",
"cpu": 16,
"memory": 128,
},
"r7i.8xlarge": {
"desc": "Memory Optimized",
"cpu": 32,
"memory": 256,
},
"c6i.xlarge": {
"desc": "Compute Optimized",
"cpu": 4,
"memory": 8,
},
"c6i.2xlarge": {
"desc": "Compute Optimized",
"cpu": 8,
"memory": 16,
},
"c6i.4xlarge": {
"desc": "Compute Optimized",
"cpu": 16,
"memory": 32,
},
"c6i.8xlarge": {
"desc": "Compute Optimized",
"cpu": 32,
"memory": 64,
},
}

# Default DAG configuration
dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}


# "t3.large": "t3.large (General Purpose: 2vCPU, 8GiB)",
def build_ec2_type_label(key):
return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)"


dag = DAG(
dag_id="cwl_dag_modular",
description="CWL DAG Modular",
@@ -168,6 +102,14 @@ def build_ec2_type_label(key):
"The processing job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"
),
),
"log_level": Param(
DEFAULT_LOG_LEVEL,
type="integer",
enum=list(LOG_LEVEL_TYPE.keys()),
values_display={key: f"{key} ({value})" for key, value in LOG_LEVEL_TYPE.items()},
title="Processing log levels",
description=("Log level for modular DAG processing"),
),
"request_instance_type": Param(
"t3.medium",
type="string",
@@ -196,7 +138,7 @@ def select_node_pool(ti, request_storage, request_instance_type):
"""
Select node pool based on resources requested in input parameters.
"""
node_pool = unity_sps_utils.NODE_POOL_DEFAULT
node_pool = NODE_POOL_DEFAULT
storage = int(request_storage[0:-2]) # 100Gi -> 100
ti.xcom_push(key="container_storage", value=storage)
logging.info(f"Selecting container storage={storage}")
@@ -209,7 +151,7 @@ def select_node_pool(ti, request_storage, request_instance_type):

logging.info(f"Requesting storage={storage}Gi memory={memory}Gi CPU={cpu}")
if (storage > 30) or (memory > 32) or (cpu > 8):
node_pool = unity_sps_utils.NODE_POOL_HIGH_WORKLOAD
node_pool = NODE_POOL_HIGH_WORKLOAD
ti.xcom_push(key="node_pool", value=node_pool)
logging.info(f"Selecting node pool={node_pool}")

@@ -231,9 +173,9 @@ def select_stage_out(ti):

project = os.environ["AIRFLOW_VAR_UNITY_PROJECT"]
venue = os.environ["AIRFLOW_VAR_UNITY_VENUE"]
staging_bucket = ssm_client.get_parameter(Name=unity_sps_utils.DS_S3_BUCKET_PARAM, WithDecryption=True)[
"Parameter"
]["Value"]
staging_bucket = ssm_client.get_parameter(Name=DS_S3_BUCKET_PARAM, WithDecryption=True)["Parameter"][
"Value"
]

stage_out_args = json.dumps({"project": project, "venue": venue, "staging_bucket": staging_bucket})
logging.info(f"Selecting stage out args={stage_out_args}")
@@ -260,16 +202,19 @@ def setup(ti=None, **context):
# retrieve stage out aws api key and account id
select_stage_out(ti)

# log the log level selected via the DAG parameters
logging.info(f"Selecting log level: {context['params']['log_level']}.")


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


cwl_task_processing = unity_sps_utils.SpsKubernetesPodOperator(
cwl_task_processing = KubernetesPodOperator(
retries=0,
task_id="cwl_task_processing",
namespace=unity_sps_utils.POD_NAMESPACE,
namespace=POD_NAMESPACE,
name="cwl-task-pod",
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE,
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
@@ -286,38 +231,41 @@ def setup(ti=None, **context):
"{{ params.process_args }}",
"-o",
STAGE_OUT_WORKFLOW,
"-d",
"-a",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_out_args') }}",
"-l",
"{{ params.log_level }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)
],
# volume_mounts=[
# k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
# ],
# volumes=[
# k8s.V1Volume(
# name="workers-volume",
# persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
# )
# ],
dag=dag,
node_selector={
"karpenter.sh/nodepool": "{{ti.xcom_pull(task_ids='Setup', key='node_pool')}}",
"node.kubernetes.io/instance-type": "{{ti.xcom_pull(task_ids='Setup', key='instance_type')}}",
},
labels={"pod": unity_sps_utils.POD_LABEL},
labels={"pod": POD_LABEL},
annotations={"karpenter.sh/do-not-disrupt": "true"},
# note: 'affinity' cannot yet be templated
affinity=unity_sps_utils.get_affinity(
affinity=get_affinity(
capacity_type=["spot"],
# instance_type=["t3.2xlarge"],
anti_affinity_label=unity_sps_utils.POD_LABEL,
anti_affinity_label=POD_LABEL,
),
on_finish_action="keep_pod",
is_delete_operator_pod=False,
do_xcom_push=True,
)


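The "-l", "{{ params.log_level }}" entries in the arguments list above are Jinja templates that Airflow renders before the pod is created, so the integer parameter reaches the container command line as a string. A minimal sketch using jinja2 directly (outside Airflow, purely for illustration):

from jinja2 import Template

# Hypothetical run configuration; 10 corresponds to DEBUG.
params = {"log_level": 10}

# Airflow renders templated operator arguments in essentially this way.
rendered = Template("{{ params.log_level }}").render(params=params)
print(rendered, type(rendered))  # 10 <class 'str'> -- the entrypoint script compares it numerically
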
17 changes: 14 additions & 3 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
@@ -18,17 +18,24 @@
# Set to a local path on the Pod EBS volume
WORKING_DIR="/data"

set -ex
while getopts w:j:e:o: flag
while getopts w:j:e:o:l: flag
do
case "${flag}" in
w) cwl_workflow=${OPTARG};;
j) job_args=${OPTARG};;
e) ecr_login=${OPTARG};;
o) json_output=${OPTARG};;
l) log_level=${OPTARG};;
esac
done

# determine logging level
if [ "$log_level" -eq 10 ]; then
set -ex
else
set -e
fi

# create working directory if it doesn't exist
mkdir -p "$WORKING_DIR"
cd $WORKING_DIR
@@ -75,7 +82,11 @@ fi
# List contents when done
pwd
ls -lR
cwl-runner --debug --tmp-outdir-prefix "$PWD"/ --no-read-only "$cwl_workflow" "$job_args"
if [ "$log_level" -eq 10 ]; then
cwl-runner --debug --tmp-outdir-prefix "$PWD"/ --no-read-only "$cwl_workflow" "$job_args"
else
cwl-runner --quiet --tmp-outdir-prefix "$PWD"/ --no-read-only "$cwl_workflow" "$job_args"
fi
ls -lR

# Optionally, save the requested output file to a location
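
A hypothetical invocation sketch (not taken from this PR) of the updated entrypoint with the new -l flag; the workflow URL and job arguments below are illustrative placeholders only.

import subprocess

# -l 10 selects DEBUG behaviour (set -ex and cwl-runner --debug);
# any other accepted value keeps the quieter set -e / cwl-runner --quiet path.
subprocess.run(
    [
        "bash", "airflow/docker/cwl/docker_cwl_entrypoint.sh",
        "-w", "https://example.com/workflow.cwl",  # hypothetical CWL workflow URL
        "-j", '{"message": "hello world"}',        # hypothetical job arguments
        "-l", "10",
    ],
    check=True,
)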