Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
72ac264
build: Exclude Guava 31.1 from OSSIndex sec scan
astubbs Jun 1, 2022
ea232b1
START: Tx marker adjacency tests
astubbs Jul 6, 2022
da97f45
improve error and naming
astubbs Jul 6, 2022
75f4b8d
step
astubbs Jul 6, 2022
081ba39
step
astubbs Jul 6, 2022
7d7b4c8
TEMPORARY: use TG 0.1.1 snapshot
astubbs Jun 30, 2022
062eb4c
test draft
astubbs Jul 6, 2022
ee4b888
Test captures issue
astubbs Jul 6, 2022
79399f9
Capture cases that work and a stacked/overlapping transaction
astubbs Jul 6, 2022
16c2535
Capture cases that work and a stacked/overlapping transaction
astubbs Jul 6, 2022
4680bd5
major: potential solution
astubbs Jul 6, 2022
ddc0092
only init tx in new version
astubbs Jul 6, 2022
b80087e
review
astubbs Jul 6, 2022
96cf41c
docs
astubbs Jul 6, 2022
88f531b
review
astubbs Jul 8, 2022
f905cdb
...
astubbs Jul 9, 2022
12dc7cf
build: Update to Truth-Generator 0.1.1 for Subject discovery
astubbs Jul 9, 2022
10ac83a
build: Only run OSS Index audit on runtime dependencies, not tests
astubbs Jul 9, 2022
70a37cd
build: Don't fail for OSSIndex scan - will set up another CI job for …
astubbs Jul 9, 2022
f2ec1dd
fix cherry-pick
astubbs Jul 9, 2022
0d2b370
build: Only enforce requireReleaseDeps when running in CI, not locally
astubbs Jul 9, 2022
00c581a
TEMPORARY: use TG 0.1.1 snapshot
astubbs Jun 30, 2022
d396e66
Merge branch 'exclude-guava-oss-index' into improvements/upgrade-tg
astubbs Jul 11, 2022
4f2de5c
remove guava exclude from branch
astubbs Jul 11, 2022
ad5092c
cleanse branch
astubbs Jul 11, 2022
30ea0c9
Merge branch 'improvements/allow-snapshots-locally' into improvements…
astubbs Jul 11, 2022
d7267d7
Merge remote-tracking branch 'antony/improvements/upgrade-tg' into im…
astubbs Jul 11, 2022
5e0db16
test snapshot first
astubbs Jul 11, 2022
ac65300
test snapshot first
astubbs Jul 11, 2022
dce76c2
Merge remote-tracking branch 'origin/master' into improvements/upgrad…
astubbs Jul 11, 2022
84e3fae
Merge branch 'improvements/upgrade-tg' into bugs/tx-marker-bug
astubbs Jul 11, 2022
d68070d
Merge remote-tracking branch 'origin/master' into bugs/tx-marker-bug
astubbs Jul 11, 2022
517e880
remove #since
astubbs Jul 11, 2022
cea1a65
review
astubbs Jul 11, 2022
20459a6
0.1.1 now available
astubbs Jul 11, 2022
dae553d
Merge remote-tracking branch 'origin/master' into improvements/upgrad…
astubbs Jul 11, 2022
11d1b13
Merge branch 'improvements/upgrade-tg' into bugs/tx-marker-bug
astubbs Jul 11, 2022
bdfa671
Merge remote-tracking branch 'origin/master' into bugs/tx-marker-bug
astubbs Jul 11, 2022
b411f04
review
astubbs Jul 11, 2022
807f5d7
review
astubbs Jul 11, 2022
0f87fee
review
astubbs Jul 12, 2022
aeb2eab
review
astubbs Jul 12, 2022
c97d508
START: Revert overloading of partition state and highest succeeded field
astubbs Jul 12, 2022
7440830
step
astubbs Jul 12, 2022
de50ea2
fix - bad code style allowed room for bug
astubbs Jul 12, 2022
ff2c483
Merge remote-tracking branch 'confluent/master' into bugs/tx-marker-bug
astubbs Jul 12, 2022
a7b8903
review
astubbs Jul 12, 2022
f48badc
review
astubbs Jul 12, 2022
f2a6588
Merge remote-tracking branch 'confluent/master' into bugs/tx-marker-bug
astubbs Jul 12, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static <KK, VV> ParallelStreamProcessor<KK, VV> createEosStreamProcessor(Paralle
*
* @param usersVoidConsumptionFunction the function
*/
// todo why isn't this in ParallelConsumer ?
void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private void checkNotSubscribed(org.apache.kafka.clients.consumer.Consumer<K, V>
Set<String> subscription = consumerToCheck.subscription();
Set<TopicPartition> assignment = consumerToCheck.assignment();
if (!subscription.isEmpty() || !assignment.isEmpty()) {
throw new IllegalStateException("Consumer subscription must be managed by this class. Use " + this.getClass().getName() + "#subcribe methods instead.");
throw new IllegalStateException("Consumer subscription must be managed by the Parallel Consumer. Use " + this.getClass().getName() + "#subcribe methods instead.");
}
}

Expand Down Expand Up @@ -1158,7 +1158,7 @@ protected void addToMailbox(WorkContainer<K, V> wc) {
}

public void registerWork(EpochAndRecordsMap<K, V> polledRecords) {
log.debug("Adding {} to mailbox...", polledRecords);
log.trace("Adding {} to mailbox...", polledRecords);
workMailBox.add(ControllerEventMessage.of(polledRecords));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ PartitionState<K, V> decodePartitionState(TopicPartition tp, OffsetAndMetadata o
return new PartitionState<K, V>(tp, incompletes);
}

public String makeOffsetMetadataPayload(long finalOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
String offsetMap = serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, state);
public String makeOffsetMetadataPayload(long baseOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
String offsetMap = serialiseIncompleteOffsetMapToBase64(baseOffsetForPartition, state);
return offsetMap;
}

String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
byte[] compressedEncoding = encodeOffsetsCompressed(finalOffsetForPartition, state);
String serialiseIncompleteOffsetMapToBase64(long baseOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {
byte[] compressedEncoding = encodeOffsetsCompressed(baseOffsetForPartition, state);
String b64 = OffsetSimpleSerialisation.base64(compressedEncoding);
return b64;
}
Expand All @@ -186,7 +186,7 @@ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, Partit
* <p>
* Can remove string encoding in favour of the boolean array for the `BitSet` if that's how things settle.
*/
byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState<K, V> partitionState) throws NoEncodingPossibleException {
byte[] encodeOffsetsCompressed(long baseOffsetForPartition, PartitionState<K, V> partitionState) throws NoEncodingPossibleException {
var incompleteOffsets = partitionState.getIncompleteOffsetsBelowHighestSucceeded();
long highestSucceeded = partitionState.getOffsetHighestSucceeded();
if (log.isDebugEnabled()) {
Expand All @@ -195,7 +195,7 @@ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, PartitionState<K, V
highestSucceeded,
partitionState.getIncompleteOffsetsBelowHighestSucceeded());
}
OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, highestSucceeded, incompleteOffsets).invoke();
OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(baseOffsetForPartition, highestSucceeded, incompleteOffsets).invoke();

//
if (forcedCodec.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.PartitionState;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -12,6 +14,7 @@
import java.util.*;

import static io.confluent.csid.utils.Range.range;
import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.offsets.OffsetEncoding.Version.v1;
import static io.confluent.parallelconsumer.offsets.OffsetEncoding.Version.v2;

Expand Down Expand Up @@ -81,18 +84,21 @@ public class OffsetSimultaneousEncoder {
*/
private final Set<OffsetEncoder> encoders;

public OffsetSimultaneousEncoder(long lowWaterMark, long highestSucceededOffset, Set<Long> incompleteOffsets) {
this.lowWaterMark = lowWaterMark;
public OffsetSimultaneousEncoder(long baseOffsetToCommit, long highestSucceededOffset, Set<Long> incompleteOffsets) {
this.lowWaterMark = baseOffsetToCommit;
this.incompleteOffsets = incompleteOffsets;

//
if (highestSucceededOffset == -1) { // nothing succeeded yet
highestSucceededOffset = lowWaterMark;
highestSucceededOffset = baseOffsetToCommit;
}

highestSucceededOffset = maybeRaiseOffsetHighestSucceeded(baseOffsetToCommit, highestSucceededOffset);

long bitsetLengthL = highestSucceededOffset - this.lowWaterMark + 1;
if (bitsetLengthL < 0) {
throw new IllegalStateException("Cannot have negative length BitSet");
throw new IllegalStateException(msg("Cannot have negative length BitSet (calculated length: {}, base offset to commit: {}, highest succeeded offset: {})",
bitsetLengthL, baseOffsetToCommit, highestSucceededOffset));
}

// BitSet only support Integer.MAX_VALUE bits
Expand All @@ -103,6 +109,35 @@ public OffsetSimultaneousEncoder(long lowWaterMark, long highestSucceededOffset,
this.encoders = initEncoders();
}

/**
 * Ensure that {@code highestSucceededOffset} is never more than a single offset behind
 * {@code baseOffsetToCommit}. Needed to allow us to jump over gaps in the partitions such as transaction markers.
 * <p>
 * Under normal operation, it is expected that the highest succeeded offset will generally always be higher than the
 * next expected offset to poll. This is because PC processes records well beyond the
 * {@link PartitionState#getOffsetHighestSequentialSucceeded()} all the time, unless operating in
 * {@link ParallelConsumerOptions.ProcessingOrder#PARTITION} order. So this situation - where the highest succeeded
 * offset is below the next offset to poll at the time of commit - will either be an incredibly rare case: only at
 * the very beginning of processing records, or where ALL records are slow enough or blocked, or in synthetically
 * created scenarios (like test cases).
 *
 * @param baseOffsetToCommit     the next offset that will be polled / committed for this partition
 * @param highestSucceededOffset the highest offset that has completed successfully so far
 * @return the (possibly raised) highest succeeded offset, guaranteed to be at least {@code baseOffsetToCommit - 1}
 */
private long maybeRaiseOffsetHighestSucceeded(long baseOffsetToCommit, long highestSucceededOffset) {
    long nextExpectedMinusOne = baseOffsetToCommit - 1;

    boolean gapLargerThanOne = highestSucceededOffset < nextExpectedMinusOne;
    if (gapLargerThanOne) {
        long gap = nextExpectedMinusOne - highestSucceededOffset;
        log.debug("Gap detected in partition (highest succeeded: {} while next expected poll offset: {} - gap is {}), probably tx markers. Moving highest succeeded to next expected - 1",
                highestSucceededOffset,
                nextExpectedMinusOne,
                gap);
        // jump straight to the lowest incomplete - 1, allows us to jump over gaps in the partitions such as transaction markers
        highestSucceededOffset = nextExpectedMinusOne;
    }

    return highestSucceededOffset;
}

private Set<OffsetEncoder> initEncoders() {
var newEncoders = new HashSet<OffsetEncoder>();
if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public class PartitionState<K, V> {

/**
* Highest offset which has completed successfully ("succeeded").
* <p>
* Note that in some conditions there may be a gap between this and the next offset to poll - that being,
* there may be some number of transaction marker records between it and the next offset to poll.
*/
@Getter(PUBLIC)
private long offsetHighestSucceeded = KAFKA_OFFSET_ABSENCE;
Expand Down Expand Up @@ -133,7 +136,7 @@ private void maybeRaiseHighestSeenOffset(final long offset) {
}
}

public void onOffsetCommitSuccess(final OffsetAndMetadata committed) {
public void onOffsetCommitSuccess(OffsetAndMetadata committed) { //NOSONAR
setClean();
}

Expand Down Expand Up @@ -167,7 +170,7 @@ public int getCommitQueueSize() {
public void onSuccess(WorkContainer<K, V> work) {
long offset = work.offset();

WorkContainer<K, V> removedFromQueue = this.commitQueue.remove(work.offset());
WorkContainer<K, V> removedFromQueue = this.commitQueue.remove(offset);
assert (removedFromQueue != null);

boolean removedFromIncompletes = this.incompleteOffsets.remove(offset);
Expand Down Expand Up @@ -195,9 +198,10 @@ private void updateHighestSucceededOffsetSoFar(WorkContainer<K, V> work) {
}

public void addWorkContainer(WorkContainer<K, V> wc) {
maybeRaiseHighestSeenOffset(wc.offset());
commitQueue.put(wc.offset(), wc);
incompleteOffsets.add(wc.offset());
long newOffset = wc.offset();
maybeRaiseHighestSeenOffset(newOffset);
commitQueue.put(newOffset, wc);
incompleteOffsets.add(newOffset);
}

/**
Expand All @@ -224,6 +228,9 @@ private OffsetAndMetadata createOffsetAndMetadata() {
.orElseGet(() -> new OffsetAndMetadata(nextOffset));
}

/**
 * Defined as the offset one above the highest sequentially succeeded offset - i.e. the next offset we expect to
 * poll for this partition.
 */
private long getNextExpectedPolledOffset() {
    return getOffsetHighestSequentialSucceeded() + 1;
}
Expand All @@ -249,6 +256,10 @@ public Set<Long> getIncompleteOffsetsBelowHighestSucceeded() {
.collect(Collectors.toSet()));
}

/**
* Defined, for our purposes (it is only used to determine which offset to poll for next), as the offset one below
* the lowest incomplete offset.
*/
public long getOffsetHighestSequentialSucceeded() {
if (this.incompleteOffsets.isEmpty()) {
return this.offsetHighestSeen;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.confluent.csid.testcontainers.FilteredTestContainerSlf4jLogConsumer;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -53,6 +54,7 @@ public abstract class BrokerIntegrationTest<K, V> {
kafkaContainer.start();
}

@Getter
protected KafkaClientUtils kcu = new KafkaClientUtils(kafkaContainer);

@BeforeAll
Expand Down
Loading