Skip to content

Commit

Permalink
[Feature] Support dynamic modification of datacache.partition_duration (
Browse files Browse the repository at this point in the history
#32844)

Signed-off-by: stonechen111 <chenshi@starrocks.com>
  • Loading branch information
stonechen111 authored Nov 15, 2023
1 parent 35cd77e commit 24da46d
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,9 @@ public int getAsInt() {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TTL)) {
GlobalStateMgr.getCurrentState().alterTableProperties(db, olapTable, properties);
return null;
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION)) {
GlobalStateMgr.getCurrentState().alterTableProperties(db, olapTable, properties);
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public TableProperty buildProperty(short opCode) {
break;
case OperationType.OP_ALTER_TABLE_PROPERTIES:
buildPartitionLiveNumber();
buildDataCachePartitionDuration();
break;
case OperationType.OP_MODIFY_TABLE_CONSTRAINT_PROPERTY:
buildConstraint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,11 @@ public static PeriodDuration analyzeDataCachePartitionDuration(Map<String, Strin
return null;
}
properties.remove(PROPERTIES_DATACACHE_PARTITION_DURATION);
return TimeUtils.parseHumanReadablePeriodOrDuration(text);
try {
return TimeUtils.parseHumanReadablePeriodOrDuration(text);
} catch (DateTimeParseException ex) {
throw new AnalysisException(ex.getMessage());
}
}

public static TPersistentIndexType analyzePersistentIndexType(Map<String, String> properties) throws AnalysisException {
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4010,6 +4010,13 @@ public void alterTableProperties(Database db, OlapTable table, Map<String, Strin
throw new RuntimeException(ex.getMessage());
}
}
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION)) {
try {
PropertyAnalyzer.analyzeDataCachePartitionDuration(properties);
} catch (AnalysisException ex) {
throw new RuntimeException(ex.getMessage());
}
}
if (!properties.isEmpty()) {
throw new DdlException("Modify failed because unknown properties: " + properties);
}
Expand All @@ -4028,6 +4035,10 @@ public void alterTableProperties(Database db, OlapTable table, Map<String, Strin
String storageCoolDownTTL = logProperties.get(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TTL);
tableProperty.getProperties().put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TTL, storageCoolDownTTL);
tableProperty.buildStorageCoolDownTTL();
} else if (logProperties.containsKey(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION)) {
String partitionDuration = logProperties.get(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION);
tableProperty.getProperties().put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, partitionDuration);
tableProperty.buildDataCachePartitionDuration();
} else {
throw new DdlException("Modify failed because unknown properties: " + properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.starrocks.common.ErrorReport;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.common.util.WriteQuorum;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.RunMode;
Expand Down Expand Up @@ -74,6 +75,7 @@
import com.starrocks.sql.ast.RollupRenameClause;
import com.starrocks.sql.ast.TableRenameClause;

import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -274,6 +276,14 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
|| properties.containsKey(PropertyAnalyzer.PROPERTIES_UNIQUE_CONSTRAINT)) {
clause.setNeedTableStable(false);
clause.setOpType(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION)) {
try {
TimeUtils.parseHumanReadablePeriodOrDuration(
properties.get(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION));
} catch (DateTimeParseException e) {
ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, e.getMessage());
}
clause.setNeedTableStable(false);
} else {
ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, "Unknown properties: " + properties);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.alter;

import com.staros.proto.FileCacheInfo;
import com.staros.proto.FilePathInfo;
import com.staros.proto.FileStoreInfo;
import com.staros.proto.FileStoreType;
import com.staros.proto.S3FileStoreInfo;
import com.starrocks.catalog.AggregateType;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DataProperty;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.DistributionInfo;
import com.starrocks.catalog.HashDistributionInfo;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Tablet;
import com.starrocks.catalog.TabletMeta;
import com.starrocks.catalog.Type;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.lake.DataCacheInfo;
import com.starrocks.lake.LakeTable;
import com.starrocks.lake.LakeTablet;
import com.starrocks.lake.StarOSAgent;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.ModifyTablePropertyOperationLog;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.AlterClause;
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.thrift.TStorageType;
import mockit.Mock;
import mockit.MockUp;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;

