feat: support trino engine #1923
Conversation
@kaori-seasons This is great and really important work to open Fluss up to more engines.
@polyzos Thank you for your interest, but I've been quite busy with work this week. Would you mind if I started splitting this PR next week?
@kaori-seasons Absolutely, it's nothing urgent. It would just be easier to review smaller PRs.
Hi @kaori-seasons, we are very interested in this feature, so thank you for driving this effort! I am curious about the way you are going to support union reads in Trino: I see that in this draft you were going to implement a custom page source that delegates reads to the corresponding lakehouse. However, I think that will require quite a lot of effort, and it will likely mean re-implementing a lot of features (dynamic filters, partition pruning, etc.) that are already present in existing lakehouse connectors. I would like to discuss a different approach: suppose Trino supported union read at the API level, something along the lines of the following idea:
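(Purely as a hypothetical illustration of the shape of such a hook, not an existing Trino SPI; every union-read type and method name below is made up, only the imported SPI interfaces are real.)

```java
// Hypothetical sketch: the engine could ask a connector whether part of a table's data
// already lives in a lakehouse table managed by another catalog, and plan that part with
// the existing lakehouse connector instead of a custom page source.
import io.trino.spi.connector.CatalogSchemaTableName;
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.Optional;

public interface UnionReadTableHandle extends ConnectorTableHandle {
    // The "fresh" part (e.g. the Fluss log) that this connector reads itself.
    ConnectorTableHandle freshPart();

    // The already-synced part, which the engine would plan and read through the existing
    // lakehouse connector (e.g. an Iceberg or Paimon table registered in another catalog).
    Optional<CatalogSchemaTableName> lakehousePart();
}
```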
I think this change may benefit other connectors that try to offload storage to different sources as well (I can imagine building, e.g., a Postgres CDC to a lakehouse). Do you think this is a viable approach, and is it worth discussing with the Trino community?
Hi @agoncharuk, I'm glad you're interested in this feature. I apologize for the late reply, as I was away on a business trip over the weekend. In general, I agree with your suggestion, but the reason I implemented the dynamic filter and partition pruning features is to support an adaptive strategy. This allows me to judge data freshness based on the actual business scenario. Here's a compromise; I'm not sure if you'd be okay with it:
What are your thoughts on this?
These are some advantages of adaptive strategies:

1. Finer Control and Optimization

Our approach provides smarter read strategies through:

```java
public enum UnionReadStrategy {
    REAL_TIME_ONLY,   // Read real-time data only
    HISTORICAL_ONLY,  // Read historical data only
    UNION,            // Union read
    ADAPTIVE          // Adaptive strategy
}
```

This strategy can dynamically select the optimal read approach based on query characteristics, rather than simply combining two data sources.

2. Intelligent Analysis and Adaptive Optimization

Our implementation includes analysis logic that performs adaptive optimization based on historical performance data:

```java
// Analyze query characteristics to determine the optimal strategy
private StrategyAnalysisResult performStrategyAnalysis(FlussTableHandle tableHandle) {
    // 1. Limit analysis
    double limitBenefit = analyzeLimitBenefit(tableHandle);
    // 2. Predicate analysis
    PredicateAnalysisResult predicateAnalysis = analyzePredicates(tableHandle);
    // 3. Column projection analysis
    double projectionBenefit = analyzeProjectionBenefit(tableHandle);
    // 4. Time boundary analysis
    TimeBoundaryAnalysis timeAnalysis = analyzeTimeBoundary(tableHandle);
    // 5. Data freshness analysis
    double freshnessBenefit = analyzeDataFreshness(tableHandle);

    // Calculate overall benefits
    double realTimeBenefit = calculateRealTimeBenefit(
            limitBenefit, predicateAnalysis, projectionBenefit, freshnessBenefit);
    double historicalBenefit = calculateHistoricalBenefit(
            limitBenefit, predicateAnalysis, projectionBenefit, timeAnalysis);

    return new StrategyAnalysisResult(
            realTimeBenefit,
            historicalBenefit,
            // ... other parameters
    );
}
```

3. Avoiding Data Duplication and Consistency Issues

With a custom implementation, we can avoid data duplication during union reads and handle consistency issues between real-time and historical data:

```java
// Check time boundaries to avoid data duplication
private TimeBoundaryAnalysis analyzeTimeBoundary(FlussTableHandle tableHandle) {
    Optional<Long> syncBoundary = getLakehouseSyncTimeBoundary(tableHandle);
    if (syncBoundary.isPresent()) {
        // Ensure real-time reads don't include data already synced to the lakehouse
        // Ensure historical reads don't include unsynced real-time data
        return new TimeBoundaryAnalysis(syncBoundary.get(), true);
    }
    return new TimeBoundaryAnalysis(0L, false);
}
```
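To make the flow concrete, here is a minimal, hypothetical sketch (a method fragment, not code from this PR) of how split generation could branch on the strategy chosen above; the helper methods and the unionReadManager field are assumptions for illustration.

```java
// Hypothetical sketch: branch split generation on the selected strategy.
// determineStrategy() is assumed to resolve ADAPTIVE into one of the concrete strategies.
public List<ConnectorSplit> generateSplits(FlussTableHandle table) {
    UnionReadStrategy strategy = unionReadManager.determineStrategy(table);
    switch (strategy) {
        case REAL_TIME_ONLY:
            return realTimeBucketSplits(table);     // Fluss log splits, one per table bucket
        case HISTORICAL_ONLY:
            return lakehouseSnapshotSplits(table);  // splits over the synced lakehouse snapshot
        default: // UNION
            // Read both sides, bounded by the sync timestamp so no row is read twice
            List<ConnectorSplit> splits = new ArrayList<>(lakehouseSnapshotSplits(table));
            splits.addAll(realTimeBucketSplits(table));
            return splits;
    }
}
```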
Sure, I definitely agree that using an adaptive approach is good and makes sense. I also definitely agree on
Or do you envision scenarios where the Fluss connector cannot know in advance, at planning time, whether historical data is needed?
@agoncharuk raised a very good point about how to leverage existing lakehouse connectors. @luoyuxia is also designing a mechanism for this. Maybe we can share or discuss it further here, or in a dedicated Slack channel.
@agoncharuk Hello, I partially agree with your point: FlussLakehouseReader is necessary, but it should rely more on the existing Trino lakehouse connectors rather than reimplementing all of their functionality. However, for split-level parallel processing, our implementation can make better use of the built-in parallelism of the lakehouse storage format. In FlussSplit, the parallel unit is the TableBucket, and in FlussSplitManager, split generation follows the bucket distribution of the table (a rough sketch follows below). As the code shows, the FlussSplit mechanism is bucket-based and well suited to parallel reading of real-time Fluss data, but it cannot fully exploit the fine-grained parallelism of lakehouse storage formats. Lakehouse formats (such as Parquet, Paimon, and Iceberg) can parallelize at the level of files, row groups, etc., achieving higher resource utilization and better query performance. Lakehouse storage has two significant advantages, the first being strong dynamic adaptability:
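For illustration, here is a minimal sketch of the bucket-based split model mentioned above; the class shape and field names are assumptions, not taken from this PR.

```java
// Hypothetical sketch: one Fluss split per table bucket, so read parallelism follows
// the table's bucket distribution. Field names are illustrative assumptions.
public class FlussSplit {
    private final long tableId;
    private final int bucketId;        // the parallel unit: one Fluss table bucket
    private final long startingOffset; // first log offset to read in this bucket
    private final long stoppingOffset; // end offset, e.g. the lakehouse sync boundary

    public FlussSplit(long tableId, int bucketId, long startingOffset, long stoppingOffset) {
        this.tableId = tableId;
        this.bucketId = bucketId;
        this.startingOffset = startingOffset;
        this.stoppingOffset = stoppingOffset;
    }
}
```

A lakehouse connector, by contrast, can split further by file and row group, which is the finer-grained parallelism referred to above.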
@agoncharuk Regarding your point about "the Fluss connector not being able to know in advance whether historical data is needed during planning," I've thought about it for a long time. In reality, it is possible, but I understand your concerns. I've added a method to FlussMetadata that returns the table's time boundary information and lakehouse configuration, which allows the Trino planner to make more informed decisions during planning.

1. We Can Indeed Determine Whether Historical Data Is Needed at Planning Time

From the code, we can see that this determination can be made at the planning stage:

```java
// In FlussUnionReadManager
public UnionReadStrategy determineStrategy(FlussTableHandle tableHandle) {
    // 1. First check if Union Read is applicable
    if (!isUnionReadApplicable(tableHandle)) {
        return UnionReadStrategy.REAL_TIME_ONLY;
    }
    // 2. Check if only historical data is needed
    if (isHistoricalOnlyQuery(tableHandle)) {
        return UnionReadStrategy.HISTORICAL_ONLY;
    }
    // 3. Check if only real-time data is needed
    if (isRealTimeOnlyQuery(tableHandle)) {
        return UnionReadStrategy.REAL_TIME_ONLY;
    }
    // 4. Perform complex analysis to determine the optimal strategy
    return analyzeQueryForOptimalStrategy(tableHandle);
}
```

2. The Decision Basis Includes Multiple Dimensions

We can determine whether historical data is needed at the planning stage based on the following information:

a) Table Configuration Information

```java
// Check if the table has lakehouse configuration
Optional<String> lakehouseFormat = tableInfo.getTableDescriptor()
        .getCustomProperties()
        .map(props -> props.get("datalake.format"));
```

b) Query Constraints

```java
// Analyze query predicates
private PredicateAnalysisResult analyzePredicates(FlussTableHandle tableHandle) {
    // Analyze time-related predicates
    // For example: WHERE date < '2024-01-01' may only need historical data
    // For example: WHERE date > '2024-06-01' may only need real-time data
}
```

c) Time Boundary Information

```java
// Get the time boundary between real-time and historical data
public Optional<Long> getTimeBoundary(FlussTableHandle tableHandle) {
    // Get the lakehouse sync timestamp
    Optional<String> syncTimestamp = tableInfo.getTableDescriptor()
            .getCustomProperties()
            .flatMap(props -> Optional.ofNullable(props.get("lakehouse.sync.timestamp")));
    return syncTimestamp.map(Long::parseLong);
}
```

d) Query Limit Conditions

```java
// Analyze LIMIT conditions
private double analyzeLimitBenefit(FlussTableHandle tableHandle) {
    if (tableHandle.getLimit().isPresent()) {
        long limit = tableHandle.getLimit().get();
        // A small LIMIT may be better suited to real-time data (lower latency)
        if (limit <= 100) {
            return 0.9; // High benefit for real-time data
        }
    }
    return 0.0; // No small LIMIT: no particular benefit for the real-time side
}
```

3. Strategies for Handling "Cannot Know in Advance" Situations

For situations where we truly cannot decide at planning time, we employ the following strategies:

a) Default Strategy

```java
// When uncertain, default to the UNION strategy
// This ensures data completeness
private UnionReadStrategy determineOptimalStrategy(FlussTableHandle tableHandle,
        StrategyAnalysisResult analysis) {
    double realTimeScore = analysis.getRealTimeBenefit();
    double historicalScore = analysis.getHistoricalBenefit();
    // If the scores are very close, use the UNION strategy to ensure data completeness
    if (Math.abs(realTimeScore - historicalScore) < 0.1) {
        return UnionReadStrategy.UNION;
    }
    // Otherwise, choose the strategy with the higher score
    return realTimeScore > historicalScore
            ? UnionReadStrategy.REAL_TIME_ONLY
            : UnionReadStrategy.HISTORICAL_ONLY;
}
```

b) Runtime Validation and Adjustment

```java
// Validate and adjust the strategy during execution
private UnionReadStrategy validateAndAdjustStrategy(
        FlussTableHandle tableHandle,
        UnionReadStrategy strategy,
        StrategyAnalysisResult analysis) {
    // Verify that historical data is actually available
    if (strategy == UnionReadStrategy.HISTORICAL_ONLY) {
        if (!isHistoricalDataAvailable(tableHandle)) {
            // If historical data is not available, fall back to real-time data
            return UnionReadStrategy.REAL_TIME_ONLY;
        }
    }
    return strategy;
}
```
@wuchong Great, I'm happy to discuss here or elsewhere. @kaori-seasons Did you already investigate which parts of existing connectors can be reused? I am not sure to what extent existing lakehouse connector parts can be reused. Do you folks think that we have an agreement that representing Fluss union read with
Hello, I'm currently researching which pushdown rules can be reused in the Trino connector. Once that work is complete, I'll share a Google document for discussion.
Purpose
Related to issue #1810
Brief change log
Tests
API and Format
Documentation