Skip to content

Commit

Permalink
Merge branch 'release/0.1.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Sep 30, 2024
2 parents dacb0ce + e2f6dff commit f9cb56a
Show file tree
Hide file tree
Showing 30 changed files with 259 additions and 383 deletions.
2 changes: 1 addition & 1 deletion api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>

<artifactId>process-engine-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
</parent>

<artifactId>process-engine-api-bom</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion engine-adapter/adapter-commons-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion engine-adapter/adapter-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
7 changes: 6 additions & 1 deletion engine-adapter/adapter-testing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -52,6 +52,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<scope>compile</scope>
</dependency>


<!-- For JGiven Spring -->
Expand Down
2 changes: 1 addition & 1 deletion engine-adapter/camunda-platform-7-embedded-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion engine-adapter/camunda-platform-7-remote-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
20 changes: 18 additions & 2 deletions engine-adapter/camunda-platform-8-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
<parent>
<groupId>dev.bpm-crafters.process-engine-api</groupId>
<artifactId>process-engine-api-root</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>process-engine-api-adapter-camunda-platform-c8-core</artifactId>
<name>Adapter: C8 Core</name>
<properties>
<zeebe-client-java.version>8.5.3</zeebe-client-java.version> <!-- camunda-tasklist-client-java doesn't support 8.5.6 currently -->
<zeebe-client-java.version>8.5.7</zeebe-client-java.version>
<camunda-tasklist-client-java.version>8.5.3.6</camunda-tasklist-client-java.version>
</properties>

