Commit

fix

fix count

fix number backends

wait file filter
morningman committed Dec 6, 2024
1 parent 33ee349 commit e7d15a1
Showing 23 changed files with 225 additions and 102 deletions.
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -822,6 +822,7 @@ Status VFileScanner::_get_next_reader() {
_should_enable_file_meta_cache() ? ExecEnv::GetInstance()->file_meta_cache()
: nullptr,
_state->query_options().enable_parquet_lazy_mat);
parquet_reader->set_push_down_agg_type(_get_push_down_agg_type());
{
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
@@ -887,6 +888,7 @@ Status VFileScanner::_get_next_reader() {
_profile, _state, *_params, range, _state->query_options().batch_size,
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat,
unsupported_pushdown_types);
orc_reader->set_push_down_agg_type(_get_push_down_agg_type());
if (push_down_predicates) {
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}

@@ -26,6 +26,7 @@
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.tablefunction.BackendsTableValuedFunction;
import org.apache.doris.tablefunction.LocalTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -119,8 +120,8 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
analyzeJoin(analyzer);
}

public ScanNode getScanNode(PlanNodeId id) {
return tableFunction.getScanNode(id, desc);
public ScanNode getScanNode(PlanNodeId id, SessionVariable sv) {
return tableFunction.getScanNode(id, desc, sv);
}

public TableValuedFunctionIf getTableFunction() {

FileQueryScanNode.java

@@ -39,6 +39,7 @@
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
@@ -94,6 +95,9 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected String brokerName;

protected TableSnapshot tableSnapshot;
// Save a reference to the session variable, so that we don't need to get it from the connection context.
// The connection context is a thread-local variable; it is not available when running in another thread.
protected SessionVariable sessionVariable;

/**
* External file scan node for Query hms table
@@ -102,8 +106,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
StatisticalType statisticalType, boolean needCheckColumnPriv,
SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
this.sessionVariable = sv;
}

@Override
@@ -314,6 +320,7 @@ public void createScanRangeLocations() throws UserException {
params.setProperties(locationProperties);
}

int numBackends = backendPolicy.numBackends();
List<String> pathPartitionKeys = getPathPartitionKeys();
if (isBatchMode()) {
// File splits are generated lazily, and fetched by backends while scanning.
@@ -356,7 +363,7 @@ public void createScanRangeLocations() throws UserException {
scanBackendIds.add(backend.getId());
}
} else {
List<Split> inputSplits = getSplits();
List<Split> inputSplits = getSplits(numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
@@ -605,4 +612,9 @@ public TableSnapshot getQueryTableSnapshot() {
}
return this.tableSnapshot;
}

protected boolean noNeedSplitForCountPushDown() throws UserException {
TFileFormatType fileFormatType = getFileFormatType();
return fileFormatType == TFileFormatType.FORMAT_PARQUET || fileFormatType == TFileFormatType.FORMAT_ORC;
}
}
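The sessionVariable field and constructor parameter added above exist because split generation can now run on worker threads, where the planner thread's ConnectContext (a thread-local) is not set. Below is a minimal, self-contained sketch of that pitfall and of the capture-at-construction fix; the SessionVariable and ConnectContext here are simplified stand-ins, not the real Doris classes.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SessionVariableCaptureSketch {
        // Stand-in for Doris' SessionVariable: holds per-session settings such as parallel_exec_instance_num.
        static class SessionVariable {
            final int parallelExecInstanceNum;
            SessionVariable(int parallelExecInstanceNum) {
                this.parallelExecInstanceNum = parallelExecInstanceNum;
            }
        }

        // Stand-in for Doris' ConnectContext: the real one is held in a ThreadLocal,
        // so it is only visible on the thread that set it (the query/planner thread).
        static final ThreadLocal<SessionVariable> CONNECT_CONTEXT = new ThreadLocal<>();

        public static void main(String[] args) throws Exception {
            CONNECT_CONTEXT.set(new SessionVariable(8)); // done on the planner thread

            // Capture the reference while still on the planner thread,
            // which is what the FileQueryScanNode constructor now does.
            SessionVariable captured = CONNECT_CONTEXT.get();

            ExecutorService splitPool = Executors.newSingleThreadExecutor();
            splitPool.submit(() -> {
                // The thread-local is empty here: this worker thread never called set(...).
                System.out.println("thread-local on worker: " + CONNECT_CONTEXT.get());            // null
                // The captured reference still works, so split generation can read session settings.
                System.out.println("captured parallelism:   " + captured.parallelExecInstanceNum); // 8
            }).get();
            splitPool.shutdown();
        }
    }

The same reasoning explains why the HiveScanNode, HudiScanNode, IcebergScanNode and LakeSoulScanNode constructors below now accept a SessionVariable sv argument instead of reaching for ConnectContext.get().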

FileSplitter.java
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileCompressType;

import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.List;

public class FileSplitter {
private static final Logger LOG = LogManager.getLogger(FileSplitter.class);
}

SplitAssignment.java

@@ -69,7 +69,7 @@ public SplitAssignment(
}

public void init() throws UserException {
splitGenerator.startSplit();
splitGenerator.startSplit(backendPolicy.numBackends());
synchronized (assignLock) {
while (sampleSplit == null && waitFirstSplit()) {
try {
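SplitAssignment.init() now forwards backendPolicy.numBackends() to startSplit and then blocks on assignLock until the generator delivers a first sample split. The following is a minimal sketch of that wait/notify handshake, with hypothetical names and a plain timeout standing in for waitFirstSplit(); it is not the actual SplitAssignment code.

    import java.util.ArrayList;
    import java.util.List;

    public class FirstSplitGateSketch {
        private final Object assignLock = new Object();
        private final List<String> splits = new ArrayList<>();

        // Called from the split-generation thread (e.g. inside startSplit(numBackends)).
        public void addToQueue(List<String> newSplits) {
            synchronized (assignLock) {
                splits.addAll(newSplits);
                assignLock.notifyAll(); // wake the planner thread waiting in init()
            }
        }

        // Called from the planner thread; returns true once at least one split has arrived,
        // or false if the deadline passes first (roughly what waitFirstSplit() guards against).
        public boolean waitForFirstSplit(long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            synchronized (assignLock) {
                while (splits.isEmpty()) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false;
                    }
                    assignLock.wait(remaining);
                }
                return true;
            }
        }
    }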

SplitGenerator.java

@@ -30,8 +30,9 @@
public interface SplitGenerator {
/**
* Get all file splits if the producer doesn't support batch mode.
* @param numBackends the number of backends; this is useful when determining the number of splits.
*/
default List<Split> getSplits() throws UserException {
default List<Split> getSplits(int numBackends) throws UserException {
// todo: remove this interface if batch mode is stable
throw new NotImplementedException("Not implement");
}
@@ -51,7 +52,7 @@ default int numApproximateSplits() {
return -1;
}

default void startSplit() {
default void startSplit(int numBackends) {
}

/**
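To show what an implementation might do with the new numBackends parameter, here is a hypothetical generator written against simplified stand-ins for Split and SplitGenerator (the real interface has more members). The sizing rule, comparing the file count with parallelism times the backend count, mirrors the heuristic used by the scan nodes later in this commit rather than any exact Doris code.

    import java.util.ArrayList;
    import java.util.List;

    public class SplitSizingSketch {
        // Simplified stand-ins; the real org.apache.doris.spi.Split and SplitGenerator carry much more.
        interface Split {}

        record FileRangeSplit(String path, long offset, long length) implements Split {}

        interface SplitGenerator {
            List<Split> getSplits(int numBackends);

            default void startSplit(int numBackends) {}
        }

        static class UniformFileSplitGenerator implements SplitGenerator {
            private final List<String> paths;
            private final long fileLength;  // assume equally sized files to keep the sketch short
            private final int parallelNum;  // parallel_exec_instance_num from the session

            UniformFileSplitGenerator(List<String> paths, long fileLength, int parallelNum) {
                this.paths = paths;
                this.fileLength = fileLength;
                this.parallelNum = parallelNum;
            }

            @Override
            public List<Split> getSplits(int numBackends) {
                int scanSlots = parallelNum * numBackends;
                // Enough whole files to occupy every scan slot? Then one split per file is fine.
                // Otherwise cut each file into enough ranges that the split count roughly matches the slots.
                int rangesPerFile = paths.size() >= scanSlots
                        ? 1
                        : (int) Math.ceil((double) scanSlots / paths.size());
                long rangeSize = Math.max(1, fileLength / rangesPerFile);
                List<Split> splits = new ArrayList<>();
                for (String path : paths) {
                    for (long offset = 0; offset < fileLength; offset += rangeSize) {
                        splits.add(new FileRangeSplit(path, offset, Math.min(rangeSize, fileLength - offset)));
                    }
                }
                return splits;
            }
        }
    }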

HiveScanNode.java

@@ -44,12 +44,14 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TPushAggOp;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -98,15 +100,13 @@ public class HiveScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, needCheckColumnPriv, sv);
}

public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
StatisticalType statisticalType, boolean needCheckColumnPriv, SessionVariable sv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, sv);
hmsTable = (HMSExternalTable) desc.getTable();
brokerName = hmsTable.getCatalog().bindBrokerName();
}
@@ -163,7 +163,7 @@ protected List<HivePartition> getPartitions() throws AnalysisException {
}

@Override
public List<Split> getSplits() throws UserException {
public List<Split> getSplits(int numBackends) throws UserException {
long start = System.currentTimeMillis();
try {
if (!partitionInit) {
@@ -174,7 +174,7 @@ public List<Split> getSplits() throws UserException {
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName);
getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
@@ -193,7 +193,7 @@ public List<Split> getSplits() throws UserException {
}

@Override
public void startSplit() {
public void startSplit(int numBackends) {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;
@@ -214,12 +214,12 @@ public void startSplit() {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
cache, Collections.singletonList(partition), allFiles, bindBrokerName);
cache, Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
splitAssignment.addToQueue(allFiles);
} catch (IOException e) {
} catch (Exception e) {
batchException.set(new UserException(e.getMessage(), e));
} finally {
splittersOnFlight.release();
@@ -263,7 +263,7 @@ public int numApproximateSplits() {
}

private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
List<Split> allFiles, String bindBrokerName) throws IOException {
List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
@@ -276,11 +276,39 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
splitAllFiles(allFiles, hiveFileStatuses);
return;
}

/**
* If the pushed-down aggregation operator is COUNT,
* we don't need to split the file, because for the parquet/orc formats only metadata is read.
* If we split the file, the metadata of a file will be read multiple times, which is inefficient.
*
* - A Hive transactional table may need merge-on-read, so do not apply this optimization to it.
* - If the file format is not parquet/orc, e.g. text, we still need to split the file to increase parallelism.
*/
boolean needSplit = true;
if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT
&& hiveTransaction == null
&& noNeedSplitForCountPushDown()) {
int parallelNum = sessionVariable.getParallelExecInstanceNum();
int totalFileNum = 0;
for (FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
totalFileNum += fileCacheValue.getFiles().size();
}
}
// If the number of files is larger than parallel instances * num of backends,
// we don't need to split the file.
// Otherwise, split the file to avoid local shuffle.
if (totalFileNum > parallelNum * numBackends) {
needSplit = false;
}
}
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
// set block size to Long.MAX_VALUE to avoid splitting the file.
allFiles.addAll(splitFile(status.getPath(), needSplit ? status.getBlockSize() : Long.MAX_VALUE,
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
isSplittable, fileCacheValue.getPartitionValues(),
new HiveSplitCreator(fileCacheValue.getAcidInfo())));
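The block above folds the COUNT push-down check, the transactional-table exclusion, the parquet/orc format check and the file-count heuristic into a single split decision; when splitting is skipped, the block size handed to splitFile is Long.MAX_VALUE so each file stays a single split. A condensed, self-contained sketch of that decision follows; the class, method and parameter names are hypothetical, and booleans stand in for TPushAggOp.COUNT and the format check.

    public class CountPushDownSplitPolicySketch {

        /**
         * Decide whether files should be cut into byte ranges.
         *
         * @param isCountPushDown true if the scan only feeds a no-grouping COUNT (TPushAggOp.COUNT)
         * @param isTransactional true for Hive transactional tables, which may need merge-on-read
         * @param isParquetOrOrc  true if the format is parquet/orc, where COUNT reads only metadata
         * @param totalFileNum    number of files left after partition pruning
         * @param parallelNum     parallel_exec_instance_num from the session variable
         * @param numBackends     number of backends selected by the scan's backend policy
         */
        static boolean needSplit(boolean isCountPushDown, boolean isTransactional, boolean isParquetOrOrc,
                                 int totalFileNum, int parallelNum, int numBackends) {
            if (isCountPushDown && !isTransactional && isParquetOrOrc
                    && totalFileNum > parallelNum * numBackends) {
                // Whole files already outnumber the scan slots, so splitting would only make the
                // same parquet/orc footer be read several times for one row count.
                return false;
            }
            return true;
        }

        public static void main(String[] args) {
            // 8 instances * 3 backends = 24 scan slots; 100 parquet files -> keep files whole.
            System.out.println(needSplit(true, false, true, 100, 8, 3)); // false
            // Only 10 files for 24 slots -> split so every instance gets work.
            System.out.println(needSplit(true, false, true, 10, 8, 3));  // true
        }
    }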

HudiScanNode.java

@@ -125,8 +125,8 @@ public class HudiScanNode extends HiveScanNode {
*/
public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv,
Optional<TableScanParams> scanParams, Optional<IncrementalRelation> incrementalRelation,
SessionVariable sessionVariable) {
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv);
SessionVariable sv) {
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv, sv);
isCowTable = hmsTable.isHoodieCowTable();
if (LOG.isDebugEnabled()) {
if (isCowTable) {
@@ -390,7 +390,7 @@ private void getPartitionsSplits(List<HivePartition> partitions, List<Split> spl
}

@Override
public List<Split> getSplits() throws UserException {
public List<Split> getSplits(int numBackends) throws UserException {
if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
return getIncrementalSplits();
}
@@ -406,7 +406,7 @@ public List<Split> getSplits() throws UserException {
}

@Override
public void startSplit() {
public void startSplit(int numBackends) {
if (prunedPartitions.isEmpty()) {
splitAssignment.finishSchedule();
return;

IcebergScanNode.java

@@ -36,7 +36,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -96,8 +96,8 @@ public class IcebergScanNode extends FileQueryScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info, so no need to do priv check
*/
public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv);

ExternalTable table = (ExternalTable) desc.getTable();
if (table instanceof HMSExternalTable) {
@@ -177,11 +177,12 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli
}

@Override
public List<Split> getSplits() throws UserException {
return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(), this::doGetSplits);
public List<Split> getSplits(int numBackends) throws UserException {
return HiveMetaStoreClientHelper.ugiDoAs(source.getCatalog().getConfiguration(),
() -> doGetSplits(numBackends));
}

private List<Split> doGetSplits() throws UserException {
private List<Split> doGetSplits(int numBackends) throws UserException {
TableScan scan = icebergTable.newScan();

// set snapshot
@@ -271,8 +272,8 @@ private List<Split> doGetSplits() throws UserException {
pushDownCount = true;
List<Split> pushDownCountSplits;
if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum));
int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends;
pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits));
} else {
pushDownCountSplits = Collections.singletonList(splits.get(0));
}
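For Iceberg, when COUNT(*) is answered from snapshot metadata, the change above caps the retained splits at parallel_exec_instance_num times numBackends instead of the per-backend parallelism alone, which matches the "fix number backends" commit message and lets every backend return a share of the precomputed count. Below is a sketch of the capping rule with hypothetical names; the real COUNT_WITH_PARALLEL_SPLITS threshold and the way the row count is attached to the kept splits are simplified here.

    import java.util.Collections;
    import java.util.List;

    public class IcebergCountPushDownCapSketch {
        // Assumed threshold for illustration only; the real constant lives in IcebergScanNode.
        static final long COUNT_WITH_PARALLEL_SPLITS = 10_000_000L;

        // Assumes splits is non-empty, as in the original code path.
        static <T> List<T> capSplitsForCountPushDown(List<T> splits, long countFromSnapshot,
                                                     int parallelNum, int numBackends) {
            if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
                // Keep at most one split per scan slot (instances per backend * number of backends),
                // so the count can be returned by all backends in parallel.
                int minSplits = parallelNum * numBackends;
                return splits.subList(0, Math.min(splits.size(), minSplits));
            }
            // A small count needs only a single split to report the value.
            return Collections.singletonList(splits.get(0));
        }
    }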

LakeSoulScanNode.java

@@ -28,6 +28,7 @@
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
@@ -80,8 +81,8 @@ public class LakeSoulScanNode extends FileQueryScanNode {

String readType;

public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv);
public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv, SessionVariable sv) {
super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv, sv);
}

@Override
@@ -209,7 +210,7 @@ private void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulS
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}

public List<Split> getSplits() throws UserException {
public List<Split> getSplits(int numBackends) throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits with columnFilters={}", columnFilters);
LOG.debug("getSplits with columnNameToRange={}", columnNameToRange);