Commit 526276c
[Enhancement] adjust lake compaction history output (#47594)
Signed-off-by: starrocks-xupeng <xupeng@starrocks.com>
starrocks-xupeng authored Jul 1, 2024
1 parent e9757c5 commit 526276c
Showing 5 changed files with 57 additions and 36 deletions.
11 changes: 1 addition & 10 deletions docs/en/administration/management/FE_configuration.md
```diff
@@ -2992,22 +2992,13 @@ ADMIN SET FRONTEND CONFIG ("key" = "value");
 
 ##### lake_compaction_history_size
 
-- Default: 12
+- Default: 20
 - Type: Int
 - Unit: -
 - Is mutable: Yes
 - Description: The number of recent successful Compaction task records to keep in the memory of the Leader FE node in a shared-data cluster. You can view recent successful Compaction task records using the `SHOW PROC '/compactions'` command. Note that the Compaction history is stored in the FE process memory, and it will be lost if the FE process is restarted.
 - Introduced in: v3.1.0
-
-##### lake_compaction_fail_history_size
-
-- Default: 12
-- Type: Int
-- Unit: -
-- Is mutable: Yes
-- Description: The number of recent failed Compaction task records to keep in the memory of the Leader FE node in a shared-data cluster. You can view recent failed Compaction task records using the `SHOW PROC '/compactions'` command. Note that the Compaction history is stored in the FE process memory, and it will be lost if the FE process is restarted.
-- Introduced in: v3.1.0
 
 ##### lake_publish_version_max_threads
 
 - Default: 512
```
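Since `lake_compaction_history_size` is mutable, you can resize the history buffer at runtime and then inspect the merged listing with `SHOW PROC '/compactions'`. Below is a minimal JDBC sketch, not an official client: the FE address (127.0.0.1:9030), the passwordless `root` user, and the `CompactionHistoryDemo` class name are all assumptions here, and the snippet relies on the StarRocks FE speaking the MySQL protocol.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CompactionHistoryDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical FE address and credentials; any MySQL driver works
        // because the StarRocks FE speaks the MySQL protocol.
        String url = "jdbc:mysql://127.0.0.1:9030/";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement()) {
            // The config is mutable, so this takes effect without an FE restart.
            stmt.execute("ADMIN SET FRONTEND CONFIG (\"lake_compaction_history_size\" = \"20\")");
            // After this commit, successful, failed, and running compaction
            // tasks all appear in this one listing.
            try (ResultSet rs = stmt.executeQuery("SHOW PROC '/compactions'")) {
                int cols = rs.getMetaData().getColumnCount();
                while (rs.next()) {
                    StringBuilder row = new StringBuilder();
                    for (int i = 1; i <= cols; i++) {
                        row.append(rs.getString(i)).append('\t');
                    }
                    System.out.println(row);
                }
            }
        }
    }
}
```

Because the records live only in the Leader FE's memory, the listing resets whenever the FE process restarts.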
11 changes: 1 addition & 10 deletions docs/zh/administration/management/FE_configuration.md
```diff
@@ -2994,22 +2994,13 @@ Compaction Score 代表了一个表分区是否值得进行 Compaction 的评分
 
 ##### lake_compaction_history_size
 
-- 默认值:12
+- 默认值:20
 - 类型:Int
 - 单位:-
 - 是否动态:是
 - 描述:存算分离集群下在 Leader FE 节点内存中保留多少条最近成功的 Compaction 任务历史记录。您可以通过 `SHOW PROC '/compactions'` 命令查看最近成功的 Compaction 任务记录。请注意,Compaction 历史记录是保存在 FE 进程内存中的,FE 进程重启后历史记录会丢失。
 - 引入版本:v3.1.0
-
-##### lake_compaction_fail_history_size
-
-- 默认值:12
-- 类型:Int
-- 单位:-
-- 是否动态:是
-- 描述:存算分离集群下在 Leader FE 节点内存中保留多少条最近失败的 Compaction 任务历史记录。您可以通过 `SHOW PROC '/compactions'` 命令查看最近失败的 Compaction 任务记录。请注意,Compaction 历史记录是保存在 FE 进程内存中的,FE 进程重启后历史记录会丢失。
-- 引入版本:v3.1.0
 
 ##### lake_publish_version_max_threads
 
 - 默认值:512
```
5 changes: 1 addition & 4 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
```diff
@@ -2523,10 +2523,7 @@ public class Config extends ConfigBase {
     public static int lake_compaction_max_tasks = -1;
 
     @ConfField(mutable = true)
-    public static int lake_compaction_history_size = 12;
-
-    @ConfField(mutable = true)
-    public static int lake_compaction_fail_history_size = 12;
+    public static int lake_compaction_history_size = 20;
 
     // e.g. "tableId1;tableId2"
     @ConfField(mutable = true)
```
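With `lake_compaction_fail_history_size` gone, `lake_compaction_history_size` now bounds a single in-memory queue holding both successful and failed task records (see the `CompactionScheduler` changes below). `SynchronizedCircularQueue` itself is StarRocks-internal and not part of this diff; the following `BoundedHistory` class is only a rough sketch, assuming evict-oldest semantics and a capacity that the scheduler re-reads from config on each cycle.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Rough sketch of a resizable bounded history queue. NOT the actual
// StarRocks SynchronizedCircularQueue; an assumption-based illustration.
class BoundedHistory<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private int maxSize;

    BoundedHistory(int maxSize) {
        this.maxSize = maxSize;
    }

    // Append a record, evicting the oldest entries once capacity is exceeded.
    synchronized void offer(E e) {
        items.addLast(e);
        while (items.size() > maxSize) {
            items.removeFirst();
        }
    }

    // Shrink or grow capacity at runtime, trimming the oldest entries if needed.
    synchronized void changeMaxSize(int newSize) {
        maxSize = newSize;
        while (items.size() > maxSize) {
            items.removeFirst();
        }
    }

    synchronized void forEach(Consumer<? super E> action) {
        items.forEach(action);
    }
}
```

The `changeMaxSize` method mirrors the `history.changeMaxSize(Config.lake_compaction_history_size)` call the scheduler makes in `runOneCycle()` below.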
27 changes: 15 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java
```diff
@@ -14,7 +14,6 @@
 
 package com.starrocks.lake.compaction;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.starrocks.catalog.Database;
 import com.starrocks.catalog.MaterializedIndex;
@@ -53,6 +52,7 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -78,7 +78,6 @@ public class CompactionScheduler extends Daemon {
     private final GlobalStateMgr stateMgr;
     private final ConcurrentHashMap<PartitionIdentifier, CompactionJob> runningCompactions;
     private final SynchronizedCircularQueue<CompactionRecord> history;
-    private final SynchronizedCircularQueue<CompactionRecord> failHistory;
     private boolean finishedWaiting = false;
     private long waitTxnId = -1;
     private long lastPartitionCleanTime;
@@ -95,7 +94,6 @@ public class CompactionScheduler extends Daemon {
         this.runningCompactions = new ConcurrentHashMap<>();
         this.lastPartitionCleanTime = System.currentTimeMillis();
         this.history = new SynchronizedCircularQueue<>(Config.lake_compaction_history_size);
-        this.failHistory = new SynchronizedCircularQueue<>(Config.lake_compaction_fail_history_size);
         this.disabledTables = Collections.unmodifiableSet(new HashSet<>());
 
         disableTables(disableTablesStr);
@@ -113,7 +111,6 @@ protected void runOneCycle() {
         if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
             schedule();
             history.changeMaxSize(Config.lake_compaction_history_size);
-            failHistory.changeMaxSize(Config.lake_compaction_fail_history_size);
         }
     }
 
@@ -177,7 +174,7 @@ private void schedule() {
             if (errorMsg != null) {
                 iterator.remove();
                 job.finish();
-                failHistory.offer(CompactionRecord.build(job, errorMsg));
+                history.offer(CompactionRecord.build(job, errorMsg));
                 compactionManager.enableCompactionAfter(partition, MIN_COMPACTION_INTERVAL_MS_ON_FAILURE);
                 abortTransactionIgnoreException(job, errorMsg);
                 continue;
@@ -325,7 +322,7 @@ private CompactionJob startCompaction(PartitionIdentifier partitionIdentifier) {
             nextCompactionInterval = MIN_COMPACTION_INTERVAL_MS_ON_FAILURE;
             abortTransactionIgnoreError(job, e.getMessage());
             job.finish();
-            failHistory.offer(CompactionRecord.build(job, e.getMessage()));
+            history.offer(CompactionRecord.build(job, e.getMessage()));
             return null;
         } finally {
             compactionManager.enableCompactionAfter(partitionIdentifier, nextCompactionInterval);
@@ -438,15 +435,21 @@ private void abortTransactionIgnoreError(CompactionJob job, String reason) {
         }
     }
 
+    // get running compaction and history compaction, sorted by start time
     @NotNull
     List<CompactionRecord> getHistory() {
-        ImmutableList.Builder<CompactionRecord> builder = ImmutableList.builder();
-        history.forEach(builder::add);
-        failHistory.forEach(builder::add);
-        for (CompactionJob job : runningCompactions.values()) {
-            builder.add(CompactionRecord.build(job));
+        List<CompactionRecord> list = new ArrayList<>();
+        history.forEach(list::add);
+        for (CompactionJob job : getRunningCompactions().values()) {
+            list.add(CompactionRecord.build(job));
         }
-        return builder.build();
+        Collections.sort(list, new Comparator<CompactionRecord>() {
+            @Override
+            public int compare(CompactionRecord l, CompactionRecord r) {
+                return l.getStartTs() > r.getStartTs() ? -1 : (l.getStartTs() < r.getStartTs()) ? 1 : 0;
+            }
+        });
+        return list;
     }
 
     @NotNull
```
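The anonymous `Comparator` in `getHistory()` orders records newest-first: it returns -1 when the left record started later. Assuming `getStartTs()` returns a primitive `long` (as the `>`/`<` comparisons imply), an equivalent and more compact Java 8+ form is sketched below, with a stand-in `Rec` type in place of `CompactionRecord`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class HistorySortSketch {
    // Stand-in for CompactionRecord; only the start timestamp matters here.
    static class Rec {
        final long startTs;
        Rec(long startTs) { this.startTs = startTs; }
        long getStartTs() { return startTs; }
    }

    public static void main(String[] args) {
        List<Rec> records = new ArrayList<>(List.of(new Rec(100), new Rec(300), new Rec(200)));
        // Newest-first ordering, equivalent to the hand-written compare() above.
        records.sort(Comparator.comparingLong(Rec::getStartTs).reversed());
        records.forEach(r -> System.out.println(r.getStartTs())); // 300, 200, 100
    }
}
```

`Long.compare(r.getStartTs(), l.getStartTs())` would be another drop-in replacement for the ternary in the committed comparator.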
39 changes: 39 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/lake/compaction/CompactionSchedulerTest.java
```diff
@@ -16,7 +16,12 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.starrocks.catalog.Database;
+import com.starrocks.catalog.Partition;
+import com.starrocks.catalog.PhysicalPartition;
+import com.starrocks.catalog.Table;
 import com.starrocks.common.Config;
+import com.starrocks.lake.LakeTable;
 import com.starrocks.lake.LakeTablet;
 import com.starrocks.server.GlobalStateMgr;
 import com.starrocks.server.WarehouseManager;
@@ -35,6 +40,7 @@
 import org.junit.Test;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.junit.Assert.assertEquals;
 
@@ -147,4 +153,37 @@ public void testDisableTableCompaction() {
         compactionScheduler.disableTables("");
         Assert.assertFalse(compactionScheduler.isTableDisabled(23456L));
     }
+
+    @Test
+    public void testGetHistory() {
+        CompactionMgr compactionManager = new CompactionMgr();
+        CompactionScheduler compactionScheduler =
+                new CompactionScheduler(compactionManager, GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(),
+                        GlobalStateMgr.getCurrentState().getGlobalTransactionMgr(), GlobalStateMgr.getCurrentState(), "");
+        new MockUp<CompactionScheduler>() {
+            @Mock
+            public ConcurrentHashMap<PartitionIdentifier, CompactionJob> getRunningCompactions() {
+                ConcurrentHashMap<PartitionIdentifier, CompactionJob> r = new ConcurrentHashMap<>();
+                Database db = new Database();
+                Table table = new LakeTable();
+                PartitionIdentifier partitionIdentifier1 = new PartitionIdentifier(1, 2, 3);
+                PartitionIdentifier partitionIdentifier2 = new PartitionIdentifier(1, 2, 4);
+                PhysicalPartition partition1 = new Partition(123, "aaa", null, null);
+                PhysicalPartition partition2 = new Partition(124, "bbb", null, null);
+                CompactionJob job1 = new CompactionJob(db, table, partition1, 100, false);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                }
+                CompactionJob job2 = new CompactionJob(db, table, partition2, 101, false);
+                r.put(partitionIdentifier1, job1);
+                r.put(partitionIdentifier2, job2);
+                return r;
+            }
+        };
+
+        List<CompactionRecord> list = compactionScheduler.getHistory();
+        Assert.assertEquals(2, list.size());
+        Assert.assertTrue(list.get(0).getStartTs() >= list.get(1).getStartTs());
+    }
 }
```
