Skip to content

Commit

Permalink
[feat] Add execute type to workflow (#9)
Browse files Browse the repository at this point in the history
Up to now, we can only submit workflow with parallel
mode. this patch give users ability specific execute
type to workflow
  • Loading branch information
zhongjiajie authored Nov 12, 2022
1 parent 871e8f3 commit 20b2a2b
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 37 deletions.
32 changes: 32 additions & 0 deletions docs/source/concept.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ Tenant is the user who run task command in machine or in virtual machine. it cou

Make should tenant exists in target machine, otherwise it will raise an error when you try to run command

Execution Type
~~~~~~~~~~~~~~

Decision which behavior to run when process definition have multiple instances. when process definition
schedule interval is too short, it may cause multiple instances run at the same time. We can use this
parameter to control the behavior about how to run those process definition instances. Currently we
have four execution type:

* ``parallel`` (default value): it means all instances will allow to run even though the previous
instance is not finished.
* ``serial_wait``: it means the all instance will wait for the previous instance to finish, and
all the waiting instances will be executed base on scheduling order.
* ``serial_discard``: it means the all instance will be discard(abandon) if the previous instance
is not finished.
* ``serial_priority``: it means the all instance will wait for the previous instance to finish,
and all the waiting instances will be executed base on process definition priority order.

Parameter ``execution type`` can be set in

* Direct assign statement. You can pick execute type from above and direct assign to parameter
``execution_type``.

.. code-block:: python
pd = ProcessDefinition(
name="process-definition",
execution_type="parallel"
)
* Via environment variables, configurations file setting, for more detail about those way setting, you can see
you can read :doc:`config` section.

Tasks
-----

Expand Down
72 changes: 37 additions & 35 deletions docs/source/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -78,41 +78,43 @@ All Configurations in Environment Variables

All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_

+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Variable Section | Variable Name | description |
+==================+====================================+====================================================================================================================+
| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
+------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| Variable Section | Variable Name | description |
+==================+====================================+=====================================================================================================================+
| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
+------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
+------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
+ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
| | ``PYDS_WORKFLOW_EXECUTION_TYPE`` | Default workflow execution type, will use its value when workflow does not specify the attribute ``execution_type``.|
+------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+

.. note::

Expand Down
3 changes: 3 additions & 0 deletions src/pydolphinscheduler/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,8 @@ def get_bool(val: Any) -> bool:
WORKFLOW_WARNING_TYPE = os.environ.get(
"PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
)
WORKFLOW_EXECUTION_TYPE = os.environ.get(
"PYDS_WORKFLOW_EXECUTION_TYPE", configs.get("default.workflow.execution_type")
)

# End Common Configuration Setting
39 changes: 38 additions & 1 deletion src/pydolphinscheduler/core/process_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ class ProcessDefinition(Base):
TODO: maybe we should rename this class, currently use DS object name.
:param execution_type: Decision which behavior to run when process definition have multiple instances.
when process definition schedule interval is too short, it may cause multiple instances run at the
same time. We can use this parameter to control the behavior about how to run those process definition
instances. Currently we have four execution type:
* ``PARALLEL``: Default value, all instances will allow to run even though the previous
instance is not finished.
* ``SERIAL_WAIT``: All instance will wait for the previous instance to finish, and all
the waiting instances will be executed base on scheduling order.
* ``SERIAL_DISCARD``: All instances will be discard(abandon) if the previous instance is not
finished.
* ``SERIAL_PRIORITY``: means the all instance will wait for the previous instance to finish, and
all the waiting instances will be executed base on process definition priority order.
:param user: The user for current process definition. Will create a new one if it do not exists. If your
parameter ``project`` already exists but project's create do not belongs to ``user``, will grant
``project`` to ``user`` automatically.
Expand Down Expand Up @@ -86,6 +100,7 @@ class ProcessDefinition(Base):
"worker_group",
"warning_type",
"warning_group_id",
"execution_type",
"timeout",
"release_state",
"param",
Expand All @@ -109,6 +124,7 @@ def __init__(
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
Expand All @@ -132,6 +148,17 @@ def __init__(
else:
self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id
if execution_type is None or execution_type.strip().upper() not in (
"PARALLEL",
"SERIAL_WAIT",
"SERIAL_DISCARD",
"SERIAL_PRIORITY",
):
raise PyDSParamException(
"Parameter `execution_type` with unexpect value `%s`", execution_type
)
else:
self._execution_type = execution_type
self.timeout = timeout
self._release_state = release_state
self.param = param
Expand Down Expand Up @@ -225,6 +252,16 @@ def release_state(self, val: str) -> None:
"""Set attribute release_state."""
self._release_state = val.lower()

@property
def execution_type(self) -> str:
"""Get attribute execution_type."""
return self._execution_type.upper()

@execution_type.setter
def execution_type(self, val: str) -> None:
"""Set attribute execution_type."""
self._execution_type = val

@property
def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param."""
Expand Down Expand Up @@ -390,6 +427,7 @@ def submit(self) -> int:
json.dumps(self.param_json),
self.warning_type,
self.warning_group_id,
self.execution_type,
self.timeout,
self.worker_group,
self._tenant,
Expand All @@ -399,7 +437,6 @@ def submit(self) -> int:
json.dumps(self.task_definition_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
None,
None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
Expand Down
3 changes: 3 additions & 0 deletions src/pydolphinscheduler/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,6 @@ default:
# change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
# ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
warning_type: NONE
# Default execution type about how to run multiple workflow instances, default value is ``parallel`` which
# mean run all workflow instances parallel and the other value is ``SERIAL_WAIT``, ``SERIAL_DISCARD``, ``SERIAL_PRIORITY``
execution_type: parallel
2 changes: 1 addition & 1 deletion src/pydolphinscheduler/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ def create_or_update_process_definition(
global_params: str,
warning_type: str,
warning_group_id: int,
execution_type: str,
timeout: int,
worker_group: str,
tenant_code: str,
Expand All @@ -262,7 +263,6 @@ def create_or_update_process_definition(
task_definition_json: str,
schedule: Optional[str] = None,
other_params_json: Optional[str] = None,
execution_type: Optional[str] = None,
):
"""Create or update process definition through java gateway."""
return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
Expand Down
Loading

0 comments on commit 20b2a2b

Please sign in to comment.