Skip to content

Commit

Permalink
added spring boot process test
Browse files Browse the repository at this point in the history
  • Loading branch information
Luc Weinbrecht committed Jun 19, 2022
1 parent 5b5f81b commit 2186248
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ The [BPMN process](assets/processes/loan_agreement.png) which start a [second pr

Configure your [Camunda Platform 8 SaaS](https://camunda.com/get-started/) to connect to using the [following properties](https://github.com/camunda-community-hub/spring-zeebe#configuring-camunda-platform-8-saas-connection) .

If you want to run [Zeebe local](https://docs.camunda.io/docs/self-managed/platform-deployment/kubernetes-helm/) you need to use the following properties ([source](https://github.com/camunda-community-hub/spring-zeebe#configuring-camunda-platform-8-saas-connection)):
If you want to run [Zeebe locally](https://docs.camunda.io/docs/self-managed/platform-deployment/kubernetes-helm/) you need to use the following properties ([source](https://github.com/camunda-community-hub/spring-zeebe#configuring-camunda-platform-8-saas-connection)):

```yaml
zeebe:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class RejectionLoanAgreement {

@ZeebeWorker(type = LOAN_REJECTION_TASK, fetchVariables = LOAN_AGREEMENT_NUMBER)
public void handleJobFoo(final JobClient client, final ActivatedJob job) {
Long loanAgreementNumber = (Long) job.getVariablesAsMap().get(LOAN_AGREEMENT_NUMBER);
Long loanAgreementNumber = ((Number) job.getVariablesAsMap().get(LOAN_AGREEMENT_NUMBER)).longValue();
loanAgreementStatusCommand.reject(new LoanAgreementNumber(loanAgreementNumber));

client.newCompleteCommand(job.getKey())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package de.weinbrecht.luc.bpm.architecture.loan.agreement;

import de.weinbrecht.luc.bpm.architecture.loan.agreement.domain.model.*;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.domain.model.recipient.MailAddress;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.domain.model.recipient.Name;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.domain.model.recipient.Recipient;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.usecase.in.LoanAgreementCreation;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.usecase.in.LoanAgreementStatusCommand;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.usecase.out.LoanAgreementCommand;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.usecase.out.LoanAgreementQuery;
import de.weinbrecht.luc.bpm.architecture.loan.agreement.usecase.out.RecommendationTrigger;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;

import java.util.HashMap;
import java.util.Map;

import static de.weinbrecht.luc.bpm.architecture.SpringProcessTestUtils.*;
import static de.weinbrecht.luc.bpm.architecture.loan.agreement.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.filters.RecordStream.of;
import static java.time.Duration.ofSeconds;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

// https://github.com/camunda-community-hub/camunda-8-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java
@SpringBootTest
@ZeebeSpringTest
@Disabled("Flaky if runs together with the non Spring Process Tests")
@MockBean({LoanAgreementCommand.class, LoanAgreementCreation.class})
class SpringProcessTest {

@Autowired
private ZeebeClient zeebe;

@Autowired
private ZeebeTestEngine zeebeTestEngine;

@MockBean
private LoanAgreementStatusCommand loanAgreementStatusCommand;

@MockBean
private RecommendationTrigger recommendationTrigger;

@MockBean
private LoanAgreementQuery loanAgreementQuery;

public static final String PROCESS_DEFINITION = "Loan_Agreement";
private static final String START_EVENT = "LoanAgreementReceivedStartEvent";
private static final String APPROVE_RULE_TASK = "ApproveAgreementRuleTask";
private static final String APPROVE_AGREEMENT_SERVICE_TASK = "ApproveLoanAgreementServiceTask";
private static final String SEND_CROSS_SELLING_EVENT = "SendCrossSellingEvent";
private static final String APPROVED_END_EVENT = "LoanAgreementApprovedEndEvent";

private static final String REJECT_AGREEMENT_SERVICE_TASK = "RejectLoanAgreementServiceTask";
private static final String NOT_APPROVED_END_EVENT = "LoanAgreementNotApprovedEndEvent";

@Test
void testRunsProcessHappyPath() throws Exception {
final LoanAgreementNumber loanAgreementNumber = new LoanAgreementNumber(1L);
final CaseId caseId = new CaseId("Test-A-B");
LoanAgreement loanAgreement = new LoanAgreement(
loanAgreementNumber,
new Recipient(
new CustomerNumber("Test-1"),
new Name("Tester"),
new MailAddress("tester@web.io")
),
new Amount(100)
);
when(loanAgreementQuery.loadByNumber(loanAgreementNumber)).thenReturn(loanAgreement);

Map<String, Object> processVariables = new HashMap<>();
processVariables.put(LOAN_AGREEMENT_NUMBER, loanAgreementNumber.getValue());
processVariables.put(BUSINESS_KEY, caseId.getValue());
zeebe.newPublishMessageCommand().messageName(LOAN_START_EVENT_MESSAGE_REF)
.correlationKey("")
.variables(processVariables)
.send()
.join();

hasPassedElement(START_EVENT, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

zeebeTestEngine.waitForIdleState(ofSeconds(5));
hasPassedElement(APPROVE_RULE_TASK, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

waitForTaskAndComplete(zeebeTestEngine, zeebe, APPROVE_AGREEMENT_SERVICE_TASK, LOAN_AGREEMENT_TASK);
zeebeTestEngine.waitForIdleState(ofSeconds(5));
verify(loanAgreementStatusCommand).accept(loanAgreementNumber);

waitForTaskAndComplete(zeebeTestEngine, zeebe, SEND_CROSS_SELLING_EVENT, SEND_CROSS_SELLING_RECOMMENDATION_TASK);
verify(recommendationTrigger).startLoanAgreement(caseId, loanAgreement);

hasPassedElement(APPROVED_END_EVENT, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

assertTrue(isProcessInstanceCompleted(of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION));
}

@Test
void testRunsProcessExceptionalPath() throws Exception {
final LoanAgreementNumber loanAgreementNumber = new LoanAgreementNumber(6L);

zeebe.newPublishMessageCommand().messageName(LOAN_START_EVENT_MESSAGE_REF)
.correlationKey("")
.variables(singletonMap(LOAN_AGREEMENT_NUMBER, loanAgreementNumber.getValue()))
.send()
.join();
zeebeTestEngine.waitForIdleState(ofSeconds(5));

hasPassedElement(START_EVENT, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

zeebeTestEngine.waitForIdleState(ofSeconds(5));
hasPassedElement(APPROVE_RULE_TASK, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

waitForTaskAndComplete(zeebeTestEngine, zeebe, REJECT_AGREEMENT_SERVICE_TASK, LOAN_REJECTION_TASK);
verify(loanAgreementStatusCommand).reject(loanAgreementNumber);

zeebeTestEngine.waitForIdleState(ofSeconds(5));
hasPassedElement(NOT_APPROVED_END_EVENT, of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION);

assertTrue(isProcessInstanceCompleted(of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.StartRecommendation;
import io.camunda.zeebe.client.ZeebeClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.CUSTOMER_NUMBER;
import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.START_EVENT_MESSAGE_REF;
import static java.util.Collections.singletonMap;

@Slf4j
@RequiredArgsConstructor
@Component
public class ProcessEngineClient implements StartRecommendation {
Expand All @@ -18,6 +20,7 @@ public class ProcessEngineClient implements StartRecommendation {

@Override
public void start(String caseId, CustomerId customerId) {
log.info("Starting new recommendation for loan agreement with case ID {}", caseId);
client.newPublishMessageCommand()
.messageName(START_EVENT_MESSAGE_REF)
.correlationKey("")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package de.weinbrecht.luc.bpm.architecture.recommendation;

import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Content;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.ContentId;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Description;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.Recommendation;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Customer;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.CustomerId;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.MailAddress;
import de.weinbrecht.luc.bpm.architecture.recommendation.domain.model.customer.Name;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationCreation;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.in.RecommendationPicker;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.RecommendationQuery;
import de.weinbrecht.luc.bpm.architecture.recommendation.usecase.out.SendNotification;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.annotation.DirtiesContext;

import static de.weinbrecht.luc.bpm.architecture.SpringProcessTestUtils.isProcessInstanceCompleted;
import static de.weinbrecht.luc.bpm.architecture.SpringProcessTestUtils.waitForTaskAndComplete;
import static de.weinbrecht.luc.bpm.architecture.recommendation.adapter.common.ProcessConstants.*;
import static io.camunda.zeebe.process.test.filters.RecordStream.of;
import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_CLASS;

// https://github.com/camunda-community-hub/camunda-8-examples/blob/main/twitter-review-java-springboot/src/test/java/org/camunda/community/examples/twitter/TestTwitterProcess.java
@SpringBootTest
@ZeebeSpringTest
@Disabled("Flaky if runs together with the non Spring Process Tests")
@MockBean(RecommendationCreation.class)
class SpringProcessTest {

@Autowired
private ZeebeClient zeebe;

@Autowired
private ZeebeTestEngine zeebeTestEngine;

@MockBean
private RecommendationPicker recommendationPicker;

@MockBean
private RecommendationQuery recommendationQuery;

@MockBean
private SendNotification sendNotification;

private final ContentId contentId = new ContentId(1L);
private final CustomerId customerId = new CustomerId("Test-11");
private final Recommendation recommendation = new Recommendation(
new Customer(
customerId,
new Name("Tester"),
new MailAddress("tester@ewb.io")
),
new Content(contentId, new Description("Foo"))
);

public static final String PROCESS_DEFINITION = "Cross_Selling_Recommendation";

private static final String PICK_CONTENT_SERVICE_TASK = "PickContentServiceTask";
private static final String SEND_RECOMMENDATION_SERVICE_TASK = "SendRecommendationServiceTask";

@BeforeEach
void setUp() {
when(recommendationPicker.pickContent()).thenReturn(contentId);

when(recommendationQuery.findContentById(contentId)).thenReturn(recommendation.getContent());
when(recommendationQuery.findCustomerById(customerId)).thenReturn(recommendation.getCustomer());
}

@Test
void testRunsProcess() throws Exception {
zeebe.newPublishMessageCommand().messageName(START_EVENT_MESSAGE_REF)
.correlationKey("")
.variables(singletonMap(CUSTOMER_NUMBER, customerId.getValue()))
.send();

waitForTaskAndComplete(zeebeTestEngine, zeebe, PICK_CONTENT_SERVICE_TASK, PICK_CONTENT_TASK, singletonMap(CONTENT_NUMBER, contentId.getValue()));

verify(recommendationPicker).pickContent();

waitForTaskAndComplete(zeebeTestEngine, zeebe, SEND_RECOMMENDATION_SERVICE_TASK, SEND_RECOMMENDATION_TASK);

verify(sendNotification).send(recommendation);

assertTrue(isProcessInstanceCompleted(of(zeebeTestEngine.getRecordStreamSource()), PROCESS_DEFINITION));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package de.weinbrecht.luc.bpm.architecture;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import org.opentest4j.AssertionFailedError;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static io.camunda.zeebe.process.test.filters.StreamFilter.processInstance;
import static io.camunda.zeebe.protocol.record.RejectionType.NULL_VAL;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_COMPLETED;
import static io.camunda.zeebe.protocol.record.value.BpmnElementType.PROCESS;
import static java.time.Duration.ofSeconds;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class SpringProcessTestUtils {

public static ActivatedJob waitForTaskAndComplete(
ZeebeTestEngine zeebeTestEngine,
ZeebeClient zeebe,
String taskId,
String jobType) throws Exception {
int maxRetry = 5;

ActivatedJob taskJob = null;
for (int i = 0; i < maxRetry; i++) {
taskJob = waitAndFetchJobs(zeebeTestEngine, zeebe, jobType);
if (taskJob != null) {
// Make sure it is the right one
assertEquals(taskId, taskJob.getElementId());
zeebe.newCompleteCommand(taskJob.getKey()).send().join();
}
}
return taskJob;
}

public static ActivatedJob waitForTaskAndComplete(
ZeebeTestEngine zeebeTestEngine,
ZeebeClient zeebe,
String taskId,
String jobType,
Map<String, Object> variables) throws Exception {
int maxRetry = 5;

ActivatedJob taskJob = null;
for (int i = 0; i < maxRetry; i++) {
taskJob = waitAndFetchJobs(zeebeTestEngine, zeebe, jobType);
if (taskJob != null) {
// Make sure it is the right one
assertEquals(taskId, taskJob.getElementId());
zeebe.newCompleteCommand(taskJob.getKey()).variables(variables).send().join();
}
}
return taskJob;
}

public static ActivatedJob waitAndFetchJobs(
ZeebeTestEngine zeebeTestEngine,
ZeebeClient zeebe,
String jobType) throws InterruptedException, TimeoutException {
// Let the workflow engine do whatever it needs to do
zeebeTestEngine.waitForIdleState(ofSeconds(5));

// Now get all user tasks
List<ActivatedJob> jobs = zeebe.newActivateJobsCommand().jobType(jobType).maxJobsToActivate(1).send().join().getJobs();

if (jobs.isEmpty()) {
return null;
}
return jobs.get(0);
}

public static boolean isProcessInstanceCompleted(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId)
.withRejectionType(NULL_VAL)
.withBpmnElementType(PROCESS)
.withIntent(ELEMENT_COMPLETED)
.stream().findFirst().isPresent();
}

public static Record<ProcessInstanceRecordValue> getProcessInstance(RecordStream recordStream, String bpmnProcessId) {
return processInstance(recordStream).withBpmnProcessId(bpmnProcessId)
.withRejectionType(NULL_VAL)
.withBpmnElementType(PROCESS)
.stream()
.findFirst()
.orElseThrow(() ->
new AssertionFailedError("Process Instance not found", bpmnProcessId, null));
}

public static void hasPassedElement(String elementId, RecordStream recordStream, String bpmnProcessId) {
processInstance(recordStream).withBpmnProcessId(bpmnProcessId)
.withRejectionType(NULL_VAL)
.withElementId(elementId)
.stream()
.findFirst()
.orElseThrow(() ->
new AssertionFailedError("Element not found", elementId, null));
}

public static long getProcessInstanceId(RecordStream recordStream, String bpmnProcessId) {
return getProcessInstance(recordStream, bpmnProcessId).getKey();
}
}

0 comments on commit 2186248

Please sign in to comment.