Conversation

YannByron (Contributor):

Purpose

Support pushing down LIMIT to the source to accelerate queries.

Linked issue: close #xxx

Tests

API and Format

Documentation

}

override def pushLimit(limit: Int): Boolean = {
  if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
Contributor:

Is the append-table check needed here? The paimon-core scan should take care of this.

YannByron (Contributor Author):

A boolean return value is needed here, and Spark SQL uses it. Otherwise, paimon-core would need to provide an API for this to call.

Contributor:

Can we just return false here, as a best-effort pushdown?

YannByron (Contributor Author):

As a temporary solution, that's OK.
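To make the agreed best-effort behaviour concrete, here is a minimal sketch of the pushLimit override under discussion, assuming Spark's SupportsPushDownLimit contract; the exact field handling in the PR may differ:

override def pushLimit(limit: Int): Boolean = {
  // Record the limit only for append-only tables; merge-on-read tables
  // cannot rely on per-file row counts before merging.
  if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
    pushDownLimit = Some(limit)
  }
  // Best effort: returning false tells Spark the limit was not fully
  // pushed down, so Spark still applies LIMIT itself and correctness
  // never depends on this optimization.
  false
}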

  this.projectedIndexes = Some(projected)
}

override def pushLimit(limit: Int): Boolean = {
Contributor:

Will Spark SQL avoid pushing down the limit when there is a filter?

YannByron (Contributor Author):

Yes. Spark SQL guarantees this.
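As a rough illustration of that guarantee (hypothetical queries, not taken from this PR):

// A bare LIMIT can reach the scan builder's pushLimit.
spark.sql("SELECT * FROM T LIMIT 5")
// With a filter in between, Spark SQL keeps the limit on its own side.
spark.sql("SELECT * FROM T WHERE a > 1 LIMIT 5")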


import scala.collection.JavaConverters._

class PaimonSQLPerformanceTest extends PaimonSparkTestBase {
Contributor:

What is this class for? It looks like we cannot get performance results from these tests.

YannByron (Contributor Author):

Maybe it's not a good class name. It aims to check whether certain optimizations take effect, for example that SupportsPushDownLimit reduces the number of splits scanned.
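A sketch of the kind of check such a test performs; scannedSplitCount is a hypothetical helper standing in for however the test inspects the scan, and the real assertions may differ:

test("limit push down reduces the number of scanned splits") {
  spark.sql("CREATE TABLE T (a INT, b STRING) USING paimon")
  spark.sql("INSERT INTO T VALUES (1, 'x')")
  spark.sql("INSERT INTO T VALUES (2, 'y')") // two commits, so at least two splits
  // scannedSplitCount is hypothetical: e.g. count the splits in the planned scan.
  assert(scannedSplitCount("SELECT * FROM T LIMIT 1") < scannedSplitCount("SELECT * FROM T"))
}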

@YannByron (Contributor Author):

These failed UTs are not related to this PR.

@YannByron force-pushed the master_limitpushdown branch from a5d6d48 to 65520d4 on November 28, 2023 at 09:48
@YannByron force-pushed the master_limitpushdown branch from 65520d4 to 6fcd11a on November 28, 2023 at 09:49
@YannByron (Contributor Author):

#2404

@YannByron closed this on Nov 28, 2023
@YannByron reopened this on Nov 28, 2023
@@ -199,6 +202,11 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
    return this;
}

@Override
public FileStoreScan withLimit(int limit) {
@JingsongLi (Contributor) commented Nov 29, 2023:

Can the limit implementation live just in AppendOnlyFileStoreScan? It could override the plan method to apply the limit after planning.

@JingsongLi (Contributor):

Another solution: we can make this limit pushdown generic. We can apply the limit in InnerTableScanImpl and just check Split.convertToRawFiles and its row count, because raw files don't need to be merged.
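A self-contained sketch of that generic idea, using simplified Scala stand-ins rather than Paimon's actual Java classes: accumulate raw-file row counts split by split, stop once the limit is covered, and fall back to scanning all splits whenever a row count cannot be trusted.

import scala.collection.mutable.ArrayBuffer

object LimitPushDownSketch {
  // Simplified stand-ins for DataSplit and RawFile.
  final case class RawFile(path: String, rowCount: Long)
  final case class Split(rawFiles: Option[Seq[RawFile]]) {
    // A row count is only trustworthy when the split exposes raw files,
    // because raw files need no merging.
    def rawRowCount: Option[Long] = rawFiles.map(_.map(_.rowCount).sum)
  }

  // Keep whole splits until the accumulated row count covers the limit.
  def pruneSplitsForLimit(splits: Seq[Split], limit: Long): Seq[Split] = {
    val kept = ArrayBuffer.empty[Split]
    var scanned = 0L
    val it = splits.iterator
    while (it.hasNext && scanned < limit) {
      val split = it.next()
      split.rawRowCount match {
        case Some(n) =>
          kept += split
          scanned += n
        case None =>
          return splits // unknown row count: scan everything (best effort)
      }
    }
    if (scanned >= limit) kept.toSeq else splits
  }
}

Note that the sketch only drops whole splits, which matches the review conclusion further down: there is no need to reduce the files within a split.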

public void serialize(DataOutputView out) throws IOException {
    out.writeUTF(path);
    out.writeLong(offset);
    out.writeLong(length);
    out.writeUTF(format);
    out.writeLong(schemaId);
    out.writeLong(rowCount);
Contributor:

Increment DataSplit.serialVersionUID too.
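For context on why the bump matters, a hedged sketch (the real DataSplit is Java and Paimon versions its format differently in detail): once rowCount joins the serialized layout, readers must be able to reject streams written with the old layout instead of misreading them.

import java.io.{DataInput, DataOutput}

object DataSplitFormatSketch {
  // Illustrative version constant: bumped because rowCount was added.
  val Version = 2

  def serialize(out: DataOutput, path: String, rowCount: Long): Unit = {
    out.writeInt(Version)
    out.writeUTF(path)
    out.writeLong(rowCount)
  }

  def deserialize(in: DataInput): (String, Long) = {
    val version = in.readInt()
    require(version == Version, s"unsupported DataSplit version: $version")
    (in.readUTF(), in.readLong())
  }
}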


import scala.collection.mutable

class SparkScanBuilder(table: Table)
Contributor:

Can we have a SparkBaseScanBuilder to reuse code?
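A hedged sketch of the suggested refactoring; apart from SparkBaseScanBuilder itself, the names and signatures below are assumptions, not the PR's actual code. The shared pushdown state lives in a base class, and each Spark version's builder mixes in SupportsPushDownLimit only where its Spark provides that interface:

import org.apache.paimon.table.Table
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit}

// Shared state for every Spark version's builder.
abstract class SparkBaseScanBuilder(protected val table: Table) extends ScanBuilder {
  protected var projectedIndexes: Option[Seq[Int]] = None
  protected var pushDownLimit: Option[Int] = None
}

// Version-specific builder for Spark versions that offer SupportsPushDownLimit.
class SparkScanBuilder(table: Table)
  extends SparkBaseScanBuilder(table)
  with SupportsPushDownLimit {

  override def pushLimit(limit: Int): Boolean = {
    pushDownLimit = Some(limit)
    false // best effort: Spark still applies the limit
  }

  // buildPaimonScan is a hypothetical stand-in for the real build() body.
  override def build(): Scan = buildPaimonScan(table, projectedIndexes, pushDownLimit)
  private def buildPaimonScan(t: Table, proj: Option[Seq[Int]], limit: Option[Int]): Scan = ???
}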


import scala.collection.mutable

class SparkScanBuilder(table: Table)
Contributor:

You need to add this class for spark-3.2 too, and add some ITCases for spark-3.2 as well (for the ITCases, we can also introduce some abstraction to reduce code duplication).

YannByron (Contributor Author):

For the UTs, just copy the code for now. Let's refine it in another PR.

long scannedRowCount = 0;

List<DataFileMeta> originalDataFiles = split.dataFiles();
List<RawFile> originalRawFiles = split.convertToRawFiles().get();
Contributor:

Just get()? Won't that throw an exception when it is empty?

    limitedSplits.add(split);
    scannedRowCount += splitRowCount;
} else {
    DataSplit newSplit = composeDataSplit(split, pushDownLimit - scannedRowCount);
Contributor:

I don't think we need to introduce composeDataSplit.

The reason we introduce limit pushdown is to reduce the number of splits; we don't need to reduce the files within a split.

@JingsongLi (Contributor) left a comment:

+1

@JingsongLi merged commit 9a19973 into apache:master on Nov 29, 2023