
Conversation

@kaori-seasons

Purpose

Related to issue-1810

Brief change log

Tests

API and Format

Documentation

@kaori-seasons kaori-seasons marked this pull request as ready for review November 3, 2025 03:20
@polyzos
Contributor

polyzos commented Nov 3, 2025

@kaori-seasons This is great and really important work to open Fluss to more engines.
Do you think it would be feasible to create an epic and break this work into smaller PRs so it's easier to review and merge?

@kaori-seasons
Author

@kaori-seasons This is great and really important work to open Fluss to more engines. Do you think it would be feasible to create an epic and break this work into smaller PRs so it's easier to review and merge?

@polyzos Thank you for your interest. I've been quite busy with work this week, though; would you mind if I started splitting this PR next week?

@polyzos
Contributor

polyzos commented Nov 3, 2025

@kaori-seasons Absolutely, it's nothing urgent. It would just be easier to review smaller PRs.

@agoncharuk

Hi @kaori-seasons, we are very interested in this feature so thank you for driving this effort!

I am curious about how you are going to support union reads in Trino: I see that in this draft you were going to implement a custom page source that delegates reads to the corresponding lakehouse. However, I think that will require quite a lot of effort, and it will likely mean re-implementing many other features (dynamic filters, partition pruning, etc.) that are already present in the existing lakehouse connectors. I wanted to discuss a different approach. Say Trino supported the union read at the API level, something along the following lines:

  1. The Metadata interface would have another method, splitForUnionRead, which would return a Trino table path + table version (snapshot ID) to be used for the union read
  2. When this result is returned to the Trino planner, it would replace the table scan with a Union node, which has the Fluss table scan as one leg and the lakehouse table scan as the other. A plugin cannot instantiate a TableHandle of another plugin directly, but this can be implemented inside a Trino planner rule
  3. Trino would use all existing optimizations for the corresponding connector: filter pushdown, partition pruning, etc., and it would handle the interaction with the data lakes - no other steps are required from the Fluss connector

I think this change may benefit other connectors that try to offload storage to different sources as well (I can imagine building e.g. a Postgres CDC pipeline into a lakehouse).

Do you think this is a viable approach, and is it worth discussing with the Trino community?

@kaori-seasons
Author

kaori-seasons commented Nov 10, 2025

Hi @kaori-seasons, we are very interested in this feature so thank you for driving this effort!

I am curious about how you are going to support union reads in Trino: I see that in this draft you were going to implement a custom page source that delegates reads to the corresponding lakehouse. However, I think that will require quite a lot of effort, and it will likely mean re-implementing many other features (dynamic filters, partition pruning, etc.) that are already present in the existing lakehouse connectors. I wanted to discuss a different approach. Say Trino supported the union read at the API level, something along the following lines:

  1. The Metadata interface would have another method, splitForUnionRead, which would return a Trino table path + table version (snapshot ID) to be used for the union read
  2. When this result is returned to the Trino planner, it would replace the table scan with a Union node, which has the Fluss table scan as one leg and the lakehouse table scan as the other. A plugin cannot instantiate a TableHandle of another plugin directly, but this can be implemented inside a Trino planner rule
  3. Trino would use all existing optimizations for the corresponding connector: filter pushdown, partition pruning, etc., and it would handle the interaction with the data lakes - no other steps are required from the Fluss connector

I think this change may benefit other connectors that try to offload storage to different sources as well (I can imagine building e.g. a Postgres CDC pipeline into a lakehouse).

Do you think this is a viable approach, and is it worth discussing with the Trino community?

Hi @agoncharuk, I'm glad you're interested in this feature. I apologize for the late reply; I was away on a business trip over the weekend. In general, I agree with your suggestion, but the reason I implemented the dynamic filter and partition pruning features is to support an adaptive strategy, which lets the connector judge data freshness based on the actual business scenario. Here's a compromise; let me know whether you'd be okay with it:

  1. Retain the advantages of the existing implementation: Keep the intelligent strategy selection and adaptive optimization features.

  2. Enhance integration with the Trino planner: Add a suggestion method to the Metadata interface, but still retain our optimization logic.


// A similar method can be added to FlussMetadata
public interface FlussMetadata {
    // Existing methods...

    // Add a new method to support union reads at the Trino level
    default Optional<UnionReadInfo> splitForUnionRead(String tableName) {
        // Returns the Trino table path + table version (snapshot ID)
        // This allows the Trino planner to create Union nodes
        return Optional.empty();
    }
}
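
Just to make this concrete, here is a rough sketch of what the UnionReadInfo carrier could look like (this class does not exist yet in the draft; the field names are my assumption, based only on "table path + snapshot ID" above):

// Hypothetical value class returned by splitForUnionRead.
// The draft only says "table path + snapshot ID", so the field names are assumptions.
public final class UnionReadInfo {
    private final String lakehouseTablePath; // Trino table path of the lakehouse copy, e.g. "iceberg.db.orders"
    private final long snapshotId;           // lakehouse snapshot/version to pin the union read to

    public UnionReadInfo(String lakehouseTablePath, long snapshotId) {
        this.lakehouseTablePath = lakehouseTablePath;
        this.snapshotId = snapshotId;
    }

    public String getLakehouseTablePath() {
        return lakehouseTablePath;
    }

    public long getSnapshotId() {
        return snapshotId;
    }
}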

What are your thoughts on this?

@kaori-seasons
Author

These are some advantages of adaptive strategies:

1. Finer Control and Optimization

Our approach provides smarter read strategies through FlussUnionReadManager:

public enum UnionReadStrategy {
    REAL_TIME_ONLY,     // Read real-time data only
    HISTORICAL_ONLY,    // Read historical data only
    UNION,              // Union read
    ADAPTIVE            // Adaptive strategy
}

This strategy can dynamically select the optimal read approach based on query characteristics, rather than simply combining two data sources.

2. Intelligent Analysis and Adaptive Optimization

Our implementation includes complex analysis logic that can perform adaptive optimization based on historical performance data:

// Analyze query characteristics to determine optimal strategy
private StrategyAnalysisResult performStrategyAnalysis(FlussTableHandle tableHandle) {
    // 1. Limit analysis
    double limitBenefit = analyzeLimitBenefit(tableHandle);
    
    // 2. Predicate analysis
    PredicateAnalysisResult predicateAnalysis = analyzePredicates(tableHandle);
    
    // 3. Column projection analysis
    double projectionBenefit = analyzeProjectionBenefit(tableHandle);
    
    // 4. Time boundary analysis
    TimeBoundaryAnalysis timeAnalysis = analyzeTimeBoundary(tableHandle);
    
    // 5. Data freshness analysis
    double freshnessBenefit = analyzeDataFreshness(tableHandle);
    
    // Calculate overall benefits
    double realTimeBenefit = calculateRealTimeBenefit(
            limitBenefit, predicateAnalysis, projectionBenefit, freshnessBenefit);
    
    double historicalBenefit = calculateHistoricalBenefit(
            limitBenefit, predicateAnalysis, projectionBenefit, timeAnalysis);
    
    return new StrategyAnalysisResult(
            realTimeBenefit,
            historicalBenefit,
            // ... other parameters
    );
}
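
Purely as an illustration of how the overall benefits could be combined (the class, method shapes, and coefficients below are assumptions for discussion, not code from this PR), the two scores could be simple weighted sums of the per-dimension results:

// Hypothetical weighting; illustrative only, the real coefficients may differ.
final class BenefitScoringSketch {
    static double realTimeBenefit(
            double limitBenefit,
            double predicateBenefit,
            double projectionBenefit,
            double freshnessBenefit) {
        // Small LIMITs and fresh-data predicates favor the real-time (Fluss) leg.
        return 0.35 * limitBenefit
                + 0.25 * predicateBenefit
                + 0.15 * projectionBenefit
                + 0.25 * freshnessBenefit;
    }

    static double historicalBenefit(
            double limitBenefit,
            double predicateBenefit,
            double projectionBenefit,
            double timeBoundaryBenefit) {
        // Heavy projection pruning and predicates that fall entirely before the
        // sync boundary favor the historical (lakehouse) leg.
        return 0.15 * limitBenefit
                + 0.30 * predicateBenefit
                + 0.25 * projectionBenefit
                + 0.30 * timeBoundaryBenefit;
    }
}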

3. Avoiding Data Duplication and Consistency Issues

Through custom implementation, we can ensure that data duplication is avoided during union reads and handle consistency issues between real-time and historical data:

// Check time boundaries to avoid data duplication
private TimeBoundaryAnalysis analyzeTimeBoundary(FlussTableHandle tableHandle) {
    Optional<Long> syncBoundary = getLakehouseSyncTimeBoundary(tableHandle);
    
    if (syncBoundary.isPresent()) {
        // Ensure real-time reads don't include data already synced to Lakehouse
        // Ensure historical reads don't include unsynced real-time data
        return new TimeBoundaryAnalysis(syncBoundary.get(), true);
    }
    
    return new TimeBoundaryAnalysis(0L, false);
}
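
For example, once the sync boundary is known, the two legs can be built as non-overlapping time ranges (the ScanRange type and helpers below are hypothetical and only illustrate the split; the actual code works through TimeBoundaryAnalysis):

// Hypothetical helper: split the scan into two non-overlapping legs around the
// lakehouse sync boundary (historical leg ends exclusively at the boundary).
public final class UnionReadRanges {
    public record ScanRange(long startInclusive, long endExclusive) {}

    public static ScanRange historicalLeg(long syncBoundary) {
        // Everything strictly before the boundary has already been synced to the lakehouse.
        return new ScanRange(Long.MIN_VALUE, syncBoundary);
    }

    public static ScanRange realTimeLeg(long syncBoundary) {
        // Everything at or after the boundary is read from Fluss only.
        return new ScanRange(syncBoundary, Long.MAX_VALUE);
    }
}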

@agoncharuk

Sure, I definitely agree that using an adaptive approach is good and makes sense. I also definitely agree on the TimeBoundaryAnalysis you mentioned - the read should be split into two non-overlapping datasets based on Fluss tiering metadata.
The only thing I contest is whether FlussLakehouseReader is needed, because once we understand that we need historical data:

  • In your FlussLakehouseReader you mention that, for e.g. Iceberg, you would need to instantiate an Iceberg catalog, list splits, do some filtering based on partition columns, etc. This would need to be implemented inside FlussLakehouseReader for each data lake, whereas it could instead be delegated to the Trino lakehouse connectors
  • If I understand correctly, the basic unit of parallelism (as per FlussSplit) is a table bucket, needed to preserve ordering guarantees. Once we get to the lakehouse, we likely read more data and can increase parallelism based on e.g. Parquet row groups, which is also already implemented in the Trino lakehouse connectors.

Or do you envision scenarios when Fluss connector cannot know in advance whether historical data is needed or not at the planning time?

@wuchong
Member

wuchong commented Nov 10, 2025

@agoncharuk raised a very good point about how to leverage existing lakehouse connectors. @luoyuxia is also designing a mechanism for this. Maybe we can share or discuss it further here, or in a dedicated Slack channel.

@kaori-seasons
Author

@agoncharuk Hello, I partially agree with your point. FlussLakehouseReader is necessary, but it should rely more on the existing Trino lakehouse connectors rather than reimplement all of their functionality.

However, regarding split-level parallel processing, the current implementation cannot take full advantage of the parallelism built into the lakehouse storage formats.

In FlussSplit, the unit of parallelism is the TableBucket, and in FlussSplitManager, split generation is based on the table's bucket distribution.

// FlussSplitManager.java
// Get bucket count from table descriptor
TableDescriptor tableDescriptor = tableInfo.getTableDescriptor();
Optional<Integer> bucketCount = tableDescriptor.getDistribution().getBucketCount();

if (bucketCount.isPresent()) {
    int numBuckets = bucketCount.get();
    // Create one Split for each bucket
    for (int bucketId : prunedBuckets) {
        if (bucketId >= 0 && bucketId < numBuckets) {
            TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), bucketId);
            FlussSplit split = new FlussSplit(tablePath, tableBucket, addresses);
            splits.add(split);
        }
    }
}

As the code shows, the FlussSplit mechanism is based on table buckets and is well suited to parallel reading of real-time Fluss data, but it cannot fully utilize the fine-grained parallelism of the lakehouse storage formats. Lakehouse storage formats (such as Parquet, Paimon, and Iceberg) can parallelize at a finer granularity - per file, per row group, and so on - which yields higher resource utilization and better query performance.
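
To make the contrast concrete, here is a rough, self-contained sketch (all of the types below are hypothetical and stand in for the real Fluss/lakehouse classes): a bucket-based plan yields exactly one split per bucket, while a file-based plan can yield one split per row group.

// Hypothetical illustration of the two split granularities.
import java.util.ArrayList;
import java.util.List;

final class SplitGranularityExample {
    record BucketSplit(long tableId, int bucketId) {}
    record DataFile(String path, int rowGroupCount) {}
    record RowGroupSplit(String filePath, int rowGroupIndex) {}

    // Fluss-style: parallelism is fixed by the table's bucket count.
    static List<BucketSplit> bucketSplits(long tableId, int numBuckets) {
        List<BucketSplit> splits = new ArrayList<>();
        for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
            splits.add(new BucketSplit(tableId, bucketId));
        }
        return splits;
    }

    // Lakehouse-style: parallelism follows the physical layout (files and row groups).
    static List<RowGroupSplit> rowGroupSplits(List<DataFile> files) {
        List<RowGroupSplit> splits = new ArrayList<>();
        for (DataFile file : files) {
            for (int rg = 0; rg < file.rowGroupCount(); rg++) {
                splits.add(new RowGroupSplit(file.path(), rg));
            }
        }
        return splits;
    }
}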

The lakehouse side has two significant advantages:

  • Strong dynamic adaptability: the lakehouse can adjust its parallelism based on file size and data distribution, whereas FlussSplit's parallelism is fixed by the number of buckets in the table.
  • Higher resource utilization: the lakehouse can adjust its parallelism at query time based on data volume and computing resources, whereas FlussSplit's parallelism is fixed at table creation and may not fully utilize computing resources.

@wuchong @luoyuxia Do you have any suggestions on this?

@kaori-seasons
Author

kaori-seasons commented Nov 11, 2025

@agoncharuk Regarding your point about "Fluss connectors not being able to know in advance whether historical data is needed during planning," I've given it a lot of thought. In reality it is possible, but I understand your concerns. I've added a method to FlussMetadata that returns the table's time boundary information and Lakehouse configuration, which allows the Trino planner to make more informed decisions during planning.

// Add a method to FlussMetadata to support Trino-level union reads
public class FlussMetadata implements ConnectorMetadata {
    // Existing methods...

    // Add a new method to support Trino-level union reads
    public Optional<UnionReadInfo> getUnionReadInfo(String tableName) {
        // Would return the table's time boundary information and Lakehouse configuration
        return Optional.empty();
    }
}
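
As a rough sketch of how the planner side could consume this method (none of these planner types are real Trino classes; this only illustrates the flow, and it reuses the hypothetical UnionReadInfo sketched earlier):

// Hypothetical planning-time flow, not the Trino SPI; names are illustrative only.
import java.util.Optional;

final class UnionReadPlanningSketch {
    interface PlanNode {}
    record ScanNode(String table, Optional<Long> snapshotId) implements PlanNode {}
    record UnionNode(PlanNode left, PlanNode right) implements PlanNode {}

    static PlanNode plan(String flussTable, Optional<UnionReadInfo> info) {
        ScanNode flussScan = new ScanNode(flussTable, Optional.empty());
        if (info.isEmpty()) {
            // No lakehouse copy (or no sync yet): plan a plain Fluss scan.
            return flussScan;
        }
        // Lakehouse copy exists: union the Fluss leg with a snapshot-pinned lakehouse leg,
        // split at the time boundary so the two legs do not overlap.
        ScanNode lakeScan = new ScanNode(
                info.get().getLakehouseTablePath(),
                Optional.of(info.get().getSnapshotId()));
        return new UnionNode(flussScan, lakeScan);
    }
}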

1. We Can Indeed Determine Whether Historical Data Is Needed at Planning Time

From the code, we can see that this determination can be made at the planning stage:

// In FlussUnionReadManager
public UnionReadStrategy determineStrategy(FlussTableHandle tableHandle) {
    // 1. First check if Union Read is applicable
    if (!isUnionReadApplicable(tableHandle)) {
        return UnionReadStrategy.REAL_TIME_ONLY;
    }

    // 2. Check if only historical data is needed
    if (isHistoricalOnlyQuery(tableHandle)) {
        return UnionReadStrategy.HISTORICAL_ONLY;
    }
    
    // 3. Check if only real-time data is needed
    if (isRealTimeOnlyQuery(tableHandle)) {
        return UnionReadStrategy.REAL_TIME_ONLY;
    }
    
    // 4. Perform complex analysis to determine optimal strategy
    return analyzeQueryForOptimalStrategy(tableHandle);
}

2. Decision Basis Includes Multiple Dimensions

We can determine whether historical data is needed at the planning stage based on the following information:

a) Table Configuration Information

// Check if the table has Lakehouse configuration
Optional<String> lakehouseFormat = tableInfo.getTableDescriptor()
        .getCustomProperties()
        .map(props -> props.get("datalake.format"));

b) Query Constraints

// Analyze query predicates
private PredicateAnalysisResult analyzePredicates(FlussTableHandle tableHandle) {
    // Analyze time-related predicates
    // For example: WHERE date < '2024-01-01' may only need historical data
    // For example: WHERE date > '2024-06-01' may only need real-time data
    // ... (analysis elided in this excerpt)
}

c) Time Boundary Information

// Get the time boundary between real-time and historical data
public Optional<Long> getTimeBoundary(FlussTableHandle tableHandle) {
    // Get Lakehouse sync timestamp
    Optional<String> syncTimestamp = tableInfo.getTableDescriptor()
            .getCustomProperties()
            .flatMap(props -> Optional.ofNullable(props.get("lakehouse.sync.timestamp")));
    // Parse the timestamp into a millisecond boundary (assumed encoding of the property)
    return syncTimestamp.map(Long::parseLong);
}

d) Query Limit Conditions

// Analyze LIMIT conditions
private double analyzeLimitBenefit(FlussTableHandle tableHandle) {
    if (tableHandle.getLimit().isPresent()) {
        long limit = tableHandle.getLimit().get();
        // Small LIMIT may be better suited for real-time data (lower latency)
        if (limit <= 100) {
            return 0.9; // High benefit for real-time data
        }
    }
    return 0.5; // Neutral benefit when there is no small LIMIT (assumed default)
}

3. Strategies for Handling "Cannot Know in Advance" Situations

For situations where we truly cannot determine at planning time, we employ the following strategies:

a) Default Strategy

// When uncertain, default to UNION strategy
// This ensures data completeness
private UnionReadStrategy determineOptimalStrategy(FlussTableHandle tableHandle, 
                                                 StrategyAnalysisResult analysis) {
    double realTimeScore = analysis.getRealTimeBenefit();
    double historicalScore = analysis.getHistoricalBenefit();
    
    // If scores are very close, use UNION strategy to ensure data completeness
    if (Math.abs(realTimeScore - historicalScore) < 0.1) {
        return UnionReadStrategy.UNION;
    }
    
    // Otherwise, choose the strategy with the higher score
    return realTimeScore > historicalScore ? 
        UnionReadStrategy.REAL_TIME_ONLY : 
        UnionReadStrategy.HISTORICAL_ONLY;
}

b) Runtime Validation and Adjustment

// Validate and adjust strategy during execution
private UnionReadStrategy validateAndAdjustStrategy(
        FlussTableHandle tableHandle,
        UnionReadStrategy strategy,
        StrategyAnalysisResult analysis) {
    
    // Verify if historical data is actually available
    if (strategy == UnionReadStrategy.HISTORICAL_ONLY) {
        if (!isHistoricalDataAvailable(tableHandle)) {
            // If historical data is not available, switch to real-time data
            return UnionReadStrategy.REAL_TIME_ONLY;
        }
    }
    
    return strategy;
}
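
For completeness, isHistoricalDataAvailable could be as simple as checking the table properties already mentioned above (this is an assumption on my part; the property keys come from the earlier snippets and the real check may differ):

// Hypothetical availability check; customProperties would come from the table descriptor.
import java.util.Map;
import java.util.Optional;

final class HistoricalAvailabilityCheck {
    static boolean isHistoricalDataAvailable(Map<String, String> customProperties) {
        // Historical (lakehouse) data is only usable if the table is configured with a
        // datalake format and at least one sync to the lakehouse has completed.
        boolean hasLakehouseFormat = customProperties.containsKey("datalake.format");
        Optional<Long> syncBoundary =
                Optional.ofNullable(customProperties.get("lakehouse.sync.timestamp"))
                        .map(Long::parseLong);
        return hasLakehouseFormat && syncBoundary.isPresent();
    }
}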

@agoncharuk

@wuchong Great, I'm happy to discuss here or elsewhere.

@kaori-seasons Did you already investigate which parts of the existing connectors can be reused? I am not sure to what extent the existing lakehouse connector parts can be reused.

Do you folks think we have agreement that representing the Fluss union read as a UNION over another connector is at least an option, and that we can discuss this with the Trino community?

@kaori-seasons
Author

@wuchong Great, I'm happy to discuss here or elsewhere.

@kaori-seasons Did you already investigate which parts of the existing connectors can be reused? I am not sure to what extent the existing lakehouse connector parts can be reused.

Do you folks think we have agreement that representing the Fluss union read as a UNION over another connector is at least an option, and that we can discuss this with the Trino community?

Hello, I'm currently researching which pushdown rules from the existing Trino connectors can be reused. Once that work is done, I'll share a Google document for discussion.
