From 20b2a2bdd1673a2ffaedfd48abc1fdfacc2d50e0 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 12 Nov 2022 10:44:09 +0800 Subject: [PATCH] [feat] Add execute type to workflow (#9) Up to now, we can only submit workflow with parallel mode. this patch give users ability specific execute type to workflow --- docs/source/concept.rst | 32 +++++++++ docs/source/config.rst | 72 ++++++++++--------- src/pydolphinscheduler/configuration.py | 3 + .../core/process_definition.py | 39 +++++++++- src/pydolphinscheduler/default_config.yaml | 3 + src/pydolphinscheduler/java_gateway.py | 2 +- tests/core/test_process_definition.py | 19 +++++ tests/utils/test_yaml_parser.py | 1 + 8 files changed, 134 insertions(+), 37 deletions(-) diff --git a/docs/source/concept.rst b/docs/source/concept.rst index 9a9527d..de49c9c 100644 --- a/docs/source/concept.rst +++ b/docs/source/concept.rst @@ -90,6 +90,38 @@ Tenant is the user who run task command in machine or in virtual machine. it cou Make should tenant exists in target machine, otherwise it will raise an error when you try to run command +Execution Type +~~~~~~~~~~~~~~ + +Decision which behavior to run when process definition have multiple instances. when process definition +schedule interval is too short, it may cause multiple instances run at the same time. We can use this +parameter to control the behavior about how to run those process definition instances. Currently we +have four execution type: + +* ``parallel`` (default value): it means all instances will allow to run even though the previous + instance is not finished. +* ``serial_wait``: it means the all instance will wait for the previous instance to finish, and + all the waiting instances will be executed base on scheduling order. +* ``serial_discard``: it means the all instance will be discard(abandon) if the previous instance + is not finished. +* ``serial_priority``: it means the all instance will wait for the previous instance to finish, + and all the waiting instances will be executed base on process definition priority order. + +Parameter ``execution type`` can be set in + +* Direct assign statement. You can pick execute type from above and direct assign to parameter + ``execution_type``. + + .. code-block:: python + + pd = ProcessDefinition( + name="process-definition", + execution_type="parallel" + ) + +* Via environment variables, configurations file setting, for more detail about those way setting, you can see + you can read :doc:`config` section. + Tasks ----- diff --git a/docs/source/config.rst b/docs/source/config.rst index 29a143d..3f7fff8 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -78,41 +78,43 @@ All Configurations in Environment Variables All environment variables as below, and you could modify their value via `Bash `_ or `Python OS Module `_ -+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| Variable Section | Variable Name | description | -+==================+====================================+====================================================================================================================+ -| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. | -+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. | -+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | -+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+ -| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | -+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+ ++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| Variable Section | Variable Name | description | ++==================+====================================+=====================================================================================================================+ +| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. | ++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. | ++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. | ++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+ +| | ``PYDS_WORKFLOW_EXECUTION_TYPE`` | Default workflow execution type, will use its value when workflow does not specify the attribute ``execution_type``.| ++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+ .. note:: diff --git a/src/pydolphinscheduler/configuration.py b/src/pydolphinscheduler/configuration.py index 860f986..2f0c2c0 100644 --- a/src/pydolphinscheduler/configuration.py +++ b/src/pydolphinscheduler/configuration.py @@ -189,5 +189,8 @@ def get_bool(val: Any) -> bool: WORKFLOW_WARNING_TYPE = os.environ.get( "PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type") ) +WORKFLOW_EXECUTION_TYPE = os.environ.get( + "PYDS_WORKFLOW_EXECUTION_TYPE", configs.get("default.workflow.execution_type") +) # End Common Configuration Setting diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py index 62de7ed..3487435 100644 --- a/src/pydolphinscheduler/core/process_definition.py +++ b/src/pydolphinscheduler/core/process_definition.py @@ -57,6 +57,20 @@ class ProcessDefinition(Base): TODO: maybe we should rename this class, currently use DS object name. + :param execution_type: Decision which behavior to run when process definition have multiple instances. + when process definition schedule interval is too short, it may cause multiple instances run at the + same time. We can use this parameter to control the behavior about how to run those process definition + instances. Currently we have four execution type: + + * ``PARALLEL``: Default value, all instances will allow to run even though the previous + instance is not finished. + * ``SERIAL_WAIT``: All instance will wait for the previous instance to finish, and all + the waiting instances will be executed base on scheduling order. + * ``SERIAL_DISCARD``: All instances will be discard(abandon) if the previous instance is not + finished. + * ``SERIAL_PRIORITY``: means the all instance will wait for the previous instance to finish, and + all the waiting instances will be executed base on process definition priority order. + :param user: The user for current process definition. Will create a new one if it do not exists. If your parameter ``project`` already exists but project's create do not belongs to ``user``, will grant ``project`` to ``user`` automatically. @@ -86,6 +100,7 @@ class ProcessDefinition(Base): "worker_group", "warning_type", "warning_group_id", + "execution_type", "timeout", "release_state", "param", @@ -109,6 +124,7 @@ def __init__( worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE, warning_group_id: Optional[int] = 0, + execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE, timeout: Optional[int] = 0, release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE, param: Optional[Dict] = None, @@ -132,6 +148,17 @@ def __init__( else: self.warning_type = warning_type.strip().upper() self.warning_group_id = warning_group_id + if execution_type is None or execution_type.strip().upper() not in ( + "PARALLEL", + "SERIAL_WAIT", + "SERIAL_DISCARD", + "SERIAL_PRIORITY", + ): + raise PyDSParamException( + "Parameter `execution_type` with unexpect value `%s`", execution_type + ) + else: + self._execution_type = execution_type self.timeout = timeout self._release_state = release_state self.param = param @@ -225,6 +252,16 @@ def release_state(self, val: str) -> None: """Set attribute release_state.""" self._release_state = val.lower() + @property + def execution_type(self) -> str: + """Get attribute execution_type.""" + return self._execution_type.upper() + + @execution_type.setter + def execution_type(self, val: str) -> None: + """Set attribute execution_type.""" + self._execution_type = val + @property def param_json(self) -> Optional[List[Dict]]: """Return param json base on self.param.""" @@ -390,6 +427,7 @@ def submit(self) -> int: json.dumps(self.param_json), self.warning_type, self.warning_group_id, + self.execution_type, self.timeout, self.worker_group, self._tenant, @@ -399,7 +437,6 @@ def submit(self) -> int: json.dumps(self.task_definition_json), json.dumps(self.schedule_json) if self.schedule_json else None, None, - None, ) if len(self.resource_list) > 0: for res in self.resource_list: diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml index 98d7b99..5ad3064 100644 --- a/src/pydolphinscheduler/default_config.yaml +++ b/src/pydolphinscheduler/default_config.yaml @@ -56,3 +56,6 @@ default: # change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are # ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL`` warning_type: NONE + # Default execution type about how to run multiple workflow instances, default value is ``parallel`` which + # mean run all workflow instances parallel and the other value is ``SERIAL_WAIT``, ``SERIAL_DISCARD``, ``SERIAL_PRIORITY`` + execution_type: parallel diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py index 54bb0a3..cd03d32 100644 --- a/src/pydolphinscheduler/java_gateway.py +++ b/src/pydolphinscheduler/java_gateway.py @@ -254,6 +254,7 @@ def create_or_update_process_definition( global_params: str, warning_type: str, warning_group_id: int, + execution_type: str, timeout: int, worker_group: str, tenant_code: str, @@ -262,7 +263,6 @@ def create_or_update_process_definition( task_definition_json: str, schedule: Optional[str] = None, other_params_json: Optional[str] = None, - execution_type: Optional[str] = None, ): """Create or update process definition through java gateway.""" return self.java_gateway.entry_point.createOrUpdateProcessDefinition( diff --git a/tests/core/test_process_definition.py b/tests/core/test_process_definition.py index 30445bf..c8fffc2 100644 --- a/tests/core/test_process_definition.py +++ b/tests/core/test_process_definition.py @@ -67,6 +67,7 @@ def test_process_definition_key_attr(func): ("worker_group", configuration.WORKFLOW_WORKER_GROUP), ("warning_type", configuration.WORKFLOW_WARNING_TYPE), ("warning_group_id", 0), + ("execution_type", configuration.WORKFLOW_EXECUTION_TYPE.upper()), ("release_state", 1), ], ) @@ -89,6 +90,7 @@ def test_process_definition_default_value(name, value): ("worker_group", str, "worker_group"), ("warning_type", str, "FAILURE"), ("warning_group_id", int, 1), + ("execution_type", str, "PARALLEL"), ("timeout", int, 1), ("param", dict, {"key": "value"}), ( @@ -210,6 +212,22 @@ def test_warn_type_not_support_type(val: str): ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, warning_type=val) +@pytest.mark.parametrize( + "val", + [ + "ALLL", + "", + None, + ], +) +def test_execute_type_not_support_type(val: str): + """Test process definition param execute_type not support type error.""" + with pytest.raises( + PyDSParamException, match="Parameter `execution_type` with unexpect value.*?" + ): + ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, execution_type=val) + + @pytest.mark.parametrize( "param, expect", [ @@ -321,6 +339,7 @@ def test_process_definition_get_define_without_task(): "workerGroup": configuration.WORKFLOW_WORKER_GROUP, "warningType": configuration.WORKFLOW_WARNING_TYPE, "warningGroupId": 0, + "executionType": "PARALLEL", "timeout": 0, "releaseState": 1, "param": None, diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py index ad3aaf7..3abdda6 100644 --- a/tests/utils/test_yaml_parser.py +++ b/tests/utils/test_yaml_parser.py @@ -63,6 +63,7 @@ "default.workflow.release_state": ("online", "offline"), "default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"), "default.workflow.warning_type": ("NONE", "SUCCESS"), + "default.workflow.execution_type": ("parallel", "serial_wait"), }, ]