diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CachedStatisticStorage.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CachedStatisticStorage.java index 6abe203e66a02..7f46f14dc43bf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CachedStatisticStorage.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/CachedStatisticStorage.java @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - package com.starrocks.sql.optimizer.statistics; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; @@ -128,17 +127,21 @@ public Map> getTableStatistics(Long tableId, Collection statsCacheKeyList = new ArrayList<>(); for (Partition partition : table.getPartitions()) { statsCacheKeyList.add(new TableStatsCacheKey(table.getId(), partition.getId())); } try { - CompletableFuture>> completableFuture - = tableStatsCache.getAll(statsCacheKeyList); - if (completableFuture.isDone()) { - completableFuture.get(); + TableStatsCacheLoader loader = new TableStatsCacheLoader(); + CompletableFuture>> future = loader.asyncLoadAll(statsCacheKeyList, + statsCacheRefresherExecutor); + if (isSync) { + Map> result = future.get(); + tableStatsCache.synchronous().putAll(result); + } else { + future.whenComplete((result, e) -> tableStatsCache.synchronous().putAll(result)); } } catch (InterruptedException e) { LOG.warn("Failed to execute refreshTableStatistic", e); @@ -149,13 +152,34 @@ public void refreshTableStatistic(Table table) { } @Override - public void refreshTableStatisticSync(Table table) { - List statsCacheKeyList = new ArrayList<>(); - for (Partition partition : table.getPartitions()) { - statsCacheKeyList.add(new TableStatsCacheKey(table.getId(), partition.getId())); + public void refreshColumnStatistics(Table table, List columns, boolean isSync) { + Preconditions.checkState(table != null); + + // get Statistics Table column info, just return default column statistics + if (StatisticUtils.statisticTableBlackListCheck(table.getId()) || + !StatisticUtils.checkStatisticTableStateNormal()) { + return; } - tableStatsCache.synchronous().getAll(statsCacheKeyList); + List cacheKeys = new ArrayList<>(); + long tableId = table.getId(); + for (String column : columns) { + cacheKeys.add(new ColumnStatsCacheKey(tableId, column)); + } + + try { + ColumnBasicStatsCacheLoader loader = new ColumnBasicStatsCacheLoader(); + CompletableFuture>> future = + loader.asyncLoadAll(cacheKeys, statsCacheRefresherExecutor); + if (isSync) { + Map> result = future.get(); + columnStatistics.synchronous().putAll(result); + } else { + future.whenComplete((res, e) -> columnStatistics.synchronous().putAll(res)); + } + } catch (Exception e) { + LOG.warn("Failed to refresh getColumnStatistics", e); + } } @Override @@ -198,9 +222,11 @@ public List getConnectorTableStatistics(Table table, realResult = result.get(); for (String column : columns) { Optional columnStatistic = - realResult.getOrDefault(new ConnectorTableColumnKey(table.getUUID(), column), Optional.empty()); + realResult.getOrDefault(new ConnectorTableColumnKey(table.getUUID(), column), + Optional.empty()); if (columnStatistic.isPresent()) { - columnStatistics.add(StatisticsUtils.estimateColumnStatistics(table, column, columnStatistic.get())); + columnStatistics.add( + StatisticsUtils.estimateColumnStatistics(table, column, columnStatistic.get())); } else { columnStatistics.add(ConnectorTableColumnStats.unknown()); } @@ -262,6 +288,33 @@ public void expireConnectorTableColumnStatistics(Table table, List colum connectorTableCachedStatistics.synchronous().invalidateAll(allKeys); } + @Override + public void refreshConnectorTableColumnStatistics(Table table, List columns, boolean isSync) { + Preconditions.checkState(table != null); + if (!StatisticUtils.checkStatisticTableStateNormal()) { + return; + } + + List cacheKeys = new ArrayList<>(); + for (String column : columns) { + cacheKeys.add(new ConnectorTableColumnKey(table.getUUID(), column)); + } + + try { + ConnectorColumnStatsCacheLoader loader = new ConnectorColumnStatsCacheLoader(); + CompletableFuture>> future = + loader.asyncLoadAll(cacheKeys, statsCacheRefresherExecutor); + if (isSync) { + Map> result = future.get(); + connectorTableCachedStatistics.synchronous().putAll(result); + } else { + future.whenComplete((res, e) -> connectorTableCachedStatistics.synchronous().putAll(res)); + } + } catch (Exception e) { + LOG.warn("Failed to refresh getConnectorTableStatistics", e); + } + } + @Override public ColumnStatistic getColumnStatistic(Table table, String column) { Preconditions.checkState(table != null); @@ -336,46 +389,6 @@ public List getColumnStatistics(Table table, List colum } } - @Override - public List getColumnStatisticsSync(Table table, List columns) { - Preconditions.checkState(table != null); - - // get Statistics Table column info, just return default column statistics - if (StatisticUtils.statisticTableBlackListCheck(table.getId())) { - return getDefaultColumnStatisticList(columns); - } - - if (!StatisticUtils.checkStatisticTableStateNormal()) { - return getDefaultColumnStatisticList(columns); - } - - List cacheKeys = new ArrayList<>(); - long tableId = table.getId(); - for (String column : columns) { - cacheKeys.add(new ColumnStatsCacheKey(tableId, column)); - } - - try { - Map> result = - columnStatistics.synchronous().getAll(cacheKeys); - List columnStatistics = new ArrayList<>(); - - for (String column : columns) { - Optional columnStatistic = - result.getOrDefault(new ColumnStatsCacheKey(tableId, column), Optional.empty()); - if (columnStatistic.isPresent()) { - columnStatistics.add(columnStatistic.get()); - } else { - columnStatistics.add(ColumnStatistic.unknown()); - } - } - return columnStatistics; - } catch (Exception e) { - LOG.warn("Get column statistic fail, message : " + e.getMessage()); - return getDefaultColumnStatisticList(columns); - } - } - /** * */ diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/StatisticStorage.java b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/StatisticStorage.java index a6d9897a5312b..5219327463233 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/StatisticStorage.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/optimizer/statistics/StatisticStorage.java @@ -32,10 +32,10 @@ default Map> getTableStatistics(Long tableId, Collection Optional.empty())); } - default void refreshTableStatistic(Table table) { + default void refreshTableStatistic(Table table, boolean isSync) { } - default void refreshTableStatisticSync(Table table) { + default void refreshColumnStatistics(Table table, List columns, boolean isSync) { } /** @@ -59,10 +59,6 @@ default Map> getColumnStatisticsOfPartitionLevel(Tab return null; } - default List getColumnStatisticsSync(Table table, List columns) { - return getColumnStatistics(table, columns); - } - default List getConnectorTableStatistics(Table table, List columns) { return columns.stream(). map(col -> ConnectorTableColumnStats.unknown()).collect(Collectors.toList()); @@ -97,6 +93,9 @@ default void expireTableAndColumnStatistics(Table table, List columns) { default void expireConnectorTableColumnStatistics(Table table, List columns) { } + default void refreshConnectorTableColumnStatistics(Table table, List columns, boolean isSync) { + } + default void expireConnectorHistogramStatistics(Table table, List columns) { } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeMgr.java b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeMgr.java index 078524a5dc113..6471208fc8413 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeMgr.java @@ -285,30 +285,23 @@ public void refreshBasicStatisticsCache(Long dbId, Long tableId, List co return; } - GlobalStateMgr.getCurrentState().getStatisticStorage().expireTableAndColumnStatistics(table, columns); if (async) { - GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table); - GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatistics(table, columns); + GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table, false); + GlobalStateMgr.getCurrentState().getStatisticStorage().refreshColumnStatistics(table, columns, false); } else { - GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatisticSync(table); - GlobalStateMgr.getCurrentState().getStatisticStorage().getColumnStatisticsSync(table, columns); + GlobalStateMgr.getCurrentState().getStatisticStorage().refreshTableStatistic(table, true); + GlobalStateMgr.getCurrentState().getStatisticStorage().refreshColumnStatistics(table, columns, true); } } public void refreshConnectorTableBasicStatisticsCache(String catalogName, String dbName, String tableName, List columns, boolean async) { - Table table = GlobalStateMgr.getCurrentState().getMetadataMgr().getTable(catalogName, dbName, tableName); if (table == null) { return; } - - GlobalStateMgr.getCurrentState().getStatisticStorage().expireConnectorTableColumnStatistics(table, columns); - if (async) { - GlobalStateMgr.getCurrentState().getStatisticStorage().getConnectorTableStatistics(table, columns); - } else { - GlobalStateMgr.getCurrentState().getStatisticStorage().getConnectorTableStatisticsSync(table, columns); - } + GlobalStateMgr.getCurrentState().getStatisticStorage() + .refreshConnectorTableColumnStatistics(table, columns, async); } public void replayRemoveBasicStatsMeta(BasicStatsMeta basicStatsMeta) { diff --git a/fe/fe-core/src/test/java/com/starrocks/statistic/AnalyzeMgrTest.java b/fe/fe-core/src/test/java/com/starrocks/statistic/AnalyzeMgrTest.java index d40bc09cc8351..6f334c4a4b346 100644 --- a/fe/fe-core/src/test/java/com/starrocks/statistic/AnalyzeMgrTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/statistic/AnalyzeMgrTest.java @@ -20,20 +20,17 @@ import com.starrocks.analysis.TableName; import com.starrocks.catalog.Database; import com.starrocks.catalog.Table; -import com.starrocks.connector.statistics.ConnectorTableColumnStats; import com.starrocks.journal.JournalEntity; import com.starrocks.persist.OperationType; import com.starrocks.qe.ConnectContext; import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.common.MetaUtils; import com.starrocks.sql.optimizer.statistics.CachedStatisticStorage; -import com.starrocks.sql.optimizer.statistics.ColumnStatistic; import com.starrocks.sql.plan.ConnectorPlanTestBase; import com.starrocks.thrift.TUniqueId; import com.starrocks.transaction.InsertTxnCommitAttachment; import com.starrocks.transaction.TransactionState; import com.starrocks.utframe.UtFrameUtils; -import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; @@ -68,31 +65,10 @@ public static void teardown() throws Exception { @Test public void testRefreshConnectorTableBasicStatisticsCache(@Mocked CachedStatisticStorage cachedStatisticStorage) { Table table = connectContext.getGlobalStateMgr().getMetadataMgr().getTable("hive0", "partitioned_db", "t1"); - new Expectations() { - { - cachedStatisticStorage.getConnectorTableStatistics(table, ImmutableList.of("c1", "c2")); - result = ImmutableList.of( - new ConnectorTableColumnStats(new ColumnStatistic(0, 10, 0, 20, 5), 5, ""), - new ConnectorTableColumnStats(new ColumnStatistic(0, 100, 0, 200, 50), 50, "") - ); - minTimes = 1; - } - }; AnalyzeMgr analyzeMgr = new AnalyzeMgr(); analyzeMgr.refreshConnectorTableBasicStatisticsCache("hive0", "partitioned_db", "t1", ImmutableList.of("c1", "c2"), true); - - new Expectations() { - { - cachedStatisticStorage.getConnectorTableStatisticsSync(table, ImmutableList.of("c1", "c2")); - result = ImmutableList.of( - new ConnectorTableColumnStats(new ColumnStatistic(0, 10, 0, 20, 5), 5, ""), - new ConnectorTableColumnStats(new ColumnStatistic(0, 100, 0, 200, 50), 50, "") - ); - minTimes = 1; - } - }; analyzeMgr.refreshConnectorTableBasicStatisticsCache("hive0", "partitioned_db", "t1", ImmutableList.of("c1", "c2"), false);