Skip to content

[Logical Table] Support logical tables in MSE.#15773

Merged
yashmayya merged 17 commits intoapache:masterfrom
vrajat:rv-table-route-info-mse
May 30, 2025
Merged

[Logical Table] Support logical tables in MSE.#15773
yashmayya merged 17 commits intoapache:masterfrom
vrajat:rv-table-route-info-mse

Conversation

@vrajat
Copy link
Contributor

@vrajat vrajat commented May 13, 2025

This PR adds support to execute queries on logical tables in MSE. It uses classes introduced in #15634.

The high-level workflow is:

  • PinotCatalog also exposes logical tables as tables.
  • DispatchPlanVisitor checks if the TableScanNode points to a logical table. If yes, adds LogicalTableRouteInfo to DispatchablePlanMetadata.
  • WorkerManager creates a workerId -> {tableType -> {tableName -> segments}}. This is different from physical tables which require workerId -> {tableType -> segments}
  • ServerPlanRequestUtils uses this map to create InstanceRequest with the field for TableSegmentsInfo.

The last step triggers SSE engine to process segments of all physical tables in the logical table.

Closes #15749

@vrajat vrajat changed the title [Logical Table] Support logical tables in SSE. [Logical Table] Support logical tables in MSE. May 13, 2025
@codecov-commenter
Copy link

codecov-commenter commented May 14, 2025

Codecov Report

❌ Patch coverage is 32.26837% with 212 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.39%. Comparing base (1a476de) to head (fe4009b).
⚠️ Report is 1305 commits behind head on master.

Files with missing lines Patch % Lines
.../org/apache/pinot/query/routing/WorkerManager.java 3.57% 52 Missing and 2 partials ⚠️
...ry/runtime/plan/server/ServerPlanRequestUtils.java 25.00% 51 Missing and 3 partials ⚠️
...t/controller/api/resources/PinotQueryResource.java 0.00% 30 Missing ⚠️
...server/starter/helix/HelixInstanceDataManager.java 0.00% 23 Missing ⚠️
...e/pinot/core/data/manager/LogicalTableContext.java 0.00% 10 Missing ⚠️
...org/apache/pinot/query/routing/WorkerMetadata.java 33.33% 7 Missing and 1 partial ⚠️
...uery/planner/physical/DispatchablePlanVisitor.java 30.00% 6 Missing and 1 partial ⚠️
...a/org/apache/pinot/query/catalog/PinotCatalog.java 0.00% 5 Missing and 1 partial ⚠️
...ery/planner/physical/DispatchablePlanMetadata.java 33.33% 4 Missing ⚠️
...query/routing/table/LogicalTableRouteProvider.java 91.11% 2 Missing and 2 partials ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15773      +/-   ##
============================================
+ Coverage     62.90%   63.39%   +0.49%     
+ Complexity     1386     1354      -32     
============================================
  Files          2867     2895      +28     
  Lines        163354   166319    +2965     
  Branches      24952    25434     +482     
============================================
+ Hits         102755   105446    +2691     
- Misses        52847    52884      +37     
- Partials       7752     7989     +237     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.35% <32.26%> (+0.48%) ⬆️
java-21 63.37% <32.26%> (+0.55%) ⬆️
skip-bytebuffers-false ?
skip-bytebuffers-true ?
temurin 63.39% <32.26%> (+0.49%) ⬆️
unittests 63.39% <32.26%> (+0.49%) ⬆️
unittests1 56.53% <39.29%> (+0.71%) ⬆️
unittests2 33.36% <1.59%> (-0.21%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@vrajat vrajat force-pushed the rv-table-route-info-mse branch from 2284ed0 to fd1749e Compare May 15, 2025 10:52
@vrajat vrajat marked this pull request as ready for review May 15, 2025 10:57
@vrajat vrajat requested review from Jackie-Jiang and Copilot May 15, 2025 13:24
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for executing queries on logical tables in MSE by introducing a new logical table manager, updating routing and plan request utilities, and adjusting the query catalog and integration tests to recognize logical tables.

  • Introduces a LogicalTableManager API and related accessor in InstanceDataManager.
  • Updates plan request construction and routing providers (both logical and physical) to support logical table segments.
  • Enhances integration tests and catalog methods to account for logical tables.

Reviewed Changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java Adds new API method to obtain a LogicalTableManager based on logical table configurations.
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java Implements new branch logic for constructing instance requests targeting logical tables.
pinot-query-planner/... Updates routing providers (physical and logical) and metadata classes to support logical table routing.
pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java Adjusts table look-up to include logical table names.
pinot-integration-tests/.../BaseLogicalTableIntegrationTest.java Modifies tests to exercise queries for both logical and physical table engines.
pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableManager.java Introduces the new LogicalTableManager class encapsulating configs and schema for logical table support.
Comments suppressed due to low confidence (1)

pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:142

  • The scanned table appears to be added twice in the TableScanNode visitor (once before and again after resolving as a logical table). Consider reviewing whether both calls to addScannedTable are needed to avoid potential duplication.
dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));

