Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](planner) Fix sample partition table #25912

Merged
merged 5 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
3
  • Loading branch information
xinyiZzz committed Oct 26, 2023
commit 00c3e6065973f4c748be77918cffa941685b41fe
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla
scanNode = new HiveScanNode(fileScan.translatePlanNodeId(), tupleDescriptor, false);
HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
if (fileScan.getTableSample().isPresent()) {
hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
}
hiveScanNode.setTableSample(new TableSample(fileScan.getTableSample().get().isPercent,
fileScan.getTableSample().get().sampleValue, fileScan.getTableSample().get().seek));
break;
default:
throw new RuntimeException("do not support DLA type " + ((HMSExternalTable) table).getDlaType());
Expand Down Expand Up @@ -624,12 +622,8 @@ public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTransla
BaseTableRef tableRef = new BaseTableRef(ref, olapTable, tableName);
tupleDescriptor.setRef(tableRef);
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
if (olapScan.getTableSample().isPresent()) {
olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent,
olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek));
olapScanNode.computeSampleTabletIds();
}
olapScanNode.setTableSample(new TableSample(olapScan.getTableSample().get().isPercent,
olapScan.getTableSample().get().sampleValue, olapScan.getTableSample().get().seek));

// TODO: remove this switch?
switch (olapScan.getTable().getKeysType()) {
Expand Down
123 changes: 67 additions & 56 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -220,7 +221,7 @@ public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
public void setIsPreAggregation(boolean isPreAggregation, String reason) {
this.isPreAggregation = isPreAggregation;
this.reasonOfPreAggregation = this.reasonOfPreAggregation == null ? reason :
this.reasonOfPreAggregation + " " + reason;
this.reasonOfPreAggregation + " " + reason;
}


Expand Down Expand Up @@ -402,7 +403,8 @@ public void updateScanRangeInfoByNewMVSelector(long selectedIndexId,
String scanRangeInfo = stringBuilder.toString();
String situation;
boolean update;
CHECK: { // CHECKSTYLE IGNORE THIS LINE
CHECK:
{ // CHECKSTYLE IGNORE THIS LINE
if (olapTable.getKeysType() == KeysType.DUP_KEYS || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
situation = "The key type of table is duplicate, or unique key with merge-on-write.";
Expand Down Expand Up @@ -545,7 +547,6 @@ public void init(Analyzer analyzer) throws UserException {
computePartitionInfo();
}
computeTupleState(analyzer);
computeSampleTabletIds();

/**
* Compute InAccurate cardinality before mv selector and tablet pruning.
Expand Down Expand Up @@ -778,7 +779,7 @@ private void addScanRangeLocations(Partition partition,
// but it means we will do 3 S3 I/O requests to get the data, which will cause 3 slow queries
if (-1L != coolDownReplicaId) {
final Optional<Replica> replicaOptional = replicas.stream()
.filter(r -> r.getId() == coolDownReplicaId).findAny();
.filter(r -> r.getId() == coolDownReplicaId).findAny();
replicaOptional.ifPresent(
r -> {
Backend backend = Env.getCurrentSystemInfo()
Expand Down Expand Up @@ -930,75 +931,84 @@ public void setOutputColumnUniqueIds(Set<Integer> outputColumnUniqueIds) {
}

/**
* First, determine how many rows to sample from each partition according to the number of partitions.
* Then determine the number of tablets to select from each partition according to the average number
* of rows per tablet.
* If seek is not specified, the required number of tablets is pseudo-randomly selected from each partition.
* If seek is specified, tablets are selected sequentially, starting from the seek tablet of the partition.
* Finally, the manually specified tablet ids are added to the selected tablets.
* sampleTabletNums = sampleRows / partitionNums / (partitionRows / partitionTabletNums)
* Sample some tablets from the selected partitions.
* If seek is specified, the same tablets are sampled every time.
*/
public void computeSampleTabletIds() {
if (tableSample == null) {
return;
}
OlapTable olapTable = (OlapTable) desc.getTable();
long sampleRows; // The total number of sample rows
long hitRows = 1; // The total number of rows hit by the tablet
long totalRows = 0; // The total number of partition rows hit
long totalTablet = 0; // The total number of tablets in the hit partition

// 1. Calculate the total number of rows in the selected partitions, and sort the partition list.
long selectedRows = 0;
long totalSampleRows = 0;
List<Long> selectedPartitionList = new ArrayList<>();
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
selectedRows += selectedTable.getRowCount();
selectedPartitionList.add(partitionId);
}
selectedPartitionList.sort(Comparator.naturalOrder());

// 2. Sampling is not required in some cases; it does not take effect once sampleTabletIds is cleared.
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(olapTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
if (tableSample.getSampleValue() == 100) {
sampleTabletIds.clear();
return;
}
totalSampleRows = (long) Math.max(selectedRows * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = Math.max(tableSample.getSampleValue(), 1);
if (tableSample.getSampleValue() > selectedRows) {
sampleTabletIds.clear();
return;
}
totalSampleRows = tableSample.getSampleValue();
}

// calculate the number of tablets by each partition
long avgRowsPerPartition = sampleRows / Math.max(olapTable.getPartitions().size(), 1);

for (Partition p : olapTable.getPartitions()) {
List<Long> ids = p.getBaseIndex().getTabletIdsInOrder();

if (ids.isEmpty()) {
// 3. Sampling partition. If Seek is specified, the partition will be the same for each sampling.
long hitRows = 0; // The number of rows hit by the tablet
long partitionSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * selectedPartitionIds.size());
for (int i = 0; i < selectedPartitionList.size(); i++) {
int seekPid = (int) ((i + partitionSeek) % selectedPartitionList.size());
final Partition partition = olapTable.getPartition(selectedPartitionList.get(seekPid));
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
List<Tablet> tablets = selectedTable.getTablets();
if (tablets.isEmpty()) {
continue;
}

// Skip partitions whose row count is less than half of the row count expected to be sampled
// per partition. As a result, fewer partitions may be sampled, which avoids an uneven
// distribution of sampling results.
if (p.getBaseIndex().getRowCount() < (avgRowsPerPartition / 2)) {
continue;
// 4. Calculate the number of rows that need to be sampled in the current partition.
long sampleRows = 0; // The number of sample rows in partition
if (tableSample.isPercent()) {
sampleRows = (long) Math.max(selectedTable.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
} else {
sampleRows = (long) Math.max(
tableSample.getSampleValue() * (selectedTable.getRowCount() / selectedRows), 1);
}

// It is assumed here that rows are uniformly distributed across all tablets.
// TODO: Use `p.getBaseIndex().getTablet(n).getRowCount()` to get each tablet's row count when computing the sample.
long avgRowsPerTablet = Math.max(p.getBaseIndex().getRowCount() / ids.size(), 1);
long tabletCounts = Math.max(
avgRowsPerPartition / avgRowsPerTablet + (avgRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
tabletCounts = Math.min(tabletCounts, ids.size());

long seek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * ids.size());
for (int i = 0; i < tabletCounts; i++) {
int seekTid = (int) ((i + seek) % ids.size());
sampleTabletIds.add(ids.get(seekTid));
// 5. Sampling tablets. If Seek is specified, the same tablet will be sampled each time.
long tabletSeek = tableSample.getSeek() != -1
? tableSample.getSeek() : (long) (new SecureRandom().nextDouble() * tablets.size());
for (int j = 0; j < tablets.size(); j++) {
int seekTid = (int) ((j + tabletSeek) % tablets.size());
if (tablets.get(seekTid).getRowCount(true) == 0) {
continue;
}
sampleTabletIds.add(tablets.get(seekTid).getId());
sampleRows -= tablets.get(seekTid).getRowCount(true);
hitRows += tablets.get(seekTid).getRowCount(true);
if (sampleRows <= 0) {
break;
}
}
if (hitRows > totalSampleRows) {
break;
}

hitRows += avgRowsPerTablet * tabletCounts;
totalRows += p.getBaseIndex().getRowCount();
totalTablet += ids.size();
}

// If everything is hit, fall back to a full scan (sampling does not take effect).
if (totalRows < sampleRows) {
// can't fill full sample rows
sampleTabletIds.clear();
} else if (sampleTabletIds.size() == totalTablet) {
// TODO add limit
sampleTabletIds.clear();
} else if (!sampleTabletIds.isEmpty()) {
// TODO add limit
}
LOG.debug("after computeSampleTabletIds, hitRows {}, selectedRows {}", hitRows, selectedRows);
}

public boolean isFromPrepareStmt() {
Expand All @@ -1024,6 +1034,7 @@ private void computeTabletInfo() throws UserException {
*/
Preconditions.checkState(scanBackendIds.size() == 0);
Preconditions.checkState(scanTabletIds.size() == 0);
computeSampleTabletIds();
for (Long partitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
Expand Down