[core][spark] Supports to push down limit #2367
Conversation
}

override def pushLimit(limit: Int): Boolean = {
  if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
No need for this if-check on the append table? The paimon-core scan should take care of this.
Here a boolean return value is needed, which is used by Spark SQL. Otherwise, paimon-core needs to provide an API that can be called.
Can we just return false here, as a best-effort pushdown?
As a temporary solution, it's ok.
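Putting this thread together, a minimal sketch of the temporary compromise (record the limit only for append-only tables and always return false, i.e. best-effort pushdown) could look like the following. The class name, fields, and build body are illustrative assumptions, not the exact code of this PR.

```scala
import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownLimit}

// Sketch only: shapes and names are assumptions, not this PR's exact code.
class SparkScanBuilderSketch(table: Table) extends ScanBuilder with SupportsPushDownLimit {

  private var pushDownLimit: Option[Int] = None

  override def pushLimit(limit: Int): Boolean = {
    // Record the limit only for append-only tables, where per-split row counts
    // are exact; primary-key tables may merge or drop rows at read time.
    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
      pushDownLimit = Some(limit)
    }
    // Returning false keeps this a best-effort pushdown: Spark still applies
    // its own LIMIT operator on top, so the result is correct either way.
    false
  }

  override def build(): Scan = {
    // Build the actual scan, threading pushDownLimit through (omitted in this sketch).
    ???
  }
}
```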
this.projectedIndexes = Some(projected)
}

override def pushLimit(limit: Int): Boolean = {
Will Spark SQL not push the limit down if there is a filter?
Yes, Spark SQL can guarantee this.
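For illustration only (the SparkSession spark, the table t, and the predicate are hypothetical): the guarantee referred to above is that Spark offers the limit to the source only when no filter is left for Spark to evaluate after the scan.

```scala
// No residual filter after the scan: Spark may call pushLimit on the scan builder.
spark.sql("SELECT * FROM t LIMIT 10")

// If a filter remains to be evaluated by Spark after the scan, the limit stays
// in Spark's own plan and is not pushed down to the source.
spark.sql("SELECT * FROM t WHERE some_unsupported_predicate(id) LIMIT 10")
```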
import scala.collection.JavaConverters._

class PaimonSQLPerformanceTest extends PaimonSparkTestBase { |
What is this class for? It looks like we cannot get performance results from these tests?
Maybe it's not a good class name. It aims to check whether certain operations take effect, like this SupportsPushDownLimit, which can reduce the number of splits scanned.
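A hedged sketch of such a check (table DDL, helper methods, and assertion style are assumptions; the actual test in this PR may differ): write more rows than the limit across several files, then verify that a limited query still returns the right result while the scan can plan fewer splits.

```scala
// Sketch, not the actual test: assumes PaimonSparkTestBase wires up a SparkSession
// named `spark` with a Paimon catalog and a scalatest-style `test` method.
class PushDownLimitTestSketch extends PaimonSparkTestBase {

  test("push down limit reduces the number of scanned splits") {
    spark.sql("CREATE TABLE T (id INT, name STRING)")
    // Two separate writes normally produce at least two data files, hence two splits.
    spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
    spark.sql("INSERT INTO T VALUES (3, 'c'), (4, 'd')")

    val df = spark.sql("SELECT * FROM T LIMIT 1")
    // The result is still exact; the benefit of the pushdown is that the scan can
    // stop planning splits early. Inspecting the scan's input partitions (or the
    // physical plan) is one way to assert the reduced split count.
    assert(df.count() == 1)
  }
}
```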
These failed UTs are not related to this PR.
Force-pushed from a5d6d48 to 65520d4, then from 65520d4 to 6fcd11a.
@@ -199,6 +202,11 @@ public FileStoreScan withMetrics(ScanMetrics metrics) {
    return this;
}

@Override
public FileStoreScan withLimit(int limit) {
Can the limit implementation live just in AppendOnlyFileStoreScan? It can override the plan method to do the post-limit. Another solution: we can make this limit pushdown generic. We can do this limit in ...
public void serialize(DataOutputView out) throws IOException {
    out.writeUTF(path);
    out.writeLong(offset);
    out.writeLong(length);
    out.writeUTF(format);
    out.writeLong(schemaId);
    out.writeLong(rowCount);
Increment DataSplit.serialVersionUID too.
import scala.collection.mutable

class SparkScanBuilder(table: Table) |
Can we have a SparkBaseScanBuilder to reuse code?
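A minimal sketch of what such a shared base could look like (the class name and members are assumptions, not a shape this PR actually adds): the per-version builders would then only mix in the interfaces that differ between Spark releases.

```scala
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.Table
import org.apache.spark.sql.connector.read.ScanBuilder

// Hypothetical shared base holding the pushdown state common to all Spark versions.
abstract class SparkBaseScanBuilder(val table: Table) extends ScanBuilder {
  protected var pushedPredicates: Array[Predicate] = Array.empty
  protected var projectedIndexes: Option[Array[Int]] = None
  protected var pushDownLimit: Option[Int] = None
}

// A version-specific builder then only adds what that Spark release supports, e.g.
// class SparkScanBuilder(table: Table)
//   extends SparkBaseScanBuilder(table) with SupportsPushDownLimit { ... }
```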
import scala.collection.mutable

class SparkScanBuilder(table: Table) |
Do you need to add the class for spark-3.2 too? And add some ITCases for spark-3.2 as well (for the ITCases, we can also have some abstraction to reduce code duplication).
For the UTs, just copy the code for now. Let's refine it in another PR.
long scannedRowCount = 0;

List<DataFileMeta> originalDataFiles = split.dataFiles();
List<RawFile> originalRawFiles = split.convertToRawFiles().get();
Just get()? Won't that throw an exception when it is empty?
limitedSplits.add(split);
scannedRowCount += splitRowCount;
} else {
    DataSplit newSplit = composeDataSplit(split, pushDownLimit - scannedRowCount);
I think we don't need to introduce composeDataSplit. The reason we introduce limit pushdown is to reduce the number of splits; we don't need to reduce the files within a split.
+1
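To make the agreed direction concrete, here is a hedged sketch of accumulating whole splits until the limit is covered, with no composeDataSplit and no files removed from any split (the object, method, and variable names are assumptions; for append-only tables the per-file row counts are exact):

```scala
import org.apache.paimon.table.source.{DataSplit, Split}
import scala.collection.JavaConverters._
import scala.collection.mutable

object LimitPushdownSketch {

  // Keep adding whole splits until the pushed-down limit is covered.
  // Spark still applies the exact LIMIT on top of whatever the scan returns.
  def limitSplits(splits: Seq[DataSplit], pushDownLimit: Long): Seq[Split] = {
    val limited = mutable.ArrayBuffer.empty[Split]
    var scannedRowCount = 0L
    val it = splits.iterator
    while (scannedRowCount < pushDownLimit && it.hasNext) {
      val split = it.next()
      limited += split
      // Sum of per-file row counts from the data file metadata.
      scannedRowCount += split.dataFiles().asScala.map(_.rowCount()).sum
    }
    limited.toSeq
  }
}
```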
Purpose
Support pushing down the limit to accelerate queries.
Linked issue: close #xxx
Tests
API and Format
Documentation