Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing the ConsumerSeekAware interface #812

Merged
merged 41 commits into from
Aug 22, 2023
Merged
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0bda9c8
Implement kafka seek mechanisms
guillermocalvo Aug 14, 2023
4bf5a5d
Add tests
guillermocalvo Aug 14, 2023
052d21f
Replace deprecated `KafkaConsumerAware` -> `ConsumerAware`
guillermocalvo Aug 15, 2023
388a31b
Fix typo `producer` -> `consumer`
guillermocalvo Aug 15, 2023
be32a75
Replace existing example `[source,java]` -> `snippet`
guillermocalvo Aug 15, 2023
99359cd
Add documentation for `ConsumerSeekAware`
guillermocalvo Aug 15, 2023
12debce
Add documentation for `KafkaSeekOperations`
guillermocalvo Aug 15, 2023
d915419
Add `@NonNull` annotations
guillermocalvo Aug 17, 2023
5b3e36d
Turn into a one-liner
guillermocalvo Aug 17, 2023
3b8a0a5
Fix log message
guillermocalvo Aug 17, 2023
c189fe2
Update javadoc
guillermocalvo Aug 17, 2023
54945a6
Use a record instead of a Java class
guillermocalvo Aug 17, 2023
a573357
Use a Kotlin data class instead of a regular class
guillermocalvo Aug 17, 2023
e1e12d1
Use `void` instead of `def`
guillermocalvo Aug 17, 2023
f2341a1
Split section into multiple documents
guillermocalvo Aug 17, 2023
9968baf
Make the Groovy snippet more idiomatic
guillermocalvo Aug 17, 2023
4faab8e
Add tests for the code samples in all test suites
guillermocalvo Aug 17, 2023
1dfc72f
Update tests
guillermocalvo Aug 17, 2023
852e9f6
Merge remote-tracking branch 'remotes/origin/master' into 46-missing-…
guillermocalvo Aug 17, 2023
a0fc4d0
rename back to old name
sdelamo Aug 18, 2023
2b97558
Split Kafka seek in sections
sdelamo Aug 18, 2023
1d517f6
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
b6abd6c
Apply suggestions from code review
guillermocalvo Aug 18, 2023
99bf01a
Use a simple `if`
guillermocalvo Aug 18, 2023
c243281
Refactor `DefaultKafkaSeeker::perform`
guillermocalvo Aug 18, 2023
944d29a
Refactor trait interface into static methods
guillermocalvo Aug 18, 2023
15b9ba3
Refactor cognitive complexity `DefaultKafkaSeeker::perform`
guillermocalvo Aug 18, 2023
c7cad51
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
9cbba67
remove MY_KAFKA stop
sdelamo Aug 18, 2023
052cee0
extract methods
sdelamo Aug 18, 2023
56a048a
Make checkstyle happy again.
guillermocalvo Aug 18, 2023
4785378
Revert "remove MY_KAFKA stop"
guillermocalvo Aug 18, 2023
6c7c396
Merge branch 'master' into 46-missing-the-consumerseekaware-interface
sdelamo Aug 18, 2023
0aba52f
remove abstractKafkatest
sdelamo Aug 18, 2023
9d5bcb3
Merge remote-tracking branch 'remotes/origin/master' into 46-missing-…
guillermocalvo Aug 21, 2023
040d4e9
Keep tests that aren't based on test-resources as they are
guillermocalvo Aug 21, 2023
5460b78
Refactor multi-language tests
guillermocalvo Aug 21, 2023
71632bb
Remove `@Inject` annotations
guillermocalvo Aug 22, 2023
5d50906
Log messages as `debug` instead of `info`
guillermocalvo Aug 22, 2023
6647ef5
Use nullable `Boolean` instead of `Optional`
guillermocalvo Aug 22, 2023
b4044cf
Revert "Use nullable `Boolean` instead of `Optional`"
guillermocalvo Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor DefaultKafkaSeeker::perform
  • Loading branch information
