Skip to content

Commit

Permalink
[fix](planner) Fix sample partition table #25912 (#26399)
Browse files Browse the repository at this point in the history
In the past, two conditions needed to be met when sampling a partitioned table: 1. Data is evenly distributed between partitions; 2. Data is evenly distributed between buckets. Finally, the number of sampled rows in each partition and each bucket is the same.

Now, sampling will be proportional to the number of partitioned and bucketed rows.
  • Loading branch information
xinyiZzz authored and xiaokang committed Nov 4, 2023
1 parent 2c8d4cd commit 3af9119
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 57 deletions.
130 changes: 75 additions & 55 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
Expand Down Expand Up @@ -94,9 +95,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -220,7 +223,7 @@ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? reason :
this.reasonOfPreAggregation + " " + reason;
this.reasonOfPreAggregation + " " + reason;
}


Expand Down Expand Up @@ -402,7 +405,8 @@ public void updateScanRangeInfoByNewMVSelector(long selectedIndexId,
String scanRangeInfo = stringBuilder.toString();
String situation;
boolean update;
CHECK: { // CHECKSTYLE IGNORE THIS LINE
CHECK:
{ // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key with merge-on-write.";
Expand Down Expand Up @@ -774,7 +778,7 @@ private void addScanRangeLocations(Partition partition,
// but it means we will do 3 S3 IO to get the data which will bring 3 slow query
if (-1L != coolDownReplicaId) {
final Optional<Replica> replicaOptional = replicas.stream()
.filter(r -> r.getId() == coolDownReplicaId).findAny();
.filter(r -> r.getId() == coolDownReplicaId).findAny();
replicaOptional.ifPresent(
r -> {
Backend backend = Env.getCurrentSystemInfo()
Expand Down Expand Up @@ -926,75 +930,91 @@ public void setOutputColumnUniqueIds(Set<Integer> outputColumnUniqueIds) {
}

/**
* First, determine how many rows to sample from each partition according to the number of partitions.
* Then determine the number of Tablets to be selected for each partition according to the average number
* of rows of Tablet,
* If seek is not specified, the specified number of Tablets are pseudo-randomly selected from each partition.
* If seek is specified, it will be selected sequentially from the seek tablet of the partition.
* And add the manually specified Tablet id to the selected Tablet.
* simpleTabletNums = simpleRows / partitionNums / (partitionRows / partitionTabletNums)
* Sample some tablets in the selected partition.
* If Seek is specified, the tablets sampled each time are the same.
*/
public void computeSampleTabletIds() {
if (tableSample == null) {
return;
}
OlapTable olapTable = (OlapTable) desc.getTable();
long sampleRows; // The total number of sample rows
long hitRows = 1; // The total number of rows hit by the tablet
long totalRows = 0; // The total number of partition rows hit
long totalTablet = 0; // The total number of tablets in the hit partition

// 1. Calculate the total number of rows in the selected partition, and sort partition list.
long selectedRows = 0;
long totalSampleRows = 0;
List<Long> selectedPartitionList = new ArrayList<>();
if (FeConstants.runningUnitTest && selectedIndexId == -1) {
selectedIndexId = olapTable.getBaseIndexId();
}
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
selectedRows += selectedTable.getRowCount();
selectedPartitionList.add(partitionId);
}
selectedPartitionList.sort(Comparator.naturalOrder());

// 2.Sampling is not required in some cases, will not take effect after clear sampleTabletIds.
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
if (tableSample.getSampleValue() >= 100) {
return;
}
totalSampleRows = (long) Math.max(selectedRows * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
if (tableSample.getSampleValue() > selectedRows) {
return;
}
totalSampleRows = tableSample.getSampleValue();
}

// calculate the number of tablets by each partition
long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1);

for (Partition p : olapTable.getPartitions()) {
List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();

if (ids.isEmpty()) {
// 3. Sampling partition. If Seek is specified, the partition will be the same for each sampling.
long hitRows = 0; // The number of rows hit by the tablet
long partitionSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * selectedPartitionIds.size());
for (int i = 0; i < selectedPartitionList.size(); i++) {
int seekPid = (int) ((i + partitionSeek) % selectedPartitionList.size());
final Partition partition = olapTable.getPartition(selectedPartitionList.get(seekPid));
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
List<Tablet> tablets = selectedTable.getTablets();
if (tablets.isEmpty()) {
continue;
}

// Skip partitions with row count < row count / 2 expected to be sampled per partition.
// It can be expected to sample a smaller number of partitions to avoid uneven distribution
// of sampling results.
if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
continue;
// 4. Calculate the number of rows that need to be sampled in the current partition.
long sampleRows = 0; // The number of sample rows in partition
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(selectedTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = (long) Math.max(
tableSample.getSampleValue() * (selectedTable.getRowCount() / selectedRows), 1);
}

// It is assumed here that all tablets row count is uniformly distributed
// TODO Use `p.getBaseIndex().getTablet(n).getRowCount()` to get each tablet row count to compute sample.
long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() / ids.size(), 1);
long tabletCounts = Math.max(
avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
tabletCounts = Math.min(tabletCounts, ids.size());

long seek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (Math.random() * ids.size());
for (int i = 0; i < tabletCounts; i++) {
int seekTid = (int) ((i + seek) % ids.size());
sampleTabletIds.add(ids.get(seekTid));
// 5. Sampling tablets. If Seek is specified, the same tablet will be sampled each time.
long tabletSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * tablets.size());
for (int j = 0; j < tablets.size(); j++) {
int seekTid = (int) ((j + tabletSeek) % tablets.size());
long tabletRowCount;
if (!FeConstants.runningUnitTest) {
tabletRowCount = tablets.get(seekTid).getRowCount(true);
} else {
tabletRowCount = selectedTable.getRowCount() / tablets.size();
}
if (tabletRowCount == 0) {
continue;
}
sampleTabletIds.add(tablets.get(seekTid).getId());
sampleRows -= tabletRowCount;
hitRows += tabletRowCount;
if (sampleRows <= 0) {
break;
}
}
if (hitRows > totalSampleRows) {
break;
}

hitRows += avgRowsPerTablet * tabletCounts;
totalRows += p.getBaseIndex().getRowCount();
totalTablet += ids.size();
}

// all hit, direct full
if (totalRows < sampleRows) {
// can't fill full sample rows
sampleTabletIds.clear();
} else if (sampleTabletIds.size() == totalTablet) {
// TODO add limit
sampleTabletIds.clear();
} else if (!sampleTabletIds.isEmpty()) {
// TODO add limit
}
LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}", hitRows, selectedRows);
}

public boolean isFromPrepareStmt() {
Expand Down
115 changes: 113 additions & 2 deletions fe/fe-core/src/test/java/org/apache/doris/analysis/SelectStmtTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
Expand Down Expand Up @@ -861,7 +862,8 @@ public void testRandomBucketSelectTablet() throws Exception {
}

@Test
public void testSelectSampleTable() throws Exception {
public void testSelectSampleHashBucketTable() throws Exception {
FeConstants.runningUnitTest = true;
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("table1");
long tabletId = 10031L;
Expand Down Expand Up @@ -894,7 +896,7 @@ public void testSelectSampleTable() throws Exception {
String sql4 = "SELECT * FROM db1.table1 TABLESAMPLE(9500 ROWS)";
OriginalPlanner planner4 = (OriginalPlanner) dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds4 = ((OlapScanNode) planner4.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(0, sampleTabletIds4.size()); // no sample, all tablet
Assert.assertEquals(10, sampleTabletIds4.size());

String sql5 = "SELECT * FROM db1.table1 TABLESAMPLE(11000 ROWS)";
OriginalPlanner planner5 = (OriginalPlanner) dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
Expand Down Expand Up @@ -963,8 +965,117 @@ public void testSelectSampleTable() throws Exception {
OriginalPlanner planner16 = (OriginalPlanner) dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds16 = ((OlapScanNode) planner16.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds16.size());
FeConstants.runningUnitTest = false;
}

@Test
public void testSelectSampleRandomBucketTable() throws Exception {
FeConstants.runningUnitTest = true;
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
OlapTable tbl = (OlapTable) db.getTableOrMetaException("table3");
long tabletId = 10031L;
for (Partition partition : tbl.getPartitions()) {
for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
mIndex.setRowCount(10000);
for (Tablet tablet : mIndex.getTablets()) {
tablet.setTabletId(tabletId);
tabletId += 1;
}
}
}

// 1. TABLESAMPLE ROWS
String sql1 = "SELECT * FROM db1.table3 TABLESAMPLE(10 ROWS)";
OriginalPlanner planner1 = (OriginalPlanner) dorisAssert.query(sql1).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds1 = ((OlapScanNode) planner1.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds1.size());

String sql2 = "SELECT * FROM db1.table3 TABLESAMPLE(1000 ROWS)";
OriginalPlanner planner2 = (OriginalPlanner) dorisAssert.query(sql2).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds2 = ((OlapScanNode) planner2.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds2.size());

String sql3 = "SELECT * FROM db1.table3 TABLESAMPLE(1001 ROWS)";
OriginalPlanner planner3 = (OriginalPlanner) dorisAssert.query(sql3).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds3 = ((OlapScanNode) planner3.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(2, sampleTabletIds3.size());

String sql4 = "SELECT * FROM db1.table3 TABLESAMPLE(9500 ROWS)";
OriginalPlanner planner4 = (OriginalPlanner) dorisAssert.query(sql4).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds4 = ((OlapScanNode) planner4.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(10, sampleTabletIds4.size());

String sql5 = "SELECT * FROM db1.table3 TABLESAMPLE(11000 ROWS)";
OriginalPlanner planner5 = (OriginalPlanner) dorisAssert.query(sql5).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds5 = ((OlapScanNode) planner5.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(0, sampleTabletIds5.size()); // no sample, all tablet

String sql6 = "SELECT * FROM db1.table3 TABLET(10033) TABLESAMPLE(900 ROWS)";
OriginalPlanner planner6 = (OriginalPlanner) dorisAssert.query(sql6).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds6 = ((OlapScanNode) planner6.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertTrue(sampleTabletIds6.size() >= 1 && sampleTabletIds6.size() <= 2);
Assert.assertTrue(sampleTabletIds6.contains(10033L));

// 2. TABLESAMPLE PERCENT
String sql7 = "SELECT * FROM db1.table3 TABLESAMPLE(10 PERCENT)";
OriginalPlanner planner7 = (OriginalPlanner) dorisAssert.query(sql7).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds7 = ((OlapScanNode) planner7.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds7.size());

String sql8 = "SELECT * FROM db1.table3 TABLESAMPLE(15 PERCENT)";
OriginalPlanner planner8 = (OriginalPlanner) dorisAssert.query(sql8).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds8 = ((OlapScanNode) planner8.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(2, sampleTabletIds8.size());

String sql9 = "SELECT * FROM db1.table3 TABLESAMPLE(100 PERCENT)";
OriginalPlanner planner9 = (OriginalPlanner) dorisAssert.query(sql9).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds9 = ((OlapScanNode) planner9.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(0, sampleTabletIds9.size());

String sql10 = "SELECT * FROM db1.table3 TABLESAMPLE(110 PERCENT)";
OriginalPlanner planner10 = (OriginalPlanner) dorisAssert.query(sql10).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds10 = ((OlapScanNode) planner10.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(0, sampleTabletIds10.size());

String sql11 = "SELECT * FROM db1.table3 TABLET(10033) TABLESAMPLE(5 PERCENT)";
OriginalPlanner planner11 = (OriginalPlanner) dorisAssert.query(sql11).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds11 = ((OlapScanNode) planner11.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertTrue(sampleTabletIds11.size() >= 1 && sampleTabletIds11.size() <= 2);
Assert.assertTrue(sampleTabletIds11.contains(10033L));

// 3. TABLESAMPLE REPEATABLE
String sql12 = "SELECT * FROM db1.table3 TABLESAMPLE(900 ROWS)";
OriginalPlanner planner12 = (OriginalPlanner) dorisAssert.query(sql12).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds12 = ((OlapScanNode) planner12.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds12.size());

String sql13 = "SELECT * FROM db1.table3 TABLESAMPLE(900 ROWS) REPEATABLE 2";
OriginalPlanner planner13 = (OriginalPlanner) dorisAssert.query(sql13).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds13 = ((OlapScanNode) planner13.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds13.size());
Assert.assertTrue(sampleTabletIds13.contains(10033L));

String sql14 = "SELECT * FROM db1.table3 TABLESAMPLE(900 ROWS) REPEATABLE 10";
OriginalPlanner planner14 = (OriginalPlanner) dorisAssert.query(sql14).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds14 = ((OlapScanNode) planner14.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds14.size());
Assert.assertTrue(sampleTabletIds14.contains(10031L));

String sql15 = "SELECT * FROM db1.table3 TABLESAMPLE(900 ROWS) REPEATABLE 0";
OriginalPlanner planner15 = (OriginalPlanner) dorisAssert.query(sql15).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds15 = ((OlapScanNode) planner15.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds15.size());
Assert.assertTrue(sampleTabletIds15.contains(10031L));

// 4. select returns 900 rows of results
String sql16 = "SELECT * FROM (SELECT * FROM db1.table3 TABLESAMPLE(900 ROWS) REPEATABLE 9999999 limit 900) t";
OriginalPlanner planner16 = (OriginalPlanner) dorisAssert.query(sql16).internalExecuteOneAndGetPlan();
Set<Long> sampleTabletIds16 = ((OlapScanNode) planner16.getScanNodes().get(0)).getSampleTabletIds();
Assert.assertEquals(1, sampleTabletIds16.size());
FeConstants.runningUnitTest = false;
}


@Test
public void testSelectExcept() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
Expand Down

0 comments on commit 3af9119

Please sign in to comment.