Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ private void commitAsync(Optional<Map<TopicPartition, OffsetAndMetadata>> offset
}

private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commit(final CommitEvent commitEvent) {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
offsetCommitCallbackInvoker.executeCallbacks();

if (commitEvent.offsets().isPresent() && commitEvent.offsets().get().isEmpty()) {
Expand Down Expand Up @@ -1083,7 +1083,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
acquireAndEnsureOpen();
long start = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
if (partitions.isEmpty()) {
return Collections.emptyMap();
}
Expand All @@ -1107,7 +1107,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
}
}

private void maybeThrowInvalidGroupIdException() {
private void throwIfGroupIdNotDefined() {
if (groupMetadata.get().isEmpty()) {
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
Expand Down Expand Up @@ -1346,7 +1346,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
public ConsumerGroupMetadata groupMetadata() {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
return groupMetadata.get().get();
} finally {
release();
Expand Down Expand Up @@ -2028,7 +2028,7 @@ private void release() {
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
"null" : "empty"));
Expand All @@ -2052,7 +2052,7 @@ private void subscribeToRegex(SubscriptionPattern pattern,
Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
throwIfSubscriptionPatternIsInvalid(pattern);
log.info("Subscribing to regular expression {}", pattern);
applicationEventHandler.addAndGet(new TopicRe2JPatternSubscriptionChangeEvent(
Expand All @@ -2076,7 +2076,7 @@ private void throwIfSubscriptionPatternIsInvalid(SubscriptionPattern subscriptio
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public void subscribe(Collection<String> topics) {
private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
Expand Down Expand Up @@ -559,7 +559,7 @@ public void subscribe(SubscriptionPattern pattern) {
* configured at-least one partition assignment strategy
*/
private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> listener) {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
if (pattern == null || pattern.toString().isEmpty())
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ?
"null" : "empty"));
Expand Down Expand Up @@ -743,7 +743,7 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, fin
acquireAndEnsureOpen();
long commitStart = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
offsets.forEach(this::updateLastSeenEpochIfNewer);
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), time.timer(timeout))) {
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
Expand All @@ -769,7 +769,7 @@ public void commitAsync(OffsetCommitCallback callback) {
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
log.debug("Committing offsets: {}", offsets);
offsets.forEach(this::updateLastSeenEpochIfNewer);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
Expand Down Expand Up @@ -890,7 +890,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
acquireAndEnsureOpen();
long start = time.nanoseconds();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
final Map<TopicPartition, OffsetAndMetadata> offsets;
offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout));
if (offsets == null) {
Expand Down Expand Up @@ -1079,7 +1079,7 @@ public OptionalLong currentLag(TopicPartition topicPartition) {
public ConsumerGroupMetadata groupMetadata() {
acquireAndEnsureOpen();
try {
maybeThrowInvalidGroupIdException();
throwIfGroupIdNotDefined();
return coordinator.groupMetadata();
} finally {
release();
Expand Down Expand Up @@ -1273,7 +1273,7 @@ private void throwIfNoAssignorsConfigured() {
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " configuration property");
}

private void maybeThrowInvalidGroupIdException() {
private void throwIfGroupIdNotDefined() {
if (groupId.isEmpty())
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
Expand Down