Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
size_t raw_bytes_read = 0;
bool first_read = true;
int64_t limit = scanner->limit();
while (!eos && raw_bytes_read < raw_bytes_threshold) {
if (UNLIKELY(ctx->done())) {
eos = true;
Expand Down Expand Up @@ -319,6 +320,18 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->inc_block_usage(free_block->allocated_bytes());
scan_task->cached_blocks.push_back(std::move(free_block));
}

if (limit > 0 && limit < ctx->batch_size()) {
// If this scanner has limit, and less than batch size,
// return immediately and no need to wait raw_bytes_threshold.
// This can save time that each scanner may only return a small number of rows,
// but rows are enough from all scanners.
// If not break, the query like "select * from tbl where id=1 limit 10"
// may scan a lot data when the "id=1"'s filter ratio is high.
// If limit is larger than batch size, this rule is skipped,
// to avoid user specify a large limit and causing too much small blocks.
break;
}
} // end for while

if (UNLIKELY(!status.ok())) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class VScanner {
_query_statistics = query_statistics;
}

int64_t limit() const { return _limit; }

protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,6 @@ public String getExplainString(ExplainOptions explainOptions) {
return plan;
}

@Override
public boolean isBlockQuery() {
return true;
}

@Override
public DescriptorTable getDescTable() {
return descTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ public OriginalPlanner(Analyzer analyzer) {
this.analyzer = analyzer;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

public PlannerContext getPlannerContext() {
return plannerContext;
}
Expand Down Expand Up @@ -274,17 +270,6 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue

if (queryStmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) queryStmt;
if (queryStmt.getSortInfo() != null || selectStmt.getAggInfo() != null) {
isBlockQuery = true;
if (LOG.isDebugEnabled()) {
LOG.debug("this is block query");
}
} else {
isBlockQuery = false;
if (LOG.isDebugEnabled()) {
LOG.debug("this isn't block query");
}
}
// Check SelectStatement if optimization condition satisfied
if (selectStmt.isPointQueryShortCircuit()) {
// Optimize for point query like: SELECT * FROM t1 WHERE pk1 = 1 and pk2 = 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public abstract class Planner {

protected ArrayList<PlanFragment> fragments = Lists.newArrayList();

protected boolean isBlockQuery = false;

protected TQueryOptions queryOptions;

public abstract List<ScanNode> getScanNodes();
Expand Down Expand Up @@ -115,10 +113,6 @@ public List<PlanFragment> getFragments() {
return fragments;
}

public boolean isBlockQuery() {
return isBlockQuery;
}

public TQueryOptions getQueryOptions() {
return queryOptions;
}
Expand Down
36 changes: 18 additions & 18 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ public class Coordinator implements CoordInterface {
// same as backend_exec_states_.size() after Exec()
private final Set<TUniqueId> instanceIds = Sets.newHashSet();

private final boolean isBlockQuery;

private int numReceivedRows = 0;

private List<String> deltaUrls;
Expand Down Expand Up @@ -336,7 +334,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
// Used for query/insert/test
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.context = context;
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
this.fragments = planner.getFragments();
this.scanNodes = planner.getScanNodes();
Expand Down Expand Up @@ -379,7 +376,6 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
// Constructor of Coordinator is too complicated.
public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance, boolean enableProfile) {
this.isBlockQuery = true;
this.jobId = jobId;
this.queryId = queryId;
this.descTable = descTable.toThrift();
Expand Down Expand Up @@ -1448,24 +1444,28 @@ public RowBatch getNext() throws Exception {
}
}

if (resultBatch.isEos()) {
this.returnedAllResults = true;

// if this query is a block query do not cancel.
Long numLimitRows = fragments.get(0).getPlanRoot().getLimit();
boolean hasLimit = numLimitRows > 0;
if (!isBlockQuery && instanceIds.size() > 1 && hasLimit && numReceivedRows >= numLimitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need cancel");
}
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
if (LOG.isDebugEnabled()) {
LOG.debug("number received rows: {}, {}", numReceivedRows, DebugUtil.printId(queryId));
}
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
}

if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
if (resultBatch.isEos()) {
numReceivedRows = 0;
numReceivedRows += resultBatch.getQueryStatistics().getReturnedRows();
}
} else if (resultBatch.getBatch() != null) {
numReceivedRows += resultBatch.getBatch().getRowsSize();
}

Long limitRows = fragments.get(0).getPlanRoot().getLimit();
if (limitRows > 0 && numReceivedRows >= limitRows) {
if (LOG.isDebugEnabled()) {
LOG.debug("reach limit rows: {}, received rows: {}, cancel query, {}",
limitRows, numReceivedRows, DebugUtil.printId(queryId));
}
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, "reach limit");
resultBatch.setEos(true);
}

return resultBatch;
Expand Down