[feat](stats) Limit job record apache#25576 (apache#26251)
Kikyou1997 authored Nov 1, 2023
1 parent 6e8a2d9 commit 97b19c7
Showing 2 changed files with 14 additions and 5 deletions.
Config.java (1 addition, 1 deletion)
@@ -2168,7 +2168,7 @@ public class Config extends ConfigBase {
             "控制统计信息的自动触发作业执行记录的持久化行数",
             "Determine the persist number of automatic triggered analyze job execution status"
     })
-    public static long auto_analyze_job_record_count = 20000;
+    public static long analyze_record_limit = 20000;
 
     @ConfField(description = {
             "Auto Buckets中最小的buckets数目",
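This Config.java hunk renames auto_analyze_job_record_count to analyze_record_limit (the Chinese description translates roughly as "controls how many execution records of automatically triggered statistics jobs are persisted"); the default stays 20000. As a hedged sketch of how an operator might adjust the new knob, assuming the usual fe.conf key = value syntax; the runtime statement only applies if the option is declared mutable, which this hunk does not show:

# fe.conf — example value, not a recommendation
analyze_record_limit = 50000

-- runtime change, only if the config is mutable (illustrative usage)
ADMIN SET FRONTEND CONFIG ("analyze_record_limit" = "50000");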
AnalysisManager.java (13 additions, 4 deletions)
@@ -86,6 +86,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
@@ -112,10 +113,12 @@ public class AnalysisManager extends Daemon implements Writable {
     private AnalysisTaskExecutor taskExecutor;
 
     // Store task information in metadata.
-    private final Map<Long, AnalysisInfo> analysisTaskInfoMap = Collections.synchronizedMap(new TreeMap<>());
+    private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
+            Collections.synchronizedNavigableMap(new TreeMap<>());
 
-    // Store job information in metadata
-    private final Map<Long, AnalysisInfo> analysisJobInfoMap = Collections.synchronizedMap(new TreeMap<>());
+    // Store job information in metadata.
+    private final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
+            Collections.synchronizedNavigableMap(new TreeMap<>());
 
     // Tracking system submitted job, keep in mem only
     protected final Map<Long, AnalysisInfo> systemJobInfoMap = new ConcurrentHashMap<>();
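The two fields above switch from Map to NavigableMap because the eviction added further down relies on pollFirstEntry(), which plain java.util.Map does not declare; job and task IDs grow monotonically, so the first entry of the backing TreeMap is always the oldest record. A minimal, self-contained sketch of that bounded-map pattern (not Doris code; the class and variable names here are made up):

import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

public class BoundedRecordMapSketch {
    public static void main(String[] args) {
        NavigableMap<Long, String> records = Collections.synchronizedNavigableMap(new TreeMap<>());
        long limit = 3; // stands in for Config.analyze_record_limit
        for (long id = 1; id <= 5; id++) {
            // Evict the oldest entries until there is room for the new record.
            while (records.size() >= limit) {
                records.pollFirstEntry();
            }
            records.put(id, "job-" + id);
        }
        System.out.println(records.keySet()); // prints [3, 4, 5]
    }
}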
@@ -818,10 +821,16 @@ private BaseAnalysisTask createTask(AnalysisInfo analysisInfo) throws DdlException {
     }
 
     public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
+        while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
+            analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
+        }
         this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
     }
 
     public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
+        while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
+            analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
+        }
         this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
     }
 
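A small detail in the two loops above: NavigableMap.pollFirstEntry() already removes the entry it returns, so the surrounding remove(...) call targets a key that is no longer present and is effectively a no-op. The loop still evicts exactly one record per pass, so the behavior is correct; a functionally equivalent and slightly shorter form (shown only as an illustration, not as the committed code) would be:

while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
    analysisJobInfoMap.pollFirstEntry(); // removes and returns the oldest entry
}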
@@ -1074,7 +1083,7 @@ public void replayPersistSysJob(AnalysisInfo analysisInfo) {
 
     protected SimpleQueue<AnalysisInfo> createSimpleQueue(Collection<AnalysisInfo> collection,
             AnalysisManager analysisManager) {
-        return new SimpleQueue<>(Config.auto_analyze_job_record_count,
+        return new SimpleQueue<>(Config.analyze_record_limit,
                 a -> {
                     // FE is not ready when replaying log and operations triggered by replaying
                     // shouldn't be logged again.
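The hunk above applies the rename inside createSimpleQueue, so the same analyze_record_limit now caps both the replayed job/task maps and the in-memory queue of analyze records. Doris's SimpleQueue itself is not shown in this diff; as a rough, hypothetical sketch of what a size-bounded FIFO of that shape could look like (names and behavior are assumptions, not the actual class):

import java.util.ArrayDeque;
import java.util.Deque;

public class BoundedQueueSketch<E> {
    private final long limit;                  // stands in for Config.analyze_record_limit
    private final Deque<E> items = new ArrayDeque<>();

    public BoundedQueueSketch(long limit) {
        this.limit = limit;
    }

    public void offer(E element) {
        // Drop the oldest records once the cap is reached, then append the new one.
        while (items.size() >= limit) {
            items.pollFirst();
        }
        items.addLast(element);
    }
}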
