Skip to content

Commit

Permalink
feat(engine): adds asynchronous message correlation API
Browse files Browse the repository at this point in the history
* adds RuntimeService method for asynchronous message correlation to multiple
  process instances
* adds asynchronous message correlation builder
* enables `executionsOnly` message correlation internally (no public API)
* adds message correlation batch resources (job handler, batch type,
  permissions, JSON converter)
* enables the batch jobs to be scheduled exclusively by the job executor
  by adding a process instance id to the jobs if they correlate to exactly
  one process instance
* refactors runWithoutAuthorization to accept a command as well

related to CAM-13863, CAM-13819
  • Loading branch information
tmetzke committed Sep 13, 2021
1 parent 26e047a commit 03c5e73
Show file tree
Hide file tree
Showing 46 changed files with 1,530 additions and 266 deletions.
12 changes: 12 additions & 0 deletions engine/src/main/java/org/camunda/bpm/engine/RuntimeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.camunda.bpm.engine.runtime.ExecutionQuery;
import org.camunda.bpm.engine.runtime.Incident;
import org.camunda.bpm.engine.runtime.IncidentQuery;
import org.camunda.bpm.engine.runtime.MessageCorrelationAsyncBuilder;
import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
import org.camunda.bpm.engine.runtime.ModificationBuilder;
import org.camunda.bpm.engine.runtime.NativeExecutionQuery;
Expand Down Expand Up @@ -2169,6 +2170,17 @@ Batch setVariablesAsync(List<String> processInstanceIds,
*/
void correlateMessage(String messageName, String businessKey, Map<String, Object> correlationKeys, Map<String, Object> processVariables);

/**
* Define a complex asynchronous message correlation using a fluent builder.
*
* @param messageName the name of the message. Corresponds to the 'name' element
* of the message defined in BPMN 2.0 Xml.
* Can be null to correlate by other criteria only.
*
* @return the fluent builder for defining the asynchronous message correlation.
*/
MessageCorrelationAsyncBuilder createMessageCorrelationAsync(String messageName);

/**
* Define a modification of a process instance in terms of activity cancellations
* and instantiations via a fluent builder. Instructions are executed in the order they are specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ public enum BatchPermissions implements Permission {
CREATE_BATCH_SET_REMOVAL_TIME("CREATE_BATCH_SET_REMOVAL_TIME", 65536),

/** Indicates that CREATE_BATCH_SET_VARIABLES interactions are permitted */
CREATE_BATCH_SET_VARIABLES("CREATE_BATCH_SET_VARIABLES", 131_072);
CREATE_BATCH_SET_VARIABLES("CREATE_BATCH_SET_VARIABLES", 131_072),

/** Indicates that CREATE_BATCH_CORRELATE_MESSAGE interactions are permitted */
CREATE_BATCH_CORRELATE_MESSAGE("CREATE_BATCH_CORRELATE_MESSAGE", 262_144);

protected static final Resource[] RESOURCES = new Resource[] { Resources.BATCH };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public interface Batch {
String TYPE_DECISION_SET_REMOVAL_TIME = "decision-set-removal-time";
String TYPE_BATCH_SET_REMOVAL_TIME = "batch-set-removal-time";
String TYPE_SET_VARIABLES = "set-variables";
String TYPE_CORRELATE_MESSAGE = "correlate-message";

/**
* @return the id of the batch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,74 +66,76 @@ public interface UserOperationLogEntry {

/** @deprecated Please use {@link EntityTypes#TASK} instead. */
@Deprecated
public static String ENTITY_TYPE_TASK = EntityTypes.TASK;
String ENTITY_TYPE_TASK = EntityTypes.TASK;
/** @deprecated Please use {@link EntityTypes#IDENTITY_LINK} instead. */
@Deprecated
public static String ENTITY_TYPE_IDENTITY_LINK = EntityTypes.IDENTITY_LINK;
String ENTITY_TYPE_IDENTITY_LINK = EntityTypes.IDENTITY_LINK;
/** @deprecated Please use {@link EntityTypes#ATTACHMENT} instead. */
@Deprecated
public static String ENTITY_TYPE_ATTACHMENT = EntityTypes.ATTACHMENT;

public static String OPERATION_TYPE_ASSIGN = "Assign";
public static String OPERATION_TYPE_CLAIM = "Claim";
public static String OPERATION_TYPE_COMPLETE = "Complete";
public static String OPERATION_TYPE_CREATE = "Create";
public static String OPERATION_TYPE_DELEGATE = "Delegate";
public static String OPERATION_TYPE_DELETE = "Delete";
public static String OPERATION_TYPE_RESOLVE = "Resolve";
public static String OPERATION_TYPE_SET_OWNER = "SetOwner";
public static String OPERATION_TYPE_SET_PRIORITY = "SetPriority";
public static String OPERATION_TYPE_UPDATE = "Update";
public static String OPERATION_TYPE_ACTIVATE = "Activate";
public static String OPERATION_TYPE_SUSPEND = "Suspend";
public static String OPERATION_TYPE_MIGRATE = "Migrate";
public static String OPERATION_TYPE_ADD_USER_LINK = "AddUserLink";
public static String OPERATION_TYPE_DELETE_USER_LINK = "DeleteUserLink";
public static String OPERATION_TYPE_ADD_GROUP_LINK = "AddGroupLink";
public static String OPERATION_TYPE_DELETE_GROUP_LINK = "DeleteGroupLink";
public static String OPERATION_TYPE_SET_DUEDATE = "SetDueDate";
public static String OPERATION_TYPE_RECALC_DUEDATE = "RecalculateDueDate";
public static String OPERATION_TYPE_UNLOCK = "Unlock";
public static String OPERATION_TYPE_EXECUTE = "Execute";
public static String OPERATION_TYPE_EVALUATE = "Evaluate";

public static String OPERATION_TYPE_ADD_ATTACHMENT = "AddAttachment";
public static String OPERATION_TYPE_DELETE_ATTACHMENT = "DeleteAttachment";

public static String OPERATION_TYPE_SUSPEND_JOB_DEFINITION = "SuspendJobDefinition";
public static String OPERATION_TYPE_ACTIVATE_JOB_DEFINITION = "ActivateJobDefinition";
public static String OPERATION_TYPE_SUSPEND_PROCESS_DEFINITION = "SuspendProcessDefinition";
public static String OPERATION_TYPE_ACTIVATE_PROCESS_DEFINITION = "ActivateProcessDefinition";

public static String OPERATION_TYPE_CREATE_HISTORY_CLEANUP_JOB = "CreateHistoryCleanupJobs";
public static String OPERATION_TYPE_UPDATE_HISTORY_TIME_TO_LIVE = "UpdateHistoryTimeToLive";
public static String OPERATION_TYPE_DELETE_HISTORY = "DeleteHistory";

public static String OPERATION_TYPE_MODIFY_PROCESS_INSTANCE = "ModifyProcessInstance";
public static String OPERATION_TYPE_RESTART_PROCESS_INSTANCE = "RestartProcessInstance";
public static String OPERATION_TYPE_SUSPEND_JOB = "SuspendJob";
public static String OPERATION_TYPE_ACTIVATE_JOB = "ActivateJob";
public static String OPERATION_TYPE_SET_JOB_RETRIES = "SetJobRetries";
public static String OPERATION_TYPE_SET_EXTERNAL_TASK_RETRIES = "SetExternalTaskRetries";
public static String OPERATION_TYPE_SET_VARIABLE = "SetVariable";
String ENTITY_TYPE_ATTACHMENT = EntityTypes.ATTACHMENT;

String OPERATION_TYPE_ASSIGN = "Assign";
String OPERATION_TYPE_CLAIM = "Claim";
String OPERATION_TYPE_COMPLETE = "Complete";
String OPERATION_TYPE_CREATE = "Create";
String OPERATION_TYPE_DELEGATE = "Delegate";
String OPERATION_TYPE_DELETE = "Delete";
String OPERATION_TYPE_RESOLVE = "Resolve";
String OPERATION_TYPE_SET_OWNER = "SetOwner";
String OPERATION_TYPE_SET_PRIORITY = "SetPriority";
String OPERATION_TYPE_UPDATE = "Update";
String OPERATION_TYPE_ACTIVATE = "Activate";
String OPERATION_TYPE_SUSPEND = "Suspend";
String OPERATION_TYPE_MIGRATE = "Migrate";
String OPERATION_TYPE_ADD_USER_LINK = "AddUserLink";
String OPERATION_TYPE_DELETE_USER_LINK = "DeleteUserLink";
String OPERATION_TYPE_ADD_GROUP_LINK = "AddGroupLink";
String OPERATION_TYPE_DELETE_GROUP_LINK = "DeleteGroupLink";
String OPERATION_TYPE_SET_DUEDATE = "SetDueDate";
String OPERATION_TYPE_RECALC_DUEDATE = "RecalculateDueDate";
String OPERATION_TYPE_UNLOCK = "Unlock";
String OPERATION_TYPE_EXECUTE = "Execute";
String OPERATION_TYPE_EVALUATE = "Evaluate";

String OPERATION_TYPE_ADD_ATTACHMENT = "AddAttachment";
String OPERATION_TYPE_DELETE_ATTACHMENT = "DeleteAttachment";

String OPERATION_TYPE_SUSPEND_JOB_DEFINITION = "SuspendJobDefinition";
String OPERATION_TYPE_ACTIVATE_JOB_DEFINITION = "ActivateJobDefinition";
String OPERATION_TYPE_SUSPEND_PROCESS_DEFINITION = "SuspendProcessDefinition";
String OPERATION_TYPE_ACTIVATE_PROCESS_DEFINITION = "ActivateProcessDefinition";

String OPERATION_TYPE_CREATE_HISTORY_CLEANUP_JOB = "CreateHistoryCleanupJobs";
String OPERATION_TYPE_UPDATE_HISTORY_TIME_TO_LIVE = "UpdateHistoryTimeToLive";
String OPERATION_TYPE_DELETE_HISTORY = "DeleteHistory";

String OPERATION_TYPE_MODIFY_PROCESS_INSTANCE = "ModifyProcessInstance";
String OPERATION_TYPE_RESTART_PROCESS_INSTANCE = "RestartProcessInstance";
String OPERATION_TYPE_SUSPEND_JOB = "SuspendJob";
String OPERATION_TYPE_ACTIVATE_JOB = "ActivateJob";
String OPERATION_TYPE_SET_JOB_RETRIES = "SetJobRetries";
String OPERATION_TYPE_SET_EXTERNAL_TASK_RETRIES = "SetExternalTaskRetries";
String OPERATION_TYPE_SET_VARIABLE = "SetVariable";
String OPERATION_TYPE_SET_VARIABLES = "SetVariables";

public static String OPERATION_TYPE_REMOVE_VARIABLE = "RemoveVariable";
public static String OPERATION_TYPE_MODIFY_VARIABLE = "ModifyVariable";
String OPERATION_TYPE_REMOVE_VARIABLE = "RemoveVariable";
String OPERATION_TYPE_MODIFY_VARIABLE = "ModifyVariable";

public static String OPERATION_TYPE_SUSPEND_BATCH = "SuspendBatch";
public static String OPERATION_TYPE_ACTIVATE_BATCH = "ActivateBatch";

public static String OPERATION_TYPE_CREATE_INCIDENT = "CreateIncident";
String OPERATION_TYPE_SUSPEND_BATCH = "SuspendBatch";
String OPERATION_TYPE_ACTIVATE_BATCH = "ActivateBatch";

public static String OPERATION_TYPE_SET_REMOVAL_TIME = "SetRemovalTime";
String OPERATION_TYPE_CREATE_INCIDENT = "CreateIncident";

String OPERATION_TYPE_SET_REMOVAL_TIME = "SetRemovalTime";

String OPERATION_TYPE_SET_ANNOTATION = "SetAnnotation";
String OPERATION_TYPE_CLEAR_ANNOTATION = "ClearAnnotation";

public static String CATEGORY_ADMIN = "Admin";
public static String CATEGORY_OPERATOR = "Operator";
public static String CATEGORY_TASK_WORKER = "TaskWorker";
String OPERATION_TYPE_CORRELATE_MESSAGE = "CorrelateMessage";

String CATEGORY_ADMIN = "Admin";
String CATEGORY_OPERATOR = "Operator";
String CATEGORY_TASK_WORKER = "TaskWorker";

/** The unique identifier of this log entry. */
String getId();
Expand Down Expand Up @@ -192,7 +194,7 @@ public interface UserOperationLogEntry {
* created with a common operationId. This allows grouping multiple entries which are part of a composite operation.
*/
String getOperationId();

/** External task reference. */
String getExternalTaskId();

Expand Down Expand Up @@ -221,7 +223,7 @@ public interface UserOperationLogEntry {

/** The time the historic user operation log will be removed. */
Date getRemovalTime();

/** The category this entry is associated with */
String getCategory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.camunda.bpm.engine.impl;

import java.util.concurrent.Callable;

import org.camunda.bpm.engine.BadUserRequestException;
import org.camunda.bpm.engine.form.FormData;
import org.camunda.bpm.engine.impl.cfg.CommandChecker;
Expand All @@ -42,13 +40,7 @@ public GetDeployedTaskFormCmd(String taskId) {

@Override
protected FormData getFormData() {
return commandContext.runWithoutAuthorization(new Callable<FormData>() {

@Override
public FormData call() throws Exception {
return new GetTaskFormCmd(taskId).execute(commandContext);
}
});
return commandContext.runWithoutAuthorization(new GetTaskFormCmd(taskId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. Camunda licenses this file to you under the Apache License,
* Version 2.0; you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.camunda.bpm.engine.impl;

import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureNotNull;

import java.util.List;
import java.util.Map;

import org.camunda.bpm.engine.batch.Batch;
import org.camunda.bpm.engine.history.HistoricProcessInstanceQuery;
import org.camunda.bpm.engine.impl.cmd.batch.CorrelateAllMessageBatchCmd;
import org.camunda.bpm.engine.impl.interceptor.CommandExecutor;
import org.camunda.bpm.engine.runtime.MessageCorrelationAsyncBuilder;
import org.camunda.bpm.engine.runtime.ProcessInstanceQuery;
import org.camunda.bpm.engine.variable.impl.VariableMapImpl;

public class MessageCorrelationAsyncBuilderImpl implements MessageCorrelationAsyncBuilder {

protected CommandExecutor commandExecutor;

protected String messageName;
protected Map<String, Object> payloadProcessInstanceVariables;

protected List<String> processInstanceIds;
protected ProcessInstanceQuery processInstanceQuery;
protected HistoricProcessInstanceQuery historicProcessInstanceQuery;

public MessageCorrelationAsyncBuilderImpl(CommandExecutor commandExecutor, String messageName) {
this(messageName);
ensureNotNull("commandExecutor", commandExecutor);
this.commandExecutor = commandExecutor;
}

private MessageCorrelationAsyncBuilderImpl(String messageName) {
this.messageName = messageName;
}

public MessageCorrelationAsyncBuilder processInstanceIds(List<String> ids) {
ensureNotNull("processInstanceIds", ids);
this.processInstanceIds = ids;
return this;
}

@Override
public MessageCorrelationAsyncBuilder processInstanceQuery(ProcessInstanceQuery processInstanceQuery) {
ensureNotNull("processInstanceQuery", processInstanceQuery);
this.processInstanceQuery = processInstanceQuery;
return this;
}

@Override
public MessageCorrelationAsyncBuilder historicProcessInstanceQuery(HistoricProcessInstanceQuery historicProcessInstanceQuery) {
ensureNotNull("historicProcessInstanceQuery", historicProcessInstanceQuery);
this.historicProcessInstanceQuery = historicProcessInstanceQuery;
return this;
}

public MessageCorrelationAsyncBuilder setVariable(String variableName, Object variableValue) {
ensureNotNull("variableName", variableName);
ensurePayloadProcessInstanceVariablesInitialized();
payloadProcessInstanceVariables.put(variableName, variableValue);
return this;
}

public MessageCorrelationAsyncBuilder setVariables(Map<String, Object> variables) {
if (variables != null) {
ensurePayloadProcessInstanceVariablesInitialized();
payloadProcessInstanceVariables.putAll(variables);
}
return this;
}

protected void ensurePayloadProcessInstanceVariablesInitialized() {
if (payloadProcessInstanceVariables == null) {
payloadProcessInstanceVariables = new VariableMapImpl();
}
}

@Override
public Batch correlateAllAsync() {
return commandExecutor.execute(new CorrelateAllMessageBatchCmd(this));
}

// getters //////////////////////////////////

public CommandExecutor getCommandExecutor() {
return commandExecutor;
}

public String getMessageName() {
return messageName;
}

public List<String> getProcessInstanceIds() {
return processInstanceIds;
}

public ProcessInstanceQuery getProcessInstanceQuery() {
return processInstanceQuery;
}

public HistoricProcessInstanceQuery getHistoricProcessInstanceQuery() {
return historicProcessInstanceQuery;
}

public Map<String, Object> getPayloadProcessInstanceVariables() {
return payloadProcessInstanceVariables;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package org.camunda.bpm.engine.impl;

import java.util.Arrays;
import java.util.List;
import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureNotNull;
import static org.camunda.bpm.engine.impl.util.EnsureUtil.ensureFalse;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.camunda.bpm.engine.impl.cmd.CommandLogger;
Expand Down Expand Up @@ -63,6 +64,7 @@ public class MessageCorrelationBuilderImpl implements MessageCorrelationBuilder
protected boolean isTenantIdSet = false;

protected boolean startMessagesOnly = false;
protected boolean executionsOnly = false;

public MessageCorrelationBuilderImpl(CommandExecutor commandExecutor, String messageName) {
this(messageName);
Expand Down Expand Up @@ -203,10 +205,17 @@ public MessageCorrelationBuilder withoutTenantId() {

@Override
public MessageCorrelationBuilder startMessageOnly() {
ensureFalse("Either startMessageOnly or executionsOnly can be set", executionsOnly);
startMessagesOnly = true;
return this;
}

public MessageCorrelationBuilder executionsOnly() {
ensureFalse("Either startMessageOnly or executionsOnly can be set", startMessagesOnly);
executionsOnly = true;
return this;
}

@Override
public void correlate() {
correlateWithResult();
Expand Down Expand Up @@ -370,4 +379,8 @@ public boolean isTenantIdSet() {
return isTenantIdSet;
}

public boolean isExecutionsOnly() {
return executionsOnly;
}

}
Loading

0 comments on commit 03c5e73

Please sign in to comment.