guillermocalvo committed Aug 18, 2023
commit c24328132d3d150e1fe8c40ec9515ae38f1eb05b
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,46 @@ record DefaultKafkaSeeker(@NonNull Consumer<?, ?> consumer) implements KafkaSeek
@Override
public boolean perform(@NonNull KafkaSeekOperation operation) {
try {
return Optional.of(operation).filter(op -> op.offset() == 0L).flatMap(this::optimized)
.orElseGet(() -> regular(operation));
final TopicPartition tp = operation.topicPartition();
if (operation.offset() == 0) {
switch (operation.offsetType()) {
case FORWARD, BACKWARD:
// Special case: relative zero-offset
if (LOG.isInfoEnabled()) {
LOG.info("Relative zero-offset seek operation dropped: {}", operation);
}
return false;
case BEGINNING:
// Optimized case: seek to the beginning
consumer.seekToBeginning(singletonList(tp));
if (LOG.isInfoEnabled()) {
LOG.info("Seek to the beginning operation succeeded: {}-{}", operation.topic(), operation.partition());
}
return true;
case END:
// Optimized case: seek to the end
consumer.seekToEnd(singletonList(tp));
if (LOG.isInfoEnabled()) {
LOG.info("Seek to the end operation succeeded: {}-{}", operation.topic(), operation.partition());
}
return true;
default:
// Perform operation regularly
}
}
final long offset = switch (operation.offsetType()) {
case ABSOLUTE -> operation.offset();
case FORWARD -> current(tp) + operation.offset();
case BACKWARD -> current(tp) - operation.offset();
case BEGINNING -> beginning(tp) + operation.offset();
case END -> end(tp) - operation.offset();
case TIMESTAMP -> earliest(tp, operation.offset()).orElseGet(() -> end(tp));
};
consumer.seek(tp, Math.max(0, offset));
if (LOG.isInfoEnabled()) {
LOG.info("Seek operation succeeded: {} - offset: {}", operation, offset);
}
return true;
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Seek operation failed: {}", operation, e);
Expand All @@ -64,54 +102,6 @@ public boolean perform(@NonNull KafkaSeekOperation operation) {
}
}

private Optional<Boolean> optimized(@NonNull KafkaSeekOperation op) {
// Assuming offset is zero
final TopicPartition tp = op.topicPartition();
switch (op.offsetType()) {
case FORWARD, BACKWARD:
// Special case: relative zero-offset
if (LOG.isInfoEnabled()) {
LOG.info("Relative zero-offset seek operation dropped: {}", op);
}
return Optional.of(false);
case BEGINNING:
// Optimized case: seek to the beginning
consumer.seekToBeginning(singletonList(tp));
if (LOG.isInfoEnabled()) {
LOG.info("Seek to the beginning operation succeeded: {}-{}", op.topic(), op.partition());
}
return Optional.of(true);
case END:
// Optimized case: seek to the end
consumer.seekToEnd(singletonList(tp));
if (LOG.isInfoEnabled()) {
LOG.info("Seek to the end operation succeeded: {}-{}", op.topic(), op.partition());
}
return Optional.of(true);
default:
// Perform operation regularly
return Optional.empty();
}
}

private boolean regular(@NonNull KafkaSeekOperation op) {
// Assuming offset is greater than zero
final TopicPartition tp = op.topicPartition();
final long offset = switch (op.offsetType()) {
case ABSOLUTE -> op.offset();
case FORWARD -> current(tp) + op.offset();
case BACKWARD -> current(tp) - op.offset();
case BEGINNING -> beginning(tp) + op.offset();
case END -> end(tp) - op.offset();
case TIMESTAMP -> earliest(tp, op.offset()).orElseGet(() -> end(tp));
};
consumer.seek(tp, Math.max(0, offset));
if (LOG.isInfoEnabled()) {
LOG.info("Seek operation succeeded: {} - offset: {}", op, offset);
}
return true;
}

private long current(TopicPartition tp) {
return consumer.position(tp);
}
Expand Down