public class LakeTableAlterDataCachePartitionDurationTest {
private static final int NUM_BUCKETS = 4;
private ConnectContext connectContext;
private AlterJobV2 alterMetaJob;
private Database db;
private LakeTable table;
private List<Long> shadowTabletIds = new ArrayList<>();

public LakeTableAlterDataCachePartitionDurationTest() {
connectContext = new ConnectContext(null);
connectContext.setStartTime();
connectContext.setThreadLocalInfo();
}

@Before
public void before() throws Exception {
FeConstants.runningUnitTest = true;
new MockUp<StarOSAgent>() {
@Mock
public List<Long> createShards(int shardCount, FilePathInfo path, FileCacheInfo cache, long groupId,
List<Long> matchShardIds, Map<String, String> properties)
throws DdlException {
for (int i = 0; i < shardCount; i++) {
shadowTabletIds.add(GlobalStateMgr.getCurrentState().getNextId());
}
return shadowTabletIds;
}
};

new MockUp<EditLog>() {
@Mock
public void logAlterTableProperties(ModifyTablePropertyOperationLog info) {

}

@Mock
public void logSaveNextId(long nextId) {

}
};

GlobalStateMgr.getCurrentState().setEditLog(new EditLog(new ArrayBlockingQueue<>(100)));
final long dbId = GlobalStateMgr.getCurrentState().getNextId();
final long partitionId = GlobalStateMgr.getCurrentState().getNextId();
final long tableId = GlobalStateMgr.getCurrentState().getNextId();
final long indexId = GlobalStateMgr.getCurrentState().getNextId();

GlobalStateMgr.getCurrentState().setStarOSAgent(new StarOSAgent());

KeysType keysType = KeysType.DUP_KEYS;
db = new Database(dbId, "db0");

Database oldDb = GlobalStateMgr.getCurrentState().getIdToDb().putIfAbsent(db.getId(), db);
Assert.assertNull(oldDb);

Column c0 = new Column("c0", Type.INT, true, AggregateType.NONE, false, null, null);
DistributionInfo dist = new HashDistributionInfo(NUM_BUCKETS, Collections.singletonList(c0));
PartitionInfo partitionInfo = new RangePartitionInfo(Collections.singletonList(c0));
partitionInfo.setDataProperty(partitionId, DataProperty.DEFAULT_DATA_PROPERTY);

table = new LakeTable(tableId, "t0", Collections.singletonList(c0), keysType, partitionInfo, dist);
MaterializedIndex index = new MaterializedIndex(indexId, MaterializedIndex.IndexState.NORMAL);
Partition partition = new Partition(partitionId, "t0", index, dist);
TStorageMedium storage = TStorageMedium.HDD;
TabletMeta tabletMeta = new TabletMeta(db.getId(), table.getId(), partition.getId(), index.getId(), 0, storage, true);
for (int i = 0; i < NUM_BUCKETS; i++) {
Tablet tablet = new LakeTablet(GlobalStateMgr.getCurrentState().getNextId());
index.addTablet(tablet, tabletMeta);
}
table.addPartition(partition);

table.setIndexMeta(index.getId(), "t0", Collections.singletonList(c0), 0, 0, (short) 1, TStorageType.COLUMN, keysType);
table.setBaseIndexId(index.getId());

FilePathInfo.Builder builder = FilePathInfo.newBuilder();
FileStoreInfo.Builder fsBuilder = builder.getFsInfoBuilder();

S3FileStoreInfo.Builder s3FsBuilder = fsBuilder.getS3FsInfoBuilder();
s3FsBuilder.setBucket("test-bucket");
s3FsBuilder.setRegion("test-region");
S3FileStoreInfo s3FsInfo = s3FsBuilder.build();

fsBuilder.setFsType(FileStoreType.S3);
fsBuilder.setFsKey("test-bucket");
fsBuilder.setS3FsInfo(s3FsInfo);
FileStoreInfo fsInfo = fsBuilder.build();

builder.setFsInfo(fsInfo);
builder.setFullPath("s3://test-bucket/object-1");
FilePathInfo pathInfo = builder.build();

table.setStorageInfo(pathInfo, new DataCacheInfo(false, false));
DataCacheInfo dataCacheInfo = new DataCacheInfo(false, false);
partitionInfo.setDataCacheInfo(partitionId, dataCacheInfo);

db.registerTableUnlocked(table);
}

@After
public void after() throws Exception {
db.dropTable(table.getName());
}

@Test
public void testModifyDataCachePartitionDurationAbourtMonths() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "7 months");
ModifyTablePropertiesClause modify = new ModifyTablePropertiesClause(properties);
SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler();

List<AlterClause> alterList = Collections.singletonList(modify);
alterMetaJob = schemaChangeHandler.analyzeAndCreateJob(alterList, db, table);
table.setState(OlapTable.OlapTableState.SCHEMA_CHANGE);

Assert.assertEquals("7 months", TimeUtils.toHumanReadableString(
table.getTableProperty().getDataCachePartitionDuration()));

}

@Test
public void testModifyDataCachePartitionDurationAbourtDays() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "2 days");
ModifyTablePropertiesClause modify = new ModifyTablePropertiesClause(properties);
SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler();

List<AlterClause> alterList = Collections.singletonList(modify);
alterMetaJob = schemaChangeHandler.analyzeAndCreateJob(alterList, db, table);
table.setState(OlapTable.OlapTableState.SCHEMA_CHANGE);

Assert.assertEquals("2 days", TimeUtils.toHumanReadableString(
table.getTableProperty().getDataCachePartitionDuration()));

}

@Test
public void testModifyDataCachePartitionDurationAboutHours() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "4 hours");
ModifyTablePropertiesClause modify = new ModifyTablePropertiesClause(properties);
SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler();

List<AlterClause> alterList = Collections.singletonList(modify);
alterMetaJob = schemaChangeHandler.analyzeAndCreateJob(alterList, db, table);
table.setState(OlapTable.OlapTableState.SCHEMA_CHANGE);

Assert.assertEquals("4 hours", TimeUtils.toHumanReadableString(
table.getTableProperty().getDataCachePartitionDuration()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ public void testFilePathInfo() {
Assert.assertNull(olapTable.getDefaultFilePathInfo());
Assert.assertNull(olapTable.getPartitionFilePathInfo(10));
}

@Test
public void testNullDataCachePartitionDuration() {
OlapTable olapTable = new OlapTable();
Assert.assertNull(olapTable.getTableProperty() == null ? null :
olapTable.getTableProperty().getDataCachePartitionDuration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.starrocks.catalog;

import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.persist.OperationType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -72,4 +74,27 @@ public void testNormal() throws IOException {
Assert.assertEquals(readDynamicPartitionProperty.getTimeUnit(), dynamicPartitionProperty.getTimeUnit());
in.close();
}


@Test
public void testBuildDataCachePartitionDuration() throws IOException {
// 1. Write objects to file
File file = new File(fileName);
file.createNewFile();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));

HashMap<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "3 month");
TableProperty tableProperty = new TableProperty(properties);
tableProperty.write(out);
out.flush();
out.close();

// 2. Read objects from file
DataInputStream in = new DataInputStream(new FileInputStream(file));
TableProperty readTableProperty = TableProperty.read(in);
Assert.assertNotNull(readTableProperty.buildProperty(OperationType.OP_ALTER_TABLE_PROPERTIES));
in.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,25 @@ public void testAnalyzeDataCacheInfo() {
Assert.assertEquals("enable_async_write_back is disabled since version 3.1.4", e.getMessage());
}
}

@Test
public void testAnalyzeDataCachePartitionDuration() {
Map<String, String> properties = new HashMap<>();
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "7 day");

try {
PropertyAnalyzer.analyzeDataCachePartitionDuration(properties);
} catch (AnalysisException e) {
Assert.assertTrue(false);
}

Assert.assertTrue(properties.size() == 0);
properties.put(PropertyAnalyzer.PROPERTIES_DATACACHE_PARTITION_DURATION, "abcd");
try {
PropertyAnalyzer.analyzeDataCachePartitionDuration(properties);
Assert.assertTrue(false);
} catch (AnalysisException e) {
Assert.assertEquals("Cannot parse text to Duration", e.getMessage());
}
}
}
Loading

0 comments on commit 24da46d

Please sign in to comment.