
Commit 396341d

Ilya Kuramshin (iliax) and Haarolean authored
Backend: topic analysis (#1965)
* Topic Analyzer implementation
* small cleanup
* hourly stats added
* imports fix
* compilation fixes
* PR fixes
* PR fixes (renaming)
* tests compilation fix
* apis improved
* checkstyle fix
* renaming
* node version rollback
* Fix naming

Signed-off-by: Roman Zabaluev <rzabaluev@provectus.com>
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
1 parent e4dc113 commit 396341d

File tree

12 files changed: +739 −2 lines changed

kafka-ui-api/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -212,6 +212,11 @@
       <artifactId>groovy-json</artifactId>
       <version>${groovy.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.datasketches</groupId>
+      <artifactId>datasketches-java</artifactId>
+      <version>${datasketches-java.version}</version>
+    </dependency>
 
   </dependencies>
 
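Note: this is the commit's only build change. Apache DataSketches provides fixed-memory probabilistic summaries (HyperLogLog, quantile sketches), which the new TopicAnalysisStats class (part of this commit but not shown in this excerpt) presumably uses to estimate values such as distinct key counts while scanning a topic. A minimal, standalone sketch of the library's HLL API, with illustrative names and parameters:

import org.apache.datasketches.hll.HllSketch;

// Standalone sketch of the datasketches-java HLL API; how TopicAnalysisStats
// actually wires this up is an assumption, since that file is not in this excerpt.
public class HllDemo {
  public static void main(String[] args) {
    HllSketch keys = new HllSketch(12); // lgK = 12 -> roughly 1.6% relative error
    for (int i = 0; i < 1_000_000; i++) {
      keys.update("key-" + (i % 250_000)); // 250_000 distinct keys, many repeats
    }
    // The estimate stays within a few percent of 250_000 using kilobytes of memory.
    System.out.printf("estimated distinct keys: %.0f%n", keys.getEstimate());
  }
}
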
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

Lines changed: 28 additions & 0 deletions
@@ -11,6 +11,7 @@
 import com.provectus.kafka.ui.model.ReplicationFactorChangeDTO;
 import com.provectus.kafka.ui.model.ReplicationFactorChangeResponseDTO;
 import com.provectus.kafka.ui.model.SortOrderDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicCreationDTO;
@@ -19,6 +20,7 @@
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
 import java.util.Comparator;
 import java.util.List;
 import javax.validation.Valid;
@@ -40,6 +42,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
   private static final Integer DEFAULT_PAGE_SIZE = 25;
 
   private final TopicsService topicsService;
+  private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
 
   @Override
@@ -181,4 +184,29 @@ public Mono<ResponseEntity<ReplicationFactorChangeResponseDTO>> changeReplicatio
         topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
         .map(ResponseEntity::ok);
   }
+
+  @Override
+  public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
+    return topicAnalysisService.analyze(getCluster(clusterName), topicName)
+        .thenReturn(ResponseEntity.ok().build());
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
+                                                        ServerWebExchange exchange) {
+    topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
+    return Mono.just(ResponseEntity.ok().build());
+  }
+
+
+  @Override
+  public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterName,
+                                                                 String topicName,
+                                                                 ServerWebExchange exchange) {
+    return Mono.just(
+        topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
+            .map(ResponseEntity::ok)
+            .orElseGet(() -> ResponseEntity.notFound().build())
+    );
+  }
 }
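
The three methods above implement endpoints from the generated TopicsApi interface; the OpenAPI contract is updated elsewhere in this commit. A hedged client-side sketch, assuming kafka-ui's conventional path layout (the URI templates below are assumptions, not taken from this diff):

import org.springframework.web.reactive.function.client.WebClient;

// Hedged usage sketch of the new endpoints; the authoritative paths live in
// the OpenAPI spec, which is not shown in this excerpt.
public class TopicAnalysisClientDemo {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080");

    // Start an analysis (maps to analyzeTopic) -- returns 200 with an empty body.
    client.post()
        .uri("/api/clusters/{cluster}/topics/{topic}/analysis", "local", "orders")
        .retrieve()
        .toBodilessEntity()
        .block();

    // Fetch progress/result (maps to getTopicAnalysis) -- 404 until an
    // analysis has been started for this topic at least once.
    String analysis = client.get()
        .uri("/api/clusters/{cluster}/topics/{topic}/analysis", "local", "orders")
        .retrieve()
        .bodyToMono(String.class)
        .block();
    System.out.println(analysis);
  }
}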

kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java

Lines changed: 2 additions & 1 deletion
@@ -26,7 +26,8 @@ public enum ErrorCode {
   INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST),
   RECREATE_TOPIC_TIMEOUT(4015, HttpStatus.REQUEST_TIMEOUT),
   INVALID_ENTITY_STATE(4016, HttpStatus.BAD_REQUEST),
-  SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR);
+  SCHEMA_NOT_DELETED(4017, HttpStatus.INTERNAL_SERVER_ERROR),
+  TOPIC_ANALYSIS_ERROR(4018, HttpStatus.BAD_REQUEST);
 
   static {
     // codes uniqueness check
kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/TopicAnalysisException.java

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+package com.provectus.kafka.ui.exception;
+
+public class TopicAnalysisException extends CustomBaseException {
+
+  public TopicAnalysisException(String message) {
+    super(message);
+  }
+
+  @Override
+  public ErrorCode getErrorCode() {
+    return ErrorCode.TOPIC_ANALYSIS_ERROR;
+  }
+}
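
Together, the two changes above wire analysis failures into the existing error model: a TopicAnalysisException carries code 4018, which the enum pairs with HttpStatus.BAD_REQUEST; the shared exception handler that turns this into an actual HTTP response is unchanged by this commit. A small sketch of the mapping, using only the API shown in this diff:

import com.provectus.kafka.ui.exception.ErrorCode;
import com.provectus.kafka.ui.exception.TopicAnalysisException;

// Sketch of the new exception-to-code mapping; how the ErrorCode's HttpStatus
// becomes a response is handled elsewhere in the codebase, not in this diff.
public class ErrorCodeDemo {
  public static void main(String[] args) {
    try {
      throw new TopicAnalysisException("Topic is already analyzing");
    } catch (TopicAnalysisException e) {
      // TOPIC_ANALYSIS_ERROR pairs code 4018 with HttpStatus.BAD_REQUEST.
      System.out.println(e.getErrorCode()); // TOPIC_ANALYSIS_ERROR
    }
  }
}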
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/AnalysisTasksStore.java

Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.google.common.base.Throwables;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisProgressDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisResultDTO;
+import java.io.Closeable;
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import lombok.Builder;
+import lombok.SneakyThrows;
+import lombok.Value;
+
+class AnalysisTasksStore {
+
+  private final Map<TopicIdentity, RunningAnalysis> running = new ConcurrentHashMap<>();
+  private final Map<TopicIdentity, TopicAnalysisResultDTO> completed = new ConcurrentHashMap<>();
+
+  void setAnalysisError(TopicIdentity topicId,
+                        Instant collectionStartedAt,
+                        Throwable th) {
+    running.remove(topicId);
+    completed.put(
+        topicId,
+        new TopicAnalysisResultDTO()
+            .startedAt(collectionStartedAt.toEpochMilli())
+            .finishedAt(System.currentTimeMillis())
+            .error(Throwables.getStackTraceAsString(th))
+    );
+  }
+
+  void setAnalysisResult(TopicIdentity topicId,
+                         Instant collectionStartedAt,
+                         TopicAnalysisStats totalStats,
+                         Map<Integer, TopicAnalysisStats> partitionStats) {
+    running.remove(topicId);
+    completed.put(topicId,
+        new TopicAnalysisResultDTO()
+            .startedAt(collectionStartedAt.toEpochMilli())
+            .finishedAt(System.currentTimeMillis())
+            .totalStats(totalStats.toDto(null))
+            .partitionStats(
+                partitionStats.entrySet().stream()
+                    .map(e -> e.getValue().toDto(e.getKey()))
+                    .collect(Collectors.toList())
+            ));
+  }
+
+  void updateProgress(TopicIdentity topicId,
+                      long msgsScanned,
+                      long bytesScanned,
+                      Double completeness) {
+    running.computeIfPresent(topicId, (k, state) ->
+        state.toBuilder()
+            .msgsScanned(msgsScanned)
+            .bytesScanned(bytesScanned)
+            .completenessPercent(completeness)
+            .build());
+  }
+
+  void registerNewTask(TopicIdentity topicId, Closeable task) {
+    running.put(topicId, new RunningAnalysis(Instant.now(), 0.0, 0, 0, task));
+  }
+
+  void cancelAnalysis(TopicIdentity topicId) {
+    Optional.ofNullable(running.remove(topicId))
+        .ifPresent(RunningAnalysis::stopTask);
+  }
+
+  boolean isAnalysisInProgress(TopicIdentity id) {
+    return running.containsKey(id);
+  }
+
+  Optional<TopicAnalysisDTO> getTopicAnalysis(TopicIdentity id) {
+    var runningState = running.get(id);
+    var completedState = completed.get(id);
+    if (runningState == null && completedState == null) {
+      return Optional.empty();
+    }
+    return Optional.of(createAnalysisDto(runningState, completedState));
+  }
+
+  private TopicAnalysisDTO createAnalysisDto(@Nullable RunningAnalysis runningState,
+                                             @Nullable TopicAnalysisResultDTO completedState) {
+    return new TopicAnalysisDTO()
+        .progress(runningState != null ? runningState.toDto() : null)
+        .result(completedState);
+  }
+
+  @Value
+  @Builder(toBuilder = true)
+  private static class RunningAnalysis {
+    Instant startedAt;
+    double completenessPercent;
+    long msgsScanned;
+    long bytesScanned;
+    Closeable task;
+
+    TopicAnalysisProgressDTO toDto() {
+      return new TopicAnalysisProgressDTO()
+          .startedAt(startedAt.toEpochMilli())
+          .bytesScanned(bytesScanned)
+          .msgsScanned(msgsScanned)
+          .completenessPercent(BigDecimal.valueOf(completenessPercent));
+    }
+
+    @SneakyThrows
+    void stopTask() {
+      task.close();
+    }
+  }
+}
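
The store keeps per-topic state in two ConcurrentHashMaps and never mutates a RunningAnalysis in place: @Value makes it immutable, and updateProgress() swaps in a rebuilt copy via computeIfPresent, so a progress update racing a cancelAnalysis() becomes a no-op instead of resurrecting a removed task. A self-contained sketch of that pattern (names below are illustrative, not from the diff):

import java.util.concurrent.ConcurrentHashMap;

// Standalone demo of the copy-on-write update pattern used by the store;
// Progress stands in for the immutable RunningAnalysis value class.
public class CopyOnWriteProgressDemo {

  record Progress(long msgsScanned, long bytesScanned) { }

  public static void main(String[] args) {
    var running = new ConcurrentHashMap<String, Progress>();
    running.put("orders", new Progress(0, 0));

    // Atomic replacement, analogous to updateProgress(): the mapping function
    // builds a new value rather than mutating the old one.
    running.computeIfPresent("orders", (k, p) -> new Progress(1_000, 64_000));

    // After a concurrent cancel removes the key, the same call is a no-op.
    running.remove("orders");
    running.computeIfPresent("orders", (k, p) -> new Progress(2_000, 128_000));

    System.out.println(running.containsKey("orders")); // false
  }
}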
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/analyze/TopicAnalysisService.java

Lines changed: 156 additions & 0 deletions

@@ -0,0 +1,156 @@
+package com.provectus.kafka.ui.service.analyze;
+
+import com.provectus.kafka.ui.exception.TopicAnalysisException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.TopicAnalysisDTO;
+import com.provectus.kafka.ui.service.ConsumerGroupService;
+import com.provectus.kafka.ui.service.TopicsService;
+import com.provectus.kafka.ui.util.OffsetsSeek.WaitingOffsets;
+import java.io.Closeable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Bytes;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class TopicAnalysisService {
+
+  private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();
+
+  private final TopicsService topicsService;
+  private final ConsumerGroupService consumerGroupService;
+
+  public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
+    return topicsService.getTopicDetails(cluster, topicName)
+        .doOnNext(topic ->
+            startAnalysis(
+                cluster,
+                topicName,
+                topic.getPartitionCount(),
+                topic.getPartitions().values()
+                    .stream()
+                    .mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
+                    .sum()
+            )
+        ).then();
+  }
+
+  private synchronized void startAnalysis(KafkaCluster cluster,
+                                          String topic,
+                                          int partitionsCnt,
+                                          long approxNumberOfMsgs) {
+    var topicId = new TopicIdentity(cluster, topic);
+    if (analysisTasksStore.isAnalysisInProgress(topicId)) {
+      throw new TopicAnalysisException("Topic is already analyzing");
+    }
+    var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs);
+    analysisTasksStore.registerNewTask(topicId, task);
+    Schedulers.boundedElastic().schedule(task);
+  }
+
+  public void cancelAnalysis(KafkaCluster cluster, String topicName) {
+    analysisTasksStore.cancelAnalysis(new TopicIdentity(cluster, topicName));
+  }
+
+  public Optional<TopicAnalysisDTO> getTopicAnalysis(KafkaCluster cluster, String topicName) {
+    return analysisTasksStore.getTopicAnalysis(new TopicIdentity(cluster, topicName));
+  }
+
+  class AnalysisTask implements Runnable, Closeable {
+
+    private final Instant startedAt = Instant.now();
+
+    private final TopicIdentity topicId;
+    private final int partitionsCnt;
+    private final long approxNumberOfMsgs;
+
+    private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
+    private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
+
+    private final KafkaConsumer<Bytes, Bytes> consumer;
+
+    AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt, long approxNumberOfMsgs) {
+      this.topicId = topicId;
+      this.approxNumberOfMsgs = approxNumberOfMsgs;
+      this.partitionsCnt = partitionsCnt;
+      this.consumer = consumerGroupService.createConsumer(
+          cluster,
+          // to improve polling throughput
+          Map.of(
+              ConsumerConfig.RECEIVE_BUFFER_CONFIG, "-1", //let OS tune buffer size
+              ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
+          )
+      );
+    }
+
+    @Override
+    public void close() {
+      consumer.wakeup();
+    }
+
+    @Override
+    public void run() {
+      try {
+        log.info("Starting {} topic analysis", topicId);
+        var topicPartitions = IntStream.range(0, partitionsCnt)
+            .peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
+            .mapToObj(i -> new TopicPartition(topicId.topicName, i))
+            .collect(Collectors.toList());
+
+        consumer.assign(topicPartitions);
+        consumer.seekToBeginning(topicPartitions);
+
+        var waitingOffsets = new WaitingOffsets(topicId.topicName, consumer, topicPartitions);
+        for (int emptyPolls = 0; !waitingOffsets.endReached() && emptyPolls < 3; ) {
+          var polled = consumer.poll(Duration.ofSeconds(3));
+          emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
+          polled.forEach(r -> {
+            totalStats.apply(r);
+            partitionStats.get(r.partition()).apply(r);
+            waitingOffsets.markPolled(r);
+          });
+          updateProgress();
+        }
+        analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
+        log.info("{} topic analysis finished", topicId);
+      } catch (WakeupException | InterruptException cancelException) {
+        log.info("{} topic analysis stopped", topicId);
+        // calling cancel for cases when our thread was interrupted by some non-user cancellation reason
+        analysisTasksStore.cancelAnalysis(topicId);
+      } catch (Throwable th) {
+        log.error("Error analyzing topic {}", topicId, th);
+        analysisTasksStore.setAnalysisError(topicId, startedAt, th);
+      } finally {
+        consumer.close();
+      }
+    }
+
+    private void updateProgress() {
+      if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
+        analysisTasksStore.updateProgress(
+            topicId,
+            totalStats.totalMsgs,
+            totalStats.keysSize.sum + totalStats.valuesSize.sum,
+            Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
+        );
+      }
+    }
+  }
+}
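
End to end: analyze() resolves the topic's partition count and offset ranges, startAnalysis() rejects a second concurrent run for the same topic, and the AnalysisTask scans every partition from the beginning on a boundedElastic thread, stopping once the end offsets are reached or after three consecutive empty polls. A hedged caller's sketch (the cluster/service wiring normally done by Spring is elided, and getResult() is assumed from the usual OpenAPI codegen accessors):

import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import java.util.Optional;

// Hedged usage sketch; "orders" is an illustrative topic name.
public class AnalysisPollingDemo {

  static void runAnalysis(TopicAnalysisService service, KafkaCluster cluster)
      throws InterruptedException {
    service.analyze(cluster, "orders").block(); // schedules the AnalysisTask

    // Poll until the store reports a completed result (success or error).
    Optional<TopicAnalysisDTO> analysis;
    do {
      Thread.sleep(1_000);
      analysis = service.getTopicAnalysis(cluster, "orders");
    } while (analysis.map(a -> a.getResult() == null).orElse(true));

    System.out.println("analysis finished: " + analysis.get().getResult());
  }
}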
