Merged
5 changes: 5 additions & 0 deletions kafka-ui-api/pom.xml
@@ -212,6 +212,11 @@
<artifactId>groovy-json</artifactId>
<version>${groovy.version}</version>
</dependency>
<dependency>
<groupId>org.apache.datasketches</groupId>
<artifactId>datasketches-java</artifactId>
<version>${datasketches-java.version}</version>
</dependency>

</dependencies>
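
The new datasketches-java dependency is presumably what backs the per-topic statistics (for example, estimating unique key counts without keeping every key in memory). Below is a minimal sketch of the DataSketches HLL API it provides; the classes and calls are the library's own, but wiring them into TopicAnalysisStats this way is an assumption:

```java
import org.apache.datasketches.hll.HllSketch;

// Hypothetical helper: estimate distinct record keys with an HLL sketch.
class UniqueKeysEstimator {
  // lgK = 12 -> 4096 buckets, roughly 1-2% relative error
  private final HllSketch sketch = new HllSketch(12);

  void observeKey(byte[] key) {
    if (key != null) {
      sketch.update(key); // feed each record key into the sketch
    }
  }

  long estimate() {
    return (long) sketch.getEstimate(); // approximate distinct-key count
  }
}
```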

TopicsController.java
@@ -11,6 +11,7 @@
import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
@@ -19,6 +20,7 @@
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import java.util.Comparator;
import java.util.List;
import javax.validation.Valid;
@@ -40,6 +42,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
private static final Integer DEFAULT_PAGE_SIZE = 25;

private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;

@Override
@@ -181,4 +184,29 @@ public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicatio
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok);
}

@Override
public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
return topicAnalysisService.analyze(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
ServerWebExchange exchange) {
topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
return Mono.just(ResponseEntity.ok().build());
}


@Override
public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
String topicName,
ServerWebExchange exchange) {
return Mono.just(
topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build())
);
}
}
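
Client-side, these three endpoints mean: start an analysis, cancel it, and poll until a result (or error) appears. A hedged WebClient sketch follows; the concrete URL layout comes from the OpenAPI spec, which is not part of this diff, so the path below is an assumption:

```java
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Hypothetical client; paths are assumed, not taken from the spec.
class TopicAnalysisClient {
  private final WebClient web = WebClient.create("http://localhost:8080");

  Mono<Void> start(String cluster, String topic) {
    return web.post()
        .uri("/api/clusters/{c}/topics/{t}/analysis", cluster, topic) // assumed path
        .retrieve()
        .bodyToMono(Void.class);
  }

  Mono<TopicAnalysisDTO> fetch(String cluster, String topic) {
    return web.get()
        .uri("/api/clusters/{c}/topics/{t}/analysis", cluster, topic) // assumed path
        .retrieve()
        .bodyToMono(TopicAnalysisDTO.class); // 404 until an analysis has been run
  }
}
```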
ErrorCode.java
@@ -26,7 +26,8 @@ public enum ErrorCode {
INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT),
INVALID_ENTITY_STATE(4016, HttpStatus.BAD_REQUEST),
SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR);
SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR),
TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST);

static {
// codes uniqueness check
TopicAnalysisException.java
@@ -0,0 +1,13 @@
package com.provectus.kafka.ui.exception;

public class TopicAnalysisException extends CustomBaseException {

public TopicAnalysisException(String message) {
super(message);
}

@Override
public ErrorCode getErrorCode() {
return ErrorCode.TOPIC_ANALYSIS_ERROR;
}
}
AnalysisTasksStore.java
@@ -0,0 +1,117 @@
package com.provectus.kafka.ui.service.analyze;

import com.google.common.base.Throwables;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.model.TopicAnalysisProgressDTO;
import com.provectus.kafka.ui.model.TopicAnalysisResultDTO;
import java.io.Closeable;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.Value;

class AnalysisTasksStore {

private final Map<TopicIdentity, RunningAnalysis> running = new ConcurrentHashMap<>();
private final Map<TopicIdentity, TopicAnalysisResultDTO> completed = new ConcurrentHashMap<>();

void setAnalysisError(TopicIdentity topicId,
Instant collectionStartedAt,
Throwable th) {
running.remove(topicId);
completed.put(
topicId,
new TopicAnalysisResultDTO()
.startedAt(collectionStartedAt.toEpochMilli())
.finishedAt(System.currentTimeMillis())
.error(Throwables.getStackTraceAsString(th))
);
}

void setAnalysisResult(TopicIdentity topicId,
Instant collectionStartedAt,
TopicAnalysisStats totalStats,
Map<Integer, TopicAnalysisStats> partitionStats) {
running.remove(topicId);
completed.put(topicId,
new TopicAnalysisResultDTO()
.startedAt(collectionStartedAt.toEpochMilli())
.finishedAt(System.currentTimeMillis())
.totalStats(totalStats.toDto(null))
.partitionStats(
partitionStats.entrySet().stream()
.map(e -> e.getValue().toDto(e.getKey()))
.collect(Collectors.toList())
));
}

void updateProgress(TopicIdentity topicId,
long msgsScanned,
long bytesScanned,
Double completeness) {
running.computeIfPresent(topicId, (k, state) ->
state.toBuilder()
.msgsScanned(msgsScanned)
.bytesScanned(bytesScanned)
.completenessPercent(completeness)
.build());
}

void registerNewTask(TopicIdentity topicId, Closeable task) {
running.put(topicId, new RunningAnalysis(Instant.now(), 0.0, 0, 0, task));
}

void cancelAnalysis(TopicIdentity topicId) {
Optional.ofNullable(running.remove(topicId))
.ifPresent(RunningAnalysis::stopTask);
}

boolean isAnalysisInProgress(TopicIdentity id) {
return running.containsKey(id);
}

Optional<TopicAnalysisDTO> getTopicAnalysis(TopicIdentity id) {
var runningState = running.get(id);
var completedState = completed.get(id);
if (runningState == null && completedState == null) {
return Optional.empty();
}
return Optional.of(createAnalysisDto(runningState, completedState));
}

private TopicAnalysisDTO createAnalysisDto(@Nullable RunningAnalysis runningState,
@Nullable TopicAnalysisResultDTO completedState) {
return new TopicAnalysisDTO()
.progress(runningState != null ? runningState.toDto() : null)
.result(completedState);
}

@Value
@Builder(toBuilder = true)
private static class RunningAnalysis {
Instant startedAt;
double completenessPercent;
long msgsScanned;
long bytesScanned;
Closeable task;

TopicAnalysisProgressDTO toDto() {
return new TopicAnalysisProgressDTO()
.startedAt(startedAt.toEpochMilli())
.bytesScanned(bytesScanned)
.msgsScanned(msgsScanned)
.completenessPercent(BigDecimal.valueOf(completenessPercent));
}

@SneakyThrows
void stopTask() {
task.close();
}
}
}
TopicAnalysisService.java
@@ -0,0 +1,156 @@
package com.provectus.kafka.ui.service.analyze;

import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;


@Slf4j
@Component
@RequiredArgsConstructor
public class TopicAnalysisService {

private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();

private final TopicsService topicsService;
private final ConsumerGroupService consumerGroupService;

public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
return topicsService.getTopicDetails(cluster, topicName)
.doOnNext(topic ->
startAnalysis(
cluster,
topicName,
topic.getPartitionCount(),
topic.getPartitions().values()
.stream()
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
.sum()
)
).then();
}

private synchronized void startAnalysis(KafkaCluster cluster,
String topic,
int partitionsCnt,
long approxNumberOfMsgs) {
var topicId = new TopicIdentity(cluster, topic);
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
      throw new TopicAnalysisException("Topic analysis is already in progress");
}
var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
analysisTasksStore.registerNewTask(topicId, task);
Schedulers.boundedElastic().schedule(task);
}

public void cancelAnalysis(KafkaCluster cluster, String topicName) {
analysisTasksStore.cancelAnalysis(new TopicIdentity(cluster, topicName));
}

public Optional<TopicAnalysisDTO> getTopicAnalysis(KafkaCluster cluster, String topicName) {
return analysisTasksStore.getTopicAnalysis(new TopicIdentity(cluster, topicName));
}

class AnalysisTask implements Runnable, Closeable {

private final Instant startedAt = Instant.now();

private final TopicIdentity topicId;
private final int partitionsCnt;
private final long approxNumberOfMsgs;

private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();

private final KafkaConsumer<Bytes, Bytes> consumer;

AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
this.topicId = topicId;
this.approxNumberOfMsgs = approxNumberOfMsgs;
this.partitionsCnt = partitionsCnt;
this.consumer = consumerGroupService.createConsumer(
cluster,
// to improve polling throughput
Map.of(
ConsumerConfig.RECEIVE_BUFFER_CONFIG, "-1", //let OS tune buffer size
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
}

@Override
public void close() {
consumer.wakeup();
}

@Override
public void run() {
try {
log.info("Starting {} topic analysis", topicId);
var topicPartitions = IntStream.range(0, partitionsCnt)
.peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
.mapToObj(i -> new TopicPartition(topicId.topicName, i))
.collect(Collectors.toList());

consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);

var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions);
for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < 3; ) {
var polled = consumer.poll(Duration.ofSeconds(3));
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);
waitingOffsets.markPolled(r);
});
updateProgress();
}
analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
log.info("{} topic analysis finished", topicId);
} catch (WakeupException | InterruptException cancelException) {
log.info("{} topic analysis stopped", topicId);
        // remove the running-task entry in case the thread was interrupted for a reason other than user cancellation
analysisTasksStore.cancelAnalysis(topicId);
} catch (Throwable th) {
log.error("Error analyzing topic {}", topicId, th);
analysisTasksStore.setAnalysisError(topicId, startedAt, th);
} finally {
consumer.close();
}
}

private void updateProgress() {
if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
analysisTasksStore.updateProgress(
topicId,
totalStats.totalMsgs,
totalStats.keysSize.sum + totalStats.valuesSize.sum,
Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
);
}
}
}
}
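
The cancellation path relies on a standard KafkaConsumer idiom: wakeup() is the only consumer method that is safe to call from another thread, and it makes a blocked poll() throw WakeupException on the polling thread, which is exactly what AnalysisTask.close() triggers. A standalone sketch of that idiom, with hypothetical broker and topic names unrelated to the classes above:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupCancellationDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
    props.put("group.id", "analysis-demo");           // hypothetical group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.BytesDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.BytesDeserializer");

    KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(List.of("demo-topic")); // hypothetical topic

    // A second thread (think: the cancel endpoint) only ever calls wakeup().
    new Thread(() -> {
      try {
        Thread.sleep(5_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      consumer.wakeup(); // thread-safe; the blocked poll() below will throw
    }).start();

    try {
      while (true) {
        consumer.poll(Duration.ofSeconds(1)); // throws WakeupException after wakeup()
      }
    } catch (WakeupException e) {
      System.out.println("analysis cancelled");
    } finally {
      consumer.close(); // always close on the polling thread, as AnalysisTask does
    }
  }
}
```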