Skip to content

Commit

Permalink
[Enhancement] modify statistic cache expire-update logical (StarRocks…
Browse files Browse the repository at this point in the history
…#53344)

Signed-off-by: Seaven <seaven_7@qq.com>
  • Loading branch information
Seaven authored Dec 2, 2024
1 parent a6a57b8 commit a979bef
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.


package com.starrocks.sql.optimizer.statistics;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
Expand Down Expand Up @@ -128,17 +127,21 @@ public Map<Long, Optional<Long>> getTableStatistics(Long tableId, Collection<Par
}

@Override
public void refreshTableStatistic(Table table) {
public void refreshTableStatistic(Table table, boolean isSync) {
List<TableStatsCacheKey> statsCacheKeyList = new ArrayList<>();
for (Partition partition : table.getPartitions()) {
statsCacheKeyList.add(new TableStatsCacheKey(table.getId(), partition.getId()));
}

try {
CompletableFuture<Map<TableStatsCacheKey, Optional<Long>>> completableFuture
= tableStatsCache.getAll(statsCacheKeyList);
if (completableFuture.isDone()) {
completableFuture.get();
TableStatsCacheLoader loader = new TableStatsCacheLoader();
CompletableFuture<Map<TableStatsCacheKey, Optional<Long>>> future = loader.asyncLoadAll(statsCacheKeyList,
statsCacheRefresherExecutor);
if (isSync) {
Map<TableStatsCacheKey, Optional<Long>> result = future.get();
tableStatsCache.synchronous().putAll(result);
} else {
future.whenComplete((result, e) -> tableStatsCache.synchronous().putAll(result));
}
} catch (InterruptedException e) {
LOG.warn("Failed to execute refreshTableStatistic", e);
Expand All @@ -149,13 +152,34 @@ public void refreshTableStatistic(Table table) {
}

@Override
public void refreshTableStatisticSync(Table table) {
List<TableStatsCacheKey> statsCacheKeyList = new ArrayList<>();
for (Partition partition : table.getPartitions()) {
statsCacheKeyList.add(new TableStatsCacheKey(table.getId(), partition.getId()));
public void refreshColumnStatistics(Table table, List<String> columns, boolean isSync) {
Preconditions.checkState(table != null);

// get Statistics Table column info, just return default column statistics
if (StatisticUtils.statisticTableBlackListCheck(table.getId()) ||
!StatisticUtils.checkStatisticTableStateNormal()) {
return;
}

tableStatsCache.synchronous().getAll(statsCacheKeyList);
List<ColumnStatsCacheKey> cacheKeys = new ArrayList<>();
long tableId = table.getId();
for (String column : columns) {
cacheKeys.add(new ColumnStatsCacheKey(tableId, column));
}

try {
ColumnBasicStatsCacheLoader loader = new ColumnBasicStatsCacheLoader();
CompletableFuture<Map<ColumnStatsCacheKey, Optional<ColumnStatistic>>> future =
loader.asyncLoadAll(cacheKeys, statsCacheRefresherExecutor);
if (isSync) {
Map<ColumnStatsCacheKey, Optional<ColumnStatistic>> result = future.get();
columnStatistics.synchronous().putAll(result);
} else {
future.whenComplete((res, e) -> columnStatistics.synchronous().putAll(res));
}
} catch (Exception e) {
LOG.warn("Failed to refresh getColumnStatistics", e);
}
}

@Override
Expand Down Expand Up @@ -198,9 +222,11 @@ public List<ConnectorTableColumnStats> getConnectorTableStatistics(Table table,
realResult = result.get();
for (String column : columns) {
Optional<ConnectorTableColumnStats> columnStatistic =
realResult.getOrDefault(new ConnectorTableColumnKey(table.getUUID(), column), Optional.empty());
realResult.getOrDefault(new ConnectorTableColumnKey(table.getUUID(), column),
Optional.empty());
if (columnStatistic.isPresent()) {
columnStatistics.add(StatisticsUtils.estimateColumnStatistics(table, column, columnStatistic.get()));
columnStatistics.add(
StatisticsUtils.estimateColumnStatistics(table, column, columnStatistic.get()));
} else {
columnStatistics.add(ConnectorTableColumnStats.unknown());
}
Expand Down Expand Up @@ -262,6 +288,33 @@ public void expireConnectorTableColumnStatistics(Table table, List<String> colum
connectorTableCachedStatistics.synchronous().invalidateAll(allKeys);
}

@Override
public void refreshConnectorTableColumnStatistics(Table table, List<String> columns, boolean isSync) {
Preconditions.checkState(table != null);
if (!StatisticUtils.checkStatisticTableStateNormal()) {
return;
}

List<ConnectorTableColumnKey> cacheKeys = new ArrayList<>();
for (String column : columns) {
cacheKeys.add(new ConnectorTableColumnKey(table.getUUID(), column));
}

try {
ConnectorColumnStatsCacheLoader loader = new ConnectorColumnStatsCacheLoader();
CompletableFuture<Map<ConnectorTableColumnKey, Optional<ConnectorTableColumnStats>>> future =
loader.asyncLoadAll(cacheKeys, statsCacheRefresherExecutor);
if (isSync) {
Map<ConnectorTableColumnKey, Optional<ConnectorTableColumnStats>> result = future.get();
connectorTableCachedStatistics.synchronous().putAll(result);
} else {
future.whenComplete((res, e) -> connectorTableCachedStatistics.synchronous().putAll(res));
}
} catch (Exception e) {
LOG.warn("Failed to refresh getConnectorTableStatistics", e);
}
}

@Override
public ColumnStatistic getColumnStatistic(Table table, String column) {
Preconditions.checkState(table != null);
Expand Down Expand Up @@ -336,46 +389,6 @@ public List<ColumnStatistic> getColumnStatistics(Table table, List<String> colum
}
}

@Override
public List<ColumnStatistic> getColumnStatisticsSync(Table table, List<String> columns) {
Preconditions.checkState(table != null);

// get Statistics Table column info, just return default column statistics
if (StatisticUtils.statisticTableBlackListCheck(table.getId())) {
return getDefaultColumnStatisticList(columns);
}

if (!StatisticUtils.checkStatisticTableStateNormal()) {
return getDefaultColumnStatisticList(columns);
}

List<ColumnStatsCacheKey> cacheKeys = new ArrayList<>();
long tableId = table.getId();
for (String column : columns) {
cacheKeys.add(new ColumnStatsCacheKey(tableId, column));
}

try {
Map<ColumnStatsCacheKey, Optional<ColumnStatistic>> result =
columnStatistics.synchronous().getAll(cacheKeys);
List<ColumnStatistic> columnStatistics = new ArrayList<>();

for (String column : columns) {
Optional<ColumnStatistic> columnStatistic =
result.getOrDefault(new ColumnStatsCacheKey(tableId, column), Optional.empty());
if (columnStatistic.isPresent()) {
columnStatistics.add(columnStatistic.get());
} else {
columnStatistics.add(ColumnStatistic.unknown());
}
}
return columnStatistics;
} catch (Exception e) {
LOG.warn("Get column statistic fail, message : " + e.getMessage());
return getDefaultColumnStatisticList(columns);
}
}

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ default Map<Long, Optional<Long>> getTableStatistics(Long tableId, Collection<Pa
return partitions.stream().collect(Collectors.toMap(Partition::getId, p -> Optional.empty()));
}

default void refreshTableStatistic(Table table) {
default void refreshTableStatistic(Table table, boolean isSync) {
}

default void refreshTableStatisticSync(Table table) {
default void refreshColumnStatistics(Table table, List<String> columns, boolean isSync) {
}

/**
Expand All @@ -59,10 +59,6 @@ default Map<Long, List<ColumnStatistic>> getColumnStatisticsOfPartitionLevel(Tab
return null;
}

default List<ColumnStatistic> getColumnStatisticsSync(Table table, List<String> columns) {
return getColumnStatistics(table, columns);
}

default List<ConnectorTableColumnStats> getConnectorTableStatistics(Table table, List<String> columns) {
return columns.stream().
map(col -> ConnectorTableColumnStats.unknown()).collect(Collectors.toList());
Expand Down Expand Up @@ -97,6 +93,9 @@ default void expireTableAndColumnStatistics(Table table, List<String> columns) {
default void expireConnectorTableColumnStatistics(Table table, List<String> columns) {
}

default void refreshConnectorTableColumnStatistics(Table table, List<String> columns, boolean isSync) {
}

default void expireConnectorHistogramStatistics(Table table, List<String> columns) {
}

Expand Down
19 changes: 6 additions & 13 deletions fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,30 +285,23 @@ public void refreshBasicStatisticsCache(Long dbId, Long tableId, List<String> co
return;
}

GlobalStateMgr.getCurrentState().getStatisticStorage().expireTableAndColumnStatistics(table, columns);
if (async) {
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table);
GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatistics(table, columns);
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table, false);
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshColumnStatistics(table, columns, false);
} else {
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatisticSync(table);
GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatisticsSync(table, columns);
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table, true);
GlobalStateMgr.getCurrentState().getStatisticStorage().refreshColumnStatistics(table, columns, true);
}
}

public void refreshConnectorTableBasicStatisticsCache(String catalogName, String dbName, String tableName,
List<String> columns, boolean async) {

Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, dbName, tableName);
if (table == null) {
return;
}

GlobalStateMgr.getCurrentState().getStatisticStorage().expireConnectorTableColumnStatistics(table, columns);
if (async) {
GlobalStateMgr.getCurrentState().getStatisticStorage().getConnectorTableStatistics(table, columns);
} else {
GlobalStateMgr.getCurrentState().getStatisticStorage().getConnectorTableStatisticsSync(table, columns);
}
GlobalStateMgr.getCurrentState().getStatisticStorage()
.refreshConnectorTableColumnStatistics(table, columns, async);
}

public void replayRemoveBasicStatsMeta(BasicStatsMeta basicStatsMeta) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.connector.statistics.ConnectorTableColumnStats;
import com.starrocks.journal.JournalEntity;
import com.starrocks.persist.OperationType;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.sql.optimizer.statistics.CachedStatisticStorage;
import com.starrocks.sql.optimizer.statistics.ColumnStatistic;
import com.starrocks.sql.plan.ConnectorPlanTestBase;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.InsertTxnCommitAttachment;
import com.starrocks.transaction.TransactionState;
import com.starrocks.utframe.UtFrameUtils;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
Expand Down Expand Up @@ -68,31 +65,10 @@ public static void teardown() throws Exception {
@Test
public void testRefreshConnectorTableBasicStatisticsCache(@Mocked CachedStatisticStorage cachedStatisticStorage) {
Table table = connectContext.getGlobalStateMgr().getMetadataMgr().getTable("hive0", "partitioned_db", "t1");
new Expectations() {
{
cachedStatisticStorage.getConnectorTableStatistics(table, ImmutableList.of("c1", "c2"));
result = ImmutableList.of(
new ConnectorTableColumnStats(new ColumnStatistic(0, 10, 0, 20, 5), 5, ""),
new ConnectorTableColumnStats(new ColumnStatistic(0, 100, 0, 200, 50), 50, "")
);
minTimes = 1;
}
};

AnalyzeMgr analyzeMgr = new AnalyzeMgr();
analyzeMgr.refreshConnectorTableBasicStatisticsCache("hive0", "partitioned_db", "t1",
ImmutableList.of("c1", "c2"), true);

new Expectations() {
{
cachedStatisticStorage.getConnectorTableStatisticsSync(table, ImmutableList.of("c1", "c2"));
result = ImmutableList.of(
new ConnectorTableColumnStats(new ColumnStatistic(0, 10, 0, 20, 5), 5, ""),
new ConnectorTableColumnStats(new ColumnStatistic(0, 100, 0, 200, 50), 50, "")
);
minTimes = 1;
}
};
analyzeMgr.refreshConnectorTableBasicStatisticsCache("hive0", "partitioned_db", "t1",
ImmutableList.of("c1", "c2"), false);

Expand Down

0 comments on commit a979bef

Please sign in to comment.