@@ -175,7 +175,7 @@ def __init__(
175175 self .parameters = self ._process_parameters ()
176176 self .config_parameters = self ._process_config_parameters ()
177177 self .triggers , self .trigger_options = self ._process_triggers ()
178- self ._schedule , self ._timezone = self ._get_schedule ()
178+ self ._schedule , self ._timezone , self . _concurrency_policy = self ._get_schedule ()
179179
180180 self ._base_labels = self ._base_kubernetes_labels ()
181181 self ._base_annotations = self ._base_kubernetes_annotations ()
@@ -386,14 +386,18 @@ def _get_schedule(self):
386386 if schedule :
387387 # Remove the field "Year" if it exists
388388 schedule = schedule [0 ]
389- return " " .join (schedule .schedule .split ()[:5 ]), schedule .timezone
389+ return (
390+ " " .join (schedule .schedule .split ()[:5 ]),
391+ schedule .timezone ,
392+ schedule .concurrency_policy ,
393+ )
390394 return None , None
391395
392396 def schedule (self ):
393397 try :
394398 argo_client = ArgoClient (namespace = KUBERNETES_NAMESPACE )
395399 argo_client .schedule_workflow_template (
396- self .name , self ._schedule , self ._timezone
400+ self .name , self ._schedule , self ._timezone , self . _concurrency_policy
397401 )
398402 # Register sensor.
399403 # Metaflow will overwrite any existing sensor.
@@ -731,11 +735,14 @@ def _compile_workflow_template(self):
731735
732736 annotations = {}
733737 if self ._schedule is not None :
734- # timezone is an optional field and json dumps on None will result in null
735- # hence configuring it to an empty string
736- if self ._timezone is None :
737- self ._timezone = ""
738- cron_info = {"schedule" : self ._schedule , "tz" : self ._timezone }
738+ # timezone and concurrency_policy is an optional field and json
739+ # dumps on None will result in null hence configuring it to an empty
740+ # string
741+ cron_info = {
742+ "schedule" : self ._schedule ,
743+ "tz" : self ._timezone or "" ,
744+ "concurrency_policy" : self ._concurrency_policy or "" ,
745+ }
739746 annotations .update ({"metaflow/cron" : json .dumps (cron_info )})
740747
741748 if self .parameters :
0 commit comments