Skip to content

Commit

Permalink
Merge branch 'release/1.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Jan 24, 2023
2 parents 1c06239 + 987dfc4 commit 4ce8dd2
Show file tree
Hide file tree
Showing 34 changed files with 276 additions and 59 deletions.
2 changes: 1 addition & 1 deletion bom/bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-root</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion bom/parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-root</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
32 changes: 21 additions & 11 deletions docs/user-guide/scheduled-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,26 @@ correlate:
persistence: # persistence setting
messageMaxRetries: 100
messageFetchPageSize: 100
messageBatchSize: 1
retry:
retryMaxBackoffMinutes: 5
retryBackoffBase: 2.0


```

| Property | Values | Meaning | Default |
|----------------------------------|---------------------|-------------------------------------------------|---------|
| batch.mode | `all`, `fail_first` | Batch processing mode | all |
| batch.query.pollInitialDelay | Duration in ISO8601 | Start delay before correlation scheduler starts | PT10S |
| batch.query.pollInterval | Duration in ISO8601 | Delay between correlation attempts | PT6S |
| batch.cleanup.pollInitialDelay | Duration in ISO8601 | Start delay before clean-up scheduler starts | |
| batch.cleanup.pollInterval | Duration in ISO8601 | Delay between clean-ups | |
| persistence.messageMaxRetries | Integer | Maximum retries before giving up correlation | 100 |
| persistence.messageFetchPageSize | Integer | Paging size by message fetch | 100 |
| retry.retryMaxBackoffMinutes | Integer | Maximum backoff-time in minutes | 180 |
| retry.retryBackoffBase | Float | Base for exponential backoff-time | 180 |
| Property | Values | Meaning | Default |
|----------------------------------|---------------------|-----------------------------------------------------|---------|
| batch.mode | `all`, `fail_first` | Batch processing mode | all |
| batch.query.pollInitialDelay | Duration in ISO8601 | Start delay before correlation scheduler starts | PT10S |
| batch.query.pollInterval | Duration in ISO8601 | Delay between correlation attempts | PT6S |
| batch.cleanup.pollInitialDelay | Duration in ISO8601 | Start delay before clean-up scheduler starts | |
| batch.cleanup.pollInterval | Duration in ISO8601 | Delay between clean-ups | |
| persistence.messageMaxRetries | Integer | Maximum retries before giving up correlation | 100 |
| persistence.messageFetchPageSize | Integer | Paging size by message fetch | 100 |
| persistence.messageBatchSize | Integer | Limit the number of messages processed from a batch | -1 |
| retry.retryMaxBackoffMinutes | Integer | Maximum backoff-time in minutes | 180 |
| retry.retryBackoffBase | Float | Base for exponential backoff-time | 180 |

### Reading message

Expand All @@ -57,6 +60,13 @@ Batches of messages are checked to fulfill the following criteria:
Messages of one batch are correlated in order of their sorting. If a correlation error occurs, the batch correlation is
either interrupted (`fail_first` mode) or the batch is correlated to the end (`all` mode).

An important parameter for batch processing is the `message-batch-size`. This parameter specifies the number of messages
taken from a batch for synchronous correlation. Effectively, this parameter has two interesting values. Set this parameter
to `-1` (default) and all messages from one batch will be correlated directly one after another. Set this parameter to
`1` and the batch will be constructed, but only the first message will be correlated in current run. If successful, the
next message will be fetched during the next message query (after the `batch.query.pollInterval`, which should be a small interval).
By doing so, you can deal with asynchronous continuations in your process.

### Error detection

If the error is detected during the correlation, it is handled by the library. If the message time-to-live is set and the
Expand Down
2 changes: 1 addition & 1 deletion example/axon/flight-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/axon/hotel-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/axon/reservation-axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/itest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<!-- It is ok to use the extension parent, there is only Kotlin and some managed deps defined there -->
<relativePath>../bom/parent/pom.xml</relativePath>
</parent>
Expand Down
7 changes: 4 additions & 3 deletions example/spring-cloud/example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ CLIENT_BIN=kcat

case "$1" in
"reservation")
JSON=$(jq ".reservationId =\""$RESERVATION_ID\"" "$DIR/../common/local/reservation-created.json")
JSON=$(jq ".reservationId=\"$RESERVATION_ID\"" $DIR/../common/local/reservation-created.json)
EVENT_TYPE=io.holunda.camunda.bpm.example.common.domain.ReservationReceivedEvent
echo "Sending create reservation message: $JSON"
echo "Sending create reservation message: '$JSON'"
;;

"flight")
Expand All @@ -28,7 +28,7 @@ case "$1" in
"show")
"$CLIENT_BIN" -b "$KAFKA_BOOTSTRAP_SERVER_HOST:$KAFKA_BOOTSTRAP_SERVER_PORT" \
-t "$KAFKA_TOPIC_CORRELATE_INGRESS" \
-C
-C -f 'Key: %k, payload: %s\n'
exit 0
;;
"all")
Expand All @@ -43,6 +43,7 @@ case "$1" in
esac

echo "$JSON" | "$CLIENT_BIN" \
-D "\0" \
-b "$KAFKA_BOOTSTRAP_SERVER_HOST:$KAFKA_BOOTSTRAP_SERVER_PORT" \
-t "$KAFKA_TOPIC_CORRELATE_INGRESS" \
-P \
Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/flight-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/hotel-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/spring-cloud/reservation-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-example-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package io.holunda.camunda.bpm.example.kafka.correlation.correlate

import io.holunda.camunda.bpm.correlate.correlation.CorrelationMessage
import io.holunda.camunda.bpm.correlate.correlation.SingleMessageCorrelationStrategy
import io.holunda.camunda.bpm.correlate.correlation.impl.IdentityMessageComparator
import io.holunda.camunda.bpm.correlate.correlation.impl.MessageIdCorrelationMessageComparator
import io.holunda.camunda.bpm.correlate.event.CorrelationHint
import io.holunda.camunda.bpm.correlate.persist.impl.DefaultMessagePersistenceService
import io.holunda.camunda.bpm.example.common.domain.ReservationReceivedEvent
import io.holunda.camunda.bpm.example.common.domain.flight.FlightReservationConfirmedEvent
import io.holunda.camunda.bpm.example.common.domain.hotel.HotelReservationConfirmedEvent
Expand Down Expand Up @@ -52,10 +54,11 @@ class ReservationProcessingCorrelation(
RESERVATION_ID.name to payload.bookingReference
)
)
is DefaultMessagePersistenceService.PayloadNotAvailable -> throw IllegalArgumentException("No payload was available, could not determine correlation hint for message ${message.messageMetaData}")
else -> throw IllegalArgumentException("Could not determine correlation hint for message ${message.messageMetaData}")
}
}
}

override fun correlationMessageSorter(): Comparator<CorrelationMessage> = MessageIdCorrelationMessageComparator()
override fun correlationMessageSorter(): Comparator<CorrelationMessage> = IdentityMessageComparator()
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ correlate:
message:
# timeToLiveAsString: PT10S # errors during TTL seconds after receiving are ignored
payloadEncoding: jackson # our bytes are actually JSON written by Jackson.

persistence:
messageMaxRetries: 5 # default 100 -> will try to deliver 5 times at most
messageFetchPageSize: 100 # default 100
message-max-retries: 5 # default 100 -> will try to deliver 5 times at most
message-fetch-page-size: 100 # default 100
# message-batch-size: 1 # default is -1 meaning unlimited
retry:
retryMaxBackoffMinutes: 5 # default 180 -> maximum 5 minutes between retries
retryBackoffBase: 5.0 # value in minutes default 2.0 -> base in the power of retry to calculate the next retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spring:
stream:
bindings:
correlate-ingress-binding:
content-type: application/json
# content-type: application/octet-stream // do we need this?
destination: ${KAFKA_TOPIC_CORRELATE_INGRES:correlate-ingress}
binder: correlate-ingress-binder
group: ${KAFKA_GROUP_ID}
Expand All @@ -34,9 +34,9 @@ spring:
brokers: ${KAFKA_BOOTSTRAP_SERVER_HOST:localhost}:${KAFKA_BOOTSTRAP_SERVER_PORT:9092}
configuration:
security.protocol: ${KAFKA_SECURITY_PROTOCOL_OVERRIDE:PLAINTEXT}
# consumer-properties:
# key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
consumerProperties:
key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

camunda:
bpm:
Expand Down
2 changes: 1 addition & 1 deletion extension/axon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
12 changes: 6 additions & 6 deletions extension/cockpit-plugin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion extension/cockpit-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export const EditRetriesModal = ({ message, onClose, onSubmit, maxRetries }: Edi
<ModalBody>
<form>
<span>Message could not be correlated from the first attempt.</span><br />
{retries == maxRetries ?
{retries >= maxRetries ?
<div className="form-group">
<span>All ${maxRetries} are exhausted. To continue retries please decrease the number of retries below:</span>
<label htmlFor={retriesInputId}>Retries</label>
Expand Down
2 changes: 1 addition & 1 deletion extension/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>io.holunda</groupId>
<artifactId>camunda-bpm-correlate-parent</artifactId>
<version>1.0.1</version>
<version>1.1.0</version>
<relativePath>../../bom/parent/pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.holunda.camunda.bpm.correlate.correlation.impl

import io.holunda.camunda.bpm.correlate.correlation.CorrelationMessage

/**
* Correlation message comparator taking not changing the order.
*/
class IdentityMessageComparator : Comparator<CorrelationMessage> {
override fun compare(left: CorrelationMessage, right: CorrelationMessage): Int = -1
}


Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ interface MessageRepository {

/**
* Finds all messages.
* @param page number start element to fetch from.
* @param page number start element to fetch from. Starts from 0.
* @param pageSize number elements to fetch.
* @return list of all messages.
*/
fun findAll(page: Int, pageSize: Int): List<MessageEntity>
fun findAll(page: Int = 0, pageSize: Int): List<MessageEntity>

/**
* Finds all messages without payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DefaultMessagePersistenceService(
logger.debug { "Built ${batches.size} batches." }

/*
* Fast access fun to retry info for message.
* Fast access function to retry info for message.
*/
fun CorrelationMessage.retryInfo() = messagesWithRetries.getValue(this)

Expand All @@ -110,6 +110,12 @@ class DefaultMessagePersistenceService(
// take those without errors or if they are due for retry
hasNoErrors || dueForRetry
}
}.map { correlationBatch ->
if (messagePersistenceConfig.batchSizeLimit() != -1) {
correlationBatch.copy(correlationMessages = correlationBatch.correlationMessages.take(messagePersistenceConfig.batchSizeLimit()))
} else {
correlationBatch
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,12 @@ interface MessagePersistenceConfig {
* Retrieves the page size for paging.
*/
fun getPageSize(): Int

/**
* Limits the global size of messages after batch building process. Defaults to unlimited.
* Special value of interest is 1, because only the first message in batch is taken in current run,
* and all others are ignored and will be processed during the next correlation attempt.
*/
fun batchSizeLimit(): Int = -1
}

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ interface MyBatisMessageMapper {
/**
* Finds all messages using paging.
*/
@Select("SELECT * FROM COR_MESSAGE")
@Select("SELECT * FROM COR_MESSAGE M ORDER BY M.INSERTED ASC")
@Results(
value = [
Result(property = "id", column = "ID", jdbcType = JdbcType.VARCHAR),
Expand All @@ -36,7 +36,7 @@ interface MyBatisMessageMapper {
/**
* Find all message light objects (message without payload).
*/
@Select("SELECT * FROM COR_MESSAGE M WHERE M.ERROR IS NOT NULL")
@Select("SELECT * FROM COR_MESSAGE M WHERE M.ERROR IS NOT NULL ORDER BY M.INSERTED ASC")
@Results(
value = [
Result(property = "id", column = "ID", jdbcType = JdbcType.VARCHAR),
Expand All @@ -57,7 +57,7 @@ interface MyBatisMessageMapper {
/**
* Loads all messages.
*/
@Select("SELECT * FROM COR_MESSAGE")
@Select("SELECT * FROM COR_MESSAGE M ORDER BY M.INSERTED ASC")
@Results(
value = [
Result(property = "id", column = "ID", jdbcType = JdbcType.VARCHAR),
Expand All @@ -79,7 +79,7 @@ interface MyBatisMessageMapper {
/**
* Finds a message by id.
*/
@Select("SELECT * from COR_MESSAGE M WHERE M.ID=#{id}")
@Select("SELECT * from COR_MESSAGE M WHERE M.ID=#{id} ORDER BY M.INSERTED ASC")
@Results(
value = [
Result(property = "id", column = "ID"),
Expand Down
Loading

0 comments on commit 4ce8dd2

Please sign in to comment.