5 changes: 5 additions & 0 deletions kafka-ui-api/pom.xml
@@ -114,6 +114,11 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
AbstractAuthSecurityConfig.java
@@ -13,6 +13,7 @@ protected AbstractAuthSecurityConfig() {
"/resources/**",
"/actuator/health/**",
"/actuator/info",
"/actuator/prometheus",
"/auth",
"/login",
"/logout",
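Note: the new micrometer-registry-prometheus dependency plus the whitelisted /actuator/prometheus path together expose a Prometheus scrape endpoint. A minimal sketch of what the registry provides, using plain Micrometer outside Spring Boot's auto-configuration (the metric name is illustrative, not from this PR):

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;

public class PrometheusSketch {
  public static void main(String[] args) {
    // Spring Boot auto-configures a registry like this once the dependency is on the classpath.
    PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    Counter polls = Counter.builder("polled_records_total").register(registry); // illustrative name
    polls.increment();
    // The same text exposition format is what the app serves at /actuator/prometheus.
    System.out.println(registry.scrape());
  }
}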
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/AbstractEmitter.java
@@ -2,37 +2,28 @@

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {

private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;

protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}

protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
protected PolledRecords poll(
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}

protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
throttler.throttleAfterPoll(polledBytes);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}

@@ -49,10 +40,8 @@ protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
messagesProcessing.sendPhase(sink, name);
}

protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> records,
long elapsed) {
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/BackwardRecordEmitter.java
@@ -9,9 +9,7 @@
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
@@ -22,12 +20,12 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;

public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
@@ -41,7 +39,7 @@ public BackwardRecordEmitter(
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");

var seekOperations = SeekOperations.create(consumer, consumerPosition);
@@ -91,7 +89,7 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
@@ -101,21 +99,21 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(

var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();

    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);
emptyPolls.count(polledRecords.count());

log.debug("{} records polled from {}", polledRecords.count(), tp);

var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.toList();

if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ConsumingStats.java
@@ -2,9 +2,6 @@

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
@@ -13,23 +10,17 @@ class ConsumingStats {
private int records = 0;
private long elapsed = 0;

/**
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
PolledRecords polledRecords,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
bytes += polledRecords.bytes();
this.records += polledRecords.count();
this.elapsed += elapsed;
this.elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
);
return polledBytes;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EmptyPollsCounter.java
@@ -17,8 +17,8 @@ public class EmptyPollsCounter {
this.maxEmptyPolls = maxEmptyPolls;
}

public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
public void count(int polledCount) {
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
}

public boolean noDataEmptyPollsReached() {
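Note: the counter now takes a plain record count instead of a ConsumerRecords handle, decoupling it from Kafka types. A sketch of the counting contract as used inside an emitter's polling loop (cf. ForwardRecordEmitter below; all APIs are from this diff):

    EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
    while (!sink.isCancelled() && !emptyPolls.noDataEmptyPollsReached()) {
      var polled = poll(sink, consumer);  // returns PolledRecords
      emptyPolls.count(polled.count());   // a zero-record poll extends the empty streak; any data resets it
    }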
82 changes: 82 additions & 0 deletions kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/EnhancedConsumer.java
@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.emitter;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;


public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {

private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;

public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
}

public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}

@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}

@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}

@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}

}
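Note: ApplicationMetrics itself is not part of this diff. A hypothetical sketch of how the two calls used above, activeConsumers() and meterPolledRecords(...), could map onto Micrometer; metric names and tags are illustrative, not taken from the PR:

import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only; the real ApplicationMetrics class is not shown in this diff.
class ApplicationMetricsSketch {
  private final MeterRegistry registry;
  private final AtomicInteger activeConsumers = new AtomicInteger();

  ApplicationMetricsSketch(MeterRegistry registry) {
    this.registry = registry;
    registry.gauge("kui_active_consumers", activeConsumers); // gauge tracks the AtomicInteger
  }

  AtomicInteger activeConsumers() {
    return activeConsumers;
  }

  void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
    registry.counter("kui_polled_records", "topic", topic).increment(polled.count());
    DistributionSummary.builder("kui_polled_bytes").tag("topic", topic)
        .register(registry)
        .record(polled.bytes());
    if (throttled) {
      registry.counter("kui_poll_throttled", "topic", topic).increment();
    }
  }
}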
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/ForwardRecordEmitter.java
@@ -5,8 +5,6 @@
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@@ -16,11 +14,11 @@ public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;

public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
@@ -32,7 +30,7 @@ public ForwardRecordEmitter(
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
@@ -44,8 +42,8 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
&& !emptyPolls.noDataEmptyPollsReached()) {

sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);
var records = poll(sink, consumer);
emptyPolls.count(records.count());

log.debug("{} records polled", records.count());

kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/MessagesProcessing.java
@@ -8,7 +8,6 @@
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

@@ -54,13 +53,10 @@ void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> r
}
}

int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed) {
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
}
return 0;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
48 changes: 48 additions & 0 deletions kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter/PolledRecords.java
@@ -0,0 +1,48 @@
package com.provectus.kafka.ui.emitter;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;

public record PolledRecords(int count,
int bytes,
Duration elapsed,
ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {

static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
return new PolledRecords(
polled.count(),
calculatePolledRecSize(polled),
pollDuration,
polled
);
}

public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
return records.records(tp);
}

@Override
public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}

private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
}
return polledBytes;
}
}
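Note: a usage sketch tying the two new classes together; the consumer construction details (props, throttler, metrics) are assumptions standing in for whatever the surrounding code base supplies:

    // Sketch, assuming props holds standard consumer config and throttler/metrics come from the app context.
    try (EnhancedConsumer consumer = new EnhancedConsumer(props, throttler, metrics)) {
      consumer.assign(List.of(new TopicPartition("some-topic", 0))); // single-topic assignment is enforced
      PolledRecords polled = consumer.pollEnhanced(Duration.ofMillis(500));
      log.debug("{} records, {} bytes in {} ms", polled.count(), polled.bytes(), polled.elapsed().toMillis());
      for (ConsumerRecord<Bytes, Bytes> rec : polled) {
        // process rec.key() / rec.value()
      }
    }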