Expand All @@ -34,11 +34,27 @@
<version>${zeebe-client-java.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-protocol</artifactId>
<version>${zeebe-client-java.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>camunda-tasklist-client-java</artifactId>
<version>${camunda-tasklist-client-java.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>io.camunda.spring</groupId>
<artifactId>java-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.camunda</groupId>
<artifactId>zeebe-client-java</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,6 @@ class PullUserTaskDelivery(
}

private fun TaskSubscriptionHandle.matches(task: Task): Boolean =
this.taskType == TaskType.USER && (this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionId)
this.taskType == TaskType.USER
&& (this.taskDescriptionKey == null || this.taskDescriptionKey == task.taskDefinitionId)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.bpmcrafters.processengineapi.adapter.c8.task.delivery

import dev.bpmcrafters.processengineapi.CommonRestrictions
import dev.bpmcrafters.processengineapi.adapter.c8.task.SubscribingUserTaskDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.RefreshableDelivery
import dev.bpmcrafters.processengineapi.adapter.commons.task.SubscriptionRepository
Expand All @@ -8,12 +9,16 @@ import dev.bpmcrafters.processengineapi.adapter.commons.task.filterBySubscriptio
import dev.bpmcrafters.processengineapi.task.TaskSubscription
import dev.bpmcrafters.processengineapi.task.TaskType
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.command.ActivateJobsCommandStep1.ActivateJobsCommandStep3
import io.camunda.zeebe.client.api.command.ClientStatusException
import io.camunda.zeebe.client.api.command.StreamJobsCommandStep1.StreamJobsCommandStep3
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.JobWorker
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1.JobWorkerBuilderStep3
import io.camunda.zeebe.protocol.Protocol
import io.grpc.Status
import mu.KLogging
import java.time.Duration

class SubscribingRefreshingUserTaskDelivery(
private val zeebeClient: ZeebeClient,
Expand All @@ -22,10 +27,7 @@ class SubscribingRefreshingUserTaskDelivery(
private val userTaskLockTimeoutMs: Long,
) : SubscribingUserTaskDelivery, RefreshableDelivery {

companion object : KLogging() {
const val ZEEBE_USER_TASK = "io.camunda.zeebe:userTask"
const val TIMEOUT_FACTOR = 2L
}
companion object : KLogging()

private var jobWorkerRegistry: Map<String, JobWorker> = emptyMap()

Expand All @@ -37,51 +39,55 @@ class SubscribingRefreshingUserTaskDelivery(
.filter { it.taskType == TaskType.USER }
.forEach { activeSubscription ->
// this is a job to subscribe to.
zeebeClient
val subscribedJobWorker = zeebeClient
.newWorker()
.jobType(ZEEBE_USER_TASK)
.handler { client, job ->
if (activeSubscription.matches(job)) {
subscriptionRepository.activateSubscriptionForTask("${job.key}", activeSubscription)
val variables = job.variablesAsMap.filterBySubscription(activeSubscription)
try {
logger.debug { "PROCESS-ENGINE-C8-041: Delivering user task ${job.key}." }
activeSubscription.action.accept(job.toTaskInformation(), variables)
logger.debug { "PROCESS-ENGINE-C8-042: Successfully delivered user task ${job.key}." }
} catch (e: Exception) {
logger.error { "PROCESS-ENGINE-C8-043: Failed to deliver user task ${job.key}: ${e.message}" }
client
.newFailCommand(job.key)
.retries(job.retries)
.send()
.join() // could not deliver
subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}")
}
} else {
// put it back
logger.trace { "PROCESS-ENGINE-C8-044: Received user task ${job.key} not matching subscriptions, returning it." }
client
.newFailCommand(job.key)
.retries(job.retries + 1)
.send()
.join()
logger.trace { "PROCESS-ENGINE-C8-045: Successfully returned user task ${job.key} not matching subscriptions." }
}
}
.jobType(Protocol.USER_TASK_JOB_TYPE)
.handler { _, job -> consumeActivatedJob(activeSubscription, job, zeebeClient) }
.maxJobsActive(Integer.MAX_VALUE)
.name(workerId)
.timeout(userTaskLockTimeoutMs * TIMEOUT_FACTOR)
.timeout(userTaskLockTimeoutMs)
.streamEnabled(false)
.forSubscription(activeSubscription)
// FIXME -> metrics to setup
.open()
.let {
jobWorkerRegistry + (activeSubscription.taskDescriptionKey to it)
}

// add to registry, to be able to close worker and stop receiving updates on unsubscribe
jobWorkerRegistry + (activeSubscription.taskDescriptionKey to subscribedJobWorker)
}
} else {
logger.trace { "PROCESS-ENGINE-C8-046: not subscribing for user tasks, no active subscription found." }
}
}

private fun consumeActivatedJob(activeSubscription: TaskSubscriptionHandle, job: ActivatedJob, zeebeClient: ZeebeClient) {
if (activeSubscription.matches(job)) {
subscriptionRepository.activateSubscriptionForTask("${job.key}", activeSubscription)
val variables = job.variablesAsMap.filterBySubscription(activeSubscription)
try {
logger.debug { "PROCESS-ENGINE-C8-041: Delivering user task ${job.key}." }
activeSubscription.action.accept(job.toTaskInformation(), variables)
logger.debug { "PROCESS-ENGINE-C8-042: Successfully delivered user task ${job.key}." }
} catch (e: Exception) {
logger.error { "PROCESS-ENGINE-C8-043: Failed to deliver user task ${job.key}: ${e.message}" }
zeebeClient
.newFailCommand(job.key)
.retries(job.retries)
.send()
.join() // could not deliver
subscriptionRepository.deactivateSubscriptionForTask(taskId = "${job.key}")
}
} else {
// put it back
logger.trace { "PROCESS-ENGINE-C8-044: Received user task ${job.key} not matching subscriptions, returning it." }
zeebeClient
.newFailCommand(job.key)
.retries(job.retries + 1)
.send()
.join()
logger.trace { "PROCESS-ENGINE-C8-045: Successfully returned user task ${job.key} not matching subscriptions." }
}
}

override fun refresh() {
val subscriptions = subscriptionRepository.getDeliveredTaskIds(TaskType.USER)
logger.trace { "PROCESS-ENGINE-C8-047: refreshing user tasks for subscriptions: $subscriptions" }
Expand Down Expand Up @@ -130,16 +136,46 @@ class SubscribingRefreshingUserTaskDelivery(
/*
* Additional restrictions to check.
*/
@Suppress("UNUSED_PARAMETER")
private fun TaskSubscriptionHandle.matches(job: ActivatedJob): Boolean {
return this.taskType == TaskType.USER
&& (this.taskDescriptionKey == null || this.taskDescriptionKey == job.elementId)
&& this.restrictions.all {
when (it.key) {
CommonRestrictions.EXECUTION_ID -> it.value == "${job.elementInstanceKey}"
CommonRestrictions.ACTIVITY_ID -> it.value == job.elementId
CommonRestrictions.TENANT_ID -> it.value == job.tenantId
CommonRestrictions.PROCESS_DEFINITION_KEY -> it.value == job.bpmnProcessId
CommonRestrictions.PROCESS_DEFINITION_ID -> it.value == "${job.processDefinitionKey}"
CommonRestrictions.PROCESS_INSTANCE_ID -> it.value == "${job.processInstanceKey}"
else -> false
}
}
// job.customHeaders // FIXME: analyze this! make sure we reflect the subscription restrictions
}

private fun ActivateJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): ActivateJobsCommandStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
}
}

private fun JobWorkerBuilderStep3.forSubscription(subscription: TaskSubscriptionHandle): JobWorkerBuilderStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
this
}
}

private fun StreamJobsCommandStep3.forSubscription(subscription: TaskSubscriptionHandle): StreamJobsCommandStep3 {
// FIXME -> tenantId
// FIXME -> more to setup from props
return if (subscription.payloadDescription != null && subscription.payloadDescription!!.isNotEmpty()) {
this.fetchVariables(subscription.payloadDescription!!.toList())
} else {
Expand Down
Loading

0 comments on commit f9cb56a

Please sign in to comment.