@vrajat vrajat requested a review from yashmayya May 16, 2025 06:24
@vrajat vrajat force-pushed the rv-table-route-info-mse branch from aa56cd2 to 9859377 Compare May 19, 2025 13:53
@vrajat vrajat marked this pull request as draft May 19, 2025 13:54
@vrajat
Copy link
Contributor Author

vrajat commented May 19, 2025

This PR is in draft mode as it depends on #15776

@gortiz
Copy link
Contributor

gortiz commented May 19, 2025

I didn't read the code yet but.. Given the design document says:

Conceptually, a logical table is similar to a specific definition of a VIEW in relational databases.

Couldn't we model them as views in MSE?

@vrajat
Copy link
Contributor Author

vrajat commented May 19, 2025

I didn't read the code yet but.. Given the design document says:

Conceptually, a logical table is similar to a specific definition of a VIEW in relational databases.

Couldn't we model them as views in MSE?

Copy-pasting from an internal slack message:

In MSE, I explored using views. However since table scans and bulk of logical table code is in LeafStageOperator and SSE, it was easier to change query planning in MSE to carry metadata for logical tables and then setup SSE executor appropriately. One data point that this is the right approach is that no changes to operators was required.

@vrajat vrajat force-pushed the rv-table-route-info-mse branch from d7fa480 to 7a12b36 Compare May 20, 2025 06:42
@vrajat vrajat marked this pull request as ready for review May 20, 2025 06:45
public String getOfflineTableName() {
return hasOffline() && _logicalTable != null ? TableNameBuilder.OFFLINE.tableNameWithType(
_logicalTable.getTableName()) : null;
return hasOffline() && _logicalTableName != null ? TableNameBuilder.OFFLINE.tableNameWithType(_logicalTableName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can _logicalTableName be null?

should it be set during the init or constructor? Its too basic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen if a table name that does not exist is provided. I have kept the same semantics as ImplicitHybridTableRouteInfo where the table config is checked for null to see if it exists. The route provider returns a TableRouteInfo always instead of a null or an exception. A TableRouteInfo is returned to keep the logic same in BaseSingleStageRequestHandler

@abhishekbafna
Copy link
Collaborator

Both

protected boolean hasExceededQPSQuota(@Nullable String database, Set<String> tableNames,
and
protected TableAuthorizationResult hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames,
takes a set of table. How are we setting the for the MSE?

@vrajat
Copy link
Contributor Author

vrajat commented May 20, 2025

How are we setting the for the MSE?

MSE creates a set and passes it to these functions here https://github.com/apache/pinot/blob/master/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java#L403

@abhishekbafna
Copy link
Collaborator

How are we setting the for the MSE?

MSE creates a set and passes it to these functions here https://github.com/apache/pinot/blob/master/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java#L403

This is passing the tables to the auth and quota checks. But how is the set created and set in the first place? I tried to trace it in the code and could not.

Also, in the query quota PR I have modified the input for the query quota from set physical table to logical table as logical table has its own quota. Link: https://github.com/apache/pinot/pull/15839/files#diff-06e2bf5ea89ac7b1f67fa90f3d3e9bbb8c61cae03c20093fd18bdc977f949134L409

@vrajat
Copy link
Contributor Author

vrajat commented May 20, 2025

This is passing the tables to the auth and quota checks. But how is the set created and set in the first place? I tried to trace it in the code and could not.

It gets the list from the parsed query. This will work for query quota as the quota is checked against the logical table name. However more work is required re: auth. That is tracked in #15784

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please double check the code path for logical table is completely separate from the existing one to ensure backward compatibility

import org.apache.pinot.spi.data.Schema;


public class LogicalTableManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really a manager. You may call it LogicalTableConfigs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll rename it to LogicalTableContext since there is already an important class with LogicalTableConfig and a plural version will be very confusing.

public LogicalTableConfig getLogicalTableConfig() {
return _logicalTableConfig;
}
public Schema getLogicalTableSchema() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) Add some whitespace between methods

private static List<InstanceRequest> constructLogicalTableServerQueryRequests(
OpChainExecutionContext executionContext, PinotQuery pinotQuery, InstanceDataManager instanceDataManager) {
StageMetadata stageMetadata = executionContext.getStageMetadata();
String logicalTableName = TableNameBuilder.extractRawTableName(stageMetadata.getTableName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can logical table name ever have type suffix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy/paste error :)

/**
* Returns the logical table config and schema for the given logical table name.
*/
@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider directly throw exception when some config is not available.
Please also add a TODO to remove this method because we should never read ZK at query path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a TODO.

re: exception, an exception is thrown using Preconditions.

    LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName);
    Preconditions.checkNotNull(logicalTableContext,
        "LogicalTableManager not found for logical table name: " + logicalTableName);

This is similar to all the other gets from InstanceDataManager. Is this pattern OK or should I throw a TableNotFoundException or similar ?

dispatchablePlanMetadata.getWorkerIdToServerInstanceMap();
Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap =
dispatchablePlanMetadata.getWorkerIdToSegmentsMap();
Map<Integer, DispatchablePlanMetadata.TableTypeTableNameToSegmentsMap> workerIdToTableNameSegmentsMap =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a pre-check that this one cannot co-exist with workerIdToSegmentsMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* what do each of the string keys store, we define two classes:
* {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily.
*/
public static class TableTypeToSegmentsMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest just keeping the plain map. Each class introduced is adding overhead on heap. Putting some javadoc explaining what each level stands for should be good enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the map can be optimized into Pairs to reduce overhead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish Java had a typedef like language construct. I can change it now since the code works. FYI: this data structure caused the most amount of delay in making this feature work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pair can work in general. However serialization, deserialization using jackson is proving to be difficult. FasterXML/jackson-databind#1156

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh! - Simplified the data structure to Map<String, List<String>>. I realised that the data structure has physical table name AND table type. So now it is a map of physicalTableName -> segments.

I did this AFTER spending a lot of time thinking about jackson map serialization and other related issues.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to clarify, both workerIdToSegmentsMap (physical tables only?) and workerIdToTableSegmentsMap (logical tables only?) are Map<Integer, Map<String, List<String>>> but in the former, the inner map string key is simply OFFLINE or REALTIME and in the latter it's gonna be a physical table name with type?

private List<TableRouteInfo> _offlineTables;
private List<TableRouteInfo> _realtimeTables;
private String _logicalTableName;
private List<ImplicitHybridTableRouteInfo> _offlineTables;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why each physical table is wrapped as an implicit hybrid table? We should have an abstraction for single physical table

Copy link
Contributor Author

@vrajat vrajat May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had tried that. IMO it didnt add much value. Specifically, ImplicitHybridTableRouteInfo has the following structure:

if (hasOffline()) {
   ...
} 

if (hasRealtime()) {
  ...
}

if (hasBoth()) {
  ... do some extra work ...
}   

A PhysicalTableRouteInfo & PhysicalTableRouteInfo will have:

if (isOffline()) {
   ...
} else if (isRealtime()) {
  ...
}

One if condition is saved. However the code for ImplicitHybridTable and PhysicalHybridTable is exactly the same. So there will be

  • duplicate code

OR

  • ImplicitHybridTable should consist of 2 PhysicalHybridTable.

The second option will make it hard to guarantee backward compatibility because I cannot copy/paste code from BaseSingleStageRequestHandler to ImplicitHybridTableRouteProvider.

@vrajat
Copy link
Contributor Author

vrajat commented May 21, 2025

Please double check the code path for logical table is completely separate from the existing one to ensure backward compatibility

Yes it is. If conditions in:

  • PinotCatalog
  • DispatchablePlanVisitor
  • WorkerManager
  • ServerPlanRequestUtils

Ensure that code flow branches to logic specific to logical tables. Other than trivial code refactoring to help with code reuse, the current MSE code in not modified.

@vrajat vrajat force-pushed the rv-table-route-info-mse branch 3 times, most recently from c692410 to 2fbb727 Compare May 26, 2025 16:23
vrajat added 10 commits May 27, 2025 13:18
Checkpoint 2

Checkpoint 3

checkstyle

Improve how config is obtained in servers.

Reduce timeout for test more.

Checkstyle

Fix comment

License Header

Update pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

Add TODO for time boundary support.

Checkstyle

Reuse code in ServerPlanRequestUtils

Checkstyle
@vrajat vrajat force-pushed the rv-table-route-info-mse branch from 2fbb727 to 5ff3c3e Compare May 27, 2025 09:54
// TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859
@Nullable
@Override
public LogicalTableContext getLogicalTableContext(String logicalTableName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a pretty major issue if each MSE query referencing a logical table requires 4 ZK calls? AFAIK we have been very careful about avoiding direct ZK access in any query path.

cc - @Jackie-Jiang

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Tracked in #15859 and there is a draft PR. It is in draft because it depends on this PR.

if (logicalTableConfig.getRefOfflineTableName() != null) {
offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(),
logicalTableConfig.getRefOfflineTableName());
if (offlineTableConfig == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All logical tables are expected to have a reference offline table AND a reference realtime table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Some parts of query planning require configs that can only be set in physical tables. If there are no offline or realtime tables. then the corresponding reference table name will be null. There are validations for these conditions in CRUD APIs.

* what do each of the string keys store, we define two classes:
* {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily.
*/
public static class TableTypeToSegmentsMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to clarify, both workerIdToSegmentsMap (physical tables only?) and workerIdToTableSegmentsMap (logical tables only?) are Map<Integer, Map<String, List<String>>> but in the former, the inner map string key is simply OFFLINE or REALTIME and in the latter it's gonna be a physical table name with type?

@Override
public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache,
RoutingManager routingManager) {
public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes related to support for logical tables in MSE or an unrelated refactor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is related. These APIs were over-fitted for SSE request handler logic where it has both table cache and routing manager. So only function was required.

In MSE, the logic is split. MSE checks table cache in one section and routing manager in another section. So the function had to be broken down. You will notice a similar break up in ImplicitHybridTableRouteProvider and LogicalTableRouteProvider

String tableType = offlineTableRouteInfoList.isEmpty() ? TableType.REALTIME.name() : TableType.OFFLINE.name();
if (tableType.equals(TableType.OFFLINE.name())) {
Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig());
String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to suffix the logical table name with OFFLINE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There cant be one broker request per table in the list of TableSegmentInfo because too many objects will be created. If that wasnt a constraint, then the broker request would point to a OFFLINE or REALTIME table each. I kept the same semantics that an offline or realtime broker request will at least point to the expected table type. OTOH I looked and could not find code that used the table name from the broker request.

@vrajat
Copy link
Contributor Author

vrajat commented May 29, 2025

So just to clarify, both workerIdToSegmentsMap (physical tables only?) and workerIdToTableSegmentsMap (logical tables only?) are Map<Integer, Map<String, List>> but in the former, the inner map string key is simply OFFLINE or REALTIME and in the latter it's gonna be a physical table name with type?

I am not able to answer inline. Your understanding is correct. For logical tables, both table name and table type is required. Both the pieces of information is available in a single string.

List<TableConfig> tableConfigList = getListTableConfigs(tableNames, database);
if (tableConfigList == null || tableConfigList.isEmpty()) {
List<LogicalTableConfig> logicalTableConfigList = getListLogicalTableConfigs(tableNames, database);
if ((tableConfigList == null || tableConfigList.isEmpty()) && logicalTableConfigList.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tableConfigList can only be empty right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in getListTableConfig, the following if condition is triggered if no offline or realtime table is found for one of the items in the list. Thats why a null check is required. I have added @Nullable annotation to getListTableConfigs. I dont know the reason for this behaviour.

      if (tableConfigList.isEmpty()) {
        return null;
      }

Hmm - I should do the same for logical tables ? This code needs work. The code has to throw if no table or logical table config is found for a specific entry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was more complicated than I imagined. However I have added tests in BaseLogicalTableIntegrationTest. Tell me if you are more comfortable if I remove the diff for controller submit. I can create a separate PR for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the additional tests. I think the logic looks fine, and I'm okay with merging it with the rest of the changes here.

vrajat and others added 2 commits May 30, 2025 15:10
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
@yashmayya yashmayya merged commit ba99dc4 into apache:master May 30, 2025
18 checks passed
songwdfu pushed a commit to songwdfu/pinot that referenced this pull request Jun 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Logical Table] Support Logical Tables in MSE

7 participants