1 change: 1 addition & 0 deletions .gitignore
@@ -20,6 +20,7 @@ R/.Rbuildignore
 
 .DS_Store
 .env
+.venv
 node_modules
 main.js.map
 
5 changes: 4 additions & 1 deletion metaflow/plugins/argo/argo_client.py
@@ -309,7 +309,9 @@ def trigger_workflow_template(self, name, usertype, username, parameters={}):
                 json.loads(e.body)["message"] if e.body is not None else e.reason
             )
 
-    def schedule_workflow_template(self, name, schedule=None, timezone=None):
+    def schedule_workflow_template(
+        self, name, schedule=None, timezone=None, concurrency_policy=None
+    ):
         # Unfortunately, Kubernetes client does not handle optimistic
         # concurrency control by itself unlike kubectl
         client = self._client.get()
@@ -321,6 +323,7 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
                         "suspend": schedule is None,
                         "schedule": schedule,
                         "timezone": timezone,
+                        "concurrencyPolicy": concurrency_policy,
                         "failedJobsHistoryLimit": 10000,  # default is unfortunately 1
                         "successfulJobsHistoryLimit": 10000,  # default is unfortunately 3
                         "workflowSpec": {"workflowTemplateRef": {"name": name}},
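
For context, a minimal usage sketch of the extended client call (not part of the diff; the namespace, template name, and cron values below are illustrative assumptions, and the policy string follows the Kubernetes/Argo concurrencyPolicy values Allow, Forbid, or Replace):

    from metaflow.plugins.argo.argo_client import ArgoClient

    # Assumed example: schedule an already-deployed workflow template.
    client = ArgoClient(namespace="default")
    client.schedule_workflow_template(
        name="helloflow",                 # hypothetical template name
        schedule="0 * * * *",             # five-field cron expression
        timezone="America/Los_Angeles",
        concurrency_policy="Forbid",      # ends up as spec.concurrencyPolicy on the CronWorkflow
    )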
20 changes: 15 additions & 5 deletions metaflow/plugins/argo/argo_workflows.py
@@ -175,7 +175,7 @@ def __init__(
         self.parameters = self._process_parameters()
         self.config_parameters = self._process_config_parameters()
         self.triggers, self.trigger_options = self._process_triggers()
-        self._schedule, self._timezone = self._get_schedule()
+        self._schedule, self._timezone, self._concurrency_policy = self._get_schedule()
 
         self._base_labels = self._base_kubernetes_labels()
         self._base_annotations = self._base_kubernetes_annotations()
@@ -386,14 +386,18 @@ def _get_schedule(self):
         if schedule:
             # Remove the field "Year" if it exists
             schedule = schedule[0]
-            return " ".join(schedule.schedule.split()[:5]), schedule.timezone
-        return None, None
+            return (
+                " ".join(schedule.schedule.split()[:5]),
+                schedule.timezone,
+                schedule.concurrency_policy,
+            )
+        return None, None, None
 
     def schedule(self):
         try:
             argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
             argo_client.schedule_workflow_template(
-                self.name, self._schedule, self._timezone
+                self.name, self._schedule, self._timezone, self._concurrency_policy
             )
             # Register sensor.
             # Metaflow will overwrite any existing sensor.
@@ -735,7 +739,13 @@ def _compile_workflow_template(self):
             # hence configuring it to an empty string
             if self._timezone is None:
                 self._timezone = ""
-            cron_info = {"schedule": self._schedule, "tz": self._timezone}
+            if self._concurrency_policy is None:
+                self._concurrency_policy = ""
+            cron_info = {
+                "schedule": self._schedule,
+                "tz": self._timezone,
+                "concurrency_policy": self._concurrency_policy,
+            }
             annotations.update({"metaflow/cron": json.dumps(cron_info)})
 
         if self.parameters:
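
With the change above, the "metaflow/cron" annotation now carries the concurrency policy alongside the schedule. A small sketch of the resulting payload (the concrete values are assumptions; the keys mirror the cron_info dict in the diff):

    import json

    # Illustrative reconstruction of the annotation body built in _compile_workflow_template.
    cron_info = {"schedule": "0 * * * *", "tz": "", "concurrency_policy": "Forbid"}
    print(json.dumps(cron_info))
    # {"schedule": "0 * * * *", "tz": "", "concurrency_policy": "Forbid"}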
2 changes: 2 additions & 0 deletions metaflow/plugins/aws/step_functions/schedule_decorator.py
@@ -30,6 +30,7 @@ class ScheduleDecorator(FlowDecorator):
         "daily": True,
         "hourly": False,
         "timezone": None,
+        "concurrency_policy": None,
     }
 
     def flow_init(
@@ -50,3 +51,4 @@ def flow_init(
 
         # Argo Workflows supports the IANA timezone standard, e.g. America/Los_Angeles
         self.timezone = self.attributes["timezone"]
+        self.concurrency_policy = self.attributes["concurrency_policy"]
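
Taken together, a user-facing sketch of how the new attribute could be set on a flow (assumed usage, the flow itself is hypothetical; the accepted values follow the Kubernetes/Argo concurrencyPolicy field: Allow, Forbid, or Replace):

    from metaflow import FlowSpec, schedule, step

    # Hourly schedule that skips a new run while the previous one is still executing.
    @schedule(hourly=True, timezone="America/Los_Angeles", concurrency_policy="Forbid")
    class HourlyFlow(FlowSpec):
        @step
        def start(self):
            self.next(self.end)

        @step
        def end(self):
            pass

    if __name__ == "__main__":
        HourlyFlow()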