[Logical Table] Support logical tables in MSE.#15773
[Logical Table] Support logical tables in MSE.#15773yashmayya merged 17 commits intoapache:masterfrom
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #15773 +/- ##
============================================
+ Coverage 62.90% 63.39% +0.49%
+ Complexity 1386 1354 -32
============================================
Files 2867 2895 +28
Lines 163354 166319 +2965
Branches 24952 25434 +482
============================================
+ Hits 102755 105446 +2691
- Misses 52847 52884 +37
- Partials 7752 7989 +237
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
2284ed0 to
fd1749e
Compare
There was a problem hiding this comment.
Pull Request Overview
This PR adds support for executing queries on logical tables in MSE by introducing a new logical table manager, updating routing and plan request utilities, and adjusting the query catalog and integration tests to recognize logical tables.
- Introduces a LogicalTableManager API and related accessor in InstanceDataManager.
- Updates plan request construction and routing providers (both logical and physical) to support logical table segments.
- Enhances integration tests and catalog methods to account for logical tables.
Reviewed Changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java | Adds new API method to obtain a LogicalTableManager based on logical table configurations. |
| pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java | Implements new branch logic for constructing instance requests targeting logical tables. |
| pinot-query-planner/... | Updates routing providers (physical and logical) and metadata classes to support logical table routing. |
| pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java | Adjusts table look-up to include logical table names. |
| pinot-integration-tests/.../BaseLogicalTableIntegrationTest.java | Modifies tests to exercise queries for both logical and physical table engines. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableManager.java | Introduces the new LogicalTableManager class encapsulating configs and schema for logical table support. |
Comments suppressed due to low confidence (1)
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java:142
- The scanned table appears to be added twice in the TableScanNode visitor (once before and again after resolving as a logical table). Consider reviewing whether both calls to addScannedTable are needed to avoid potential duplication.
dispatchablePlanMetadata.addScannedTable(_tableCache.getActualTableName(node.getTableName()));
...runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
Outdated
Show resolved
Hide resolved
aa56cd2 to
9859377
Compare
|
This PR is in draft mode as it depends on #15776 |
|
I didn't read the code yet but.. Given the design document says:
Couldn't we model them as views in MSE? |
Copy-pasting from an internal slack message: In MSE, I explored using views. However since table scans and bulk of logical table code is in LeafStageOperator and SSE, it was easier to change query planning in MSE to carry metadata for logical tables and then setup SSE executor appropriately. One data point that this is the right approach is that no changes to operators was required. |
d7fa480 to
7a12b36
Compare
pinot-core/src/main/java/org/apache/pinot/core/data/manager/LogicalTableManager.java
Outdated
Show resolved
Hide resolved
...y-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
Outdated
Show resolved
Hide resolved
| public String getOfflineTableName() { | ||
| return hasOffline() && _logicalTable != null ? TableNameBuilder.OFFLINE.tableNameWithType( | ||
| _logicalTable.getTableName()) : null; | ||
| return hasOffline() && _logicalTableName != null ? TableNameBuilder.OFFLINE.tableNameWithType(_logicalTableName) |
There was a problem hiding this comment.
How can _logicalTableName be null?
should it be set during the init or constructor? Its too basic.
There was a problem hiding this comment.
This can happen if a table name that does not exist is provided. I have kept the same semantics as ImplicitHybridTableRouteInfo where the table config is checked for null to see if it exists. The route provider returns a TableRouteInfo always instead of a null or an exception. A TableRouteInfo is returned to keep the logic same in BaseSingleStageRequestHandler
|
Both |
MSE creates a set and passes it to these functions here https://github.com/apache/pinot/blob/master/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java#L403 |
This is passing the tables to the auth and quota checks. But how is the set created and set in the first place? I tried to trace it in the code and could not. Also, in the query quota PR I have modified the input for the query quota from set physical table to logical table as logical table has its own quota. Link: https://github.com/apache/pinot/pull/15839/files#diff-06e2bf5ea89ac7b1f67fa90f3d3e9bbb8c61cae03c20093fd18bdc977f949134L409 |
It gets the list from the parsed query. This will work for query quota as the quota is checked against the logical table name. However more work is required re: auth. That is tracked in #15784 |
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Please double check the code path for logical table is completely separate from the existing one to ensure backward compatibility
| import org.apache.pinot.spi.data.Schema; | ||
|
|
||
|
|
||
| public class LogicalTableManager { |
There was a problem hiding this comment.
This is not really a manager. You may call it LogicalTableConfigs
There was a problem hiding this comment.
I'll rename it to LogicalTableContext since there is already an important class with LogicalTableConfig and a plural version will be very confusing.
| public LogicalTableConfig getLogicalTableConfig() { | ||
| return _logicalTableConfig; | ||
| } | ||
| public Schema getLogicalTableSchema() { |
There was a problem hiding this comment.
(minor) Add some whitespace between methods
| private static List<InstanceRequest> constructLogicalTableServerQueryRequests( | ||
| OpChainExecutionContext executionContext, PinotQuery pinotQuery, InstanceDataManager instanceDataManager) { | ||
| StageMetadata stageMetadata = executionContext.getStageMetadata(); | ||
| String logicalTableName = TableNameBuilder.extractRawTableName(stageMetadata.getTableName()); |
There was a problem hiding this comment.
Can logical table name ever have type suffix?
| /** | ||
| * Returns the logical table config and schema for the given logical table name. | ||
| */ | ||
| @Nullable |
There was a problem hiding this comment.
Consider directly throw exception when some config is not available.
Please also add a TODO to remove this method because we should never read ZK at query path.
There was a problem hiding this comment.
I've added a TODO.
re: exception, an exception is thrown using Preconditions.
LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName);
Preconditions.checkNotNull(logicalTableContext,
"LogicalTableManager not found for logical table name: " + logicalTableName);
This is similar to all the other gets from InstanceDataManager. Is this pattern OK or should I throw a TableNotFoundException or similar ?
| dispatchablePlanMetadata.getWorkerIdToServerInstanceMap(); | ||
| Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = | ||
| dispatchablePlanMetadata.getWorkerIdToSegmentsMap(); | ||
| Map<Integer, DispatchablePlanMetadata.TableTypeTableNameToSegmentsMap> workerIdToTableNameSegmentsMap = |
There was a problem hiding this comment.
Consider adding a pre-check that this one cannot co-exist with workerIdToSegmentsMap
| * what do each of the string keys store, we define two classes: | ||
| * {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily. | ||
| */ | ||
| public static class TableTypeToSegmentsMap { |
There was a problem hiding this comment.
I'd suggest just keeping the plain map. Each class introduced is adding overhead on heap. Putting some javadoc explaining what each level stands for should be good enough.
There was a problem hiding this comment.
Some of the map can be optimized into Pairs to reduce overhead
There was a problem hiding this comment.
I wish Java had a typedef like language construct. I can change it now since the code works. FYI: this data structure caused the most amount of delay in making this feature work.
There was a problem hiding this comment.
Pair can work in general. However serialization, deserialization using jackson is proving to be difficult. FasterXML/jackson-databind#1156
There was a problem hiding this comment.
Doh! - Simplified the data structure to Map<String, List<String>>. I realised that the data structure has physical table name AND table type. So now it is a map of physicalTableName -> segments.
I did this AFTER spending a lot of time thinking about jackson map serialization and other related issues.
There was a problem hiding this comment.
So just to clarify, both workerIdToSegmentsMap (physical tables only?) and workerIdToTableSegmentsMap (logical tables only?) are Map<Integer, Map<String, List<String>>> but in the former, the inner map string key is simply OFFLINE or REALTIME and in the latter it's gonna be a physical table name with type?
| private List<TableRouteInfo> _offlineTables; | ||
| private List<TableRouteInfo> _realtimeTables; | ||
| private String _logicalTableName; | ||
| private List<ImplicitHybridTableRouteInfo> _offlineTables; |
There was a problem hiding this comment.
Why each physical table is wrapped as an implicit hybrid table? We should have an abstraction for single physical table
There was a problem hiding this comment.
I had tried that. IMO it didnt add much value. Specifically, ImplicitHybridTableRouteInfo has the following structure:
if (hasOffline()) {
...
}
if (hasRealtime()) {
...
}
if (hasBoth()) {
... do some extra work ...
}
A PhysicalTableRouteInfo & PhysicalTableRouteInfo will have:
if (isOffline()) {
...
} else if (isRealtime()) {
...
}
One if condition is saved. However the code for ImplicitHybridTable and PhysicalHybridTable is exactly the same. So there will be
- duplicate code
OR
ImplicitHybridTableshould consist of 2PhysicalHybridTable.
The second option will make it hard to guarantee backward compatibility because I cannot copy/paste code from BaseSingleStageRequestHandler to ImplicitHybridTableRouteProvider.
Yes it is. If conditions in:
Ensure that code flow branches to logic specific to logical tables. Other than trivial code refactoring to help with code reuse, the current MSE code in not modified. |
c692410 to
2fbb727
Compare
Checkpoint 2 Checkpoint 3 checkstyle Improve how config is obtained in servers. Reduce timeout for test more. Checkstyle Fix comment License Header Update pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Add TODO for time boundary support. Checkstyle Reuse code in ServerPlanRequestUtils Checkstyle
2fbb727 to
5ff3c3e
Compare
pinot-query-planner/src/main/java/org/apache/pinot/query/catalog/PinotCatalog.java
Outdated
Show resolved
Hide resolved
| // TODO: LogicalTableContext has to be cached. https://github.com/apache/pinot/issues/15859 | ||
| @Nullable | ||
| @Override | ||
| public LogicalTableContext getLogicalTableContext(String logicalTableName) { |
There was a problem hiding this comment.
Isn't this a pretty major issue if each MSE query referencing a logical table requires 4 ZK calls? AFAIK we have been very careful about avoiding direct ZK access in any query path.
cc - @Jackie-Jiang
There was a problem hiding this comment.
Yes. Tracked in #15859 and there is a draft PR. It is in draft because it depends on this PR.
| if (logicalTableConfig.getRefOfflineTableName() != null) { | ||
| offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(getPropertyStore(), | ||
| logicalTableConfig.getRefOfflineTableName()); | ||
| if (offlineTableConfig == null) { |
There was a problem hiding this comment.
All logical tables are expected to have a reference offline table AND a reference realtime table?
There was a problem hiding this comment.
Yes. Some parts of query planning require configs that can only be set in physical tables. If there are no offline or realtime tables. then the corresponding reference table name will be null. There are validations for these conditions in CRUD APIs.
...y-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
Outdated
Show resolved
Hide resolved
| * what do each of the string keys store, we define two classes: | ||
| * {@link TableTypeToSegmentsMap} and {@link TableTypeTableNameToSegmentsMap} to help read code more easily. | ||
| */ | ||
| public static class TableTypeToSegmentsMap { |
There was a problem hiding this comment.
So just to clarify, both workerIdToSegmentsMap (physical tables only?) and workerIdToTableSegmentsMap (logical tables only?) are Map<Integer, Map<String, List<String>>> but in the former, the inner map string key is simply OFFLINE or REALTIME and in the latter it's gonna be a physical table name with type?
...query-planner/src/main/java/org/apache/pinot/query/timeboundary/MinTimeBoundaryStrategy.java
Outdated
Show resolved
Hide resolved
| @Override | ||
| public TimeBoundaryInfo computeTimeBoundary(LogicalTableConfig logicalTableConfig, TableCache tableCache, | ||
| RoutingManager routingManager) { | ||
| public TimeBoundaryInfo computeTimeBoundary(RoutingManager routingManager) { |
There was a problem hiding this comment.
Are these changes related to support for logical tables in MSE or an unrelated refactor?
There was a problem hiding this comment.
It is related. These APIs were over-fitted for SSE request handler logic where it has both table cache and routing manager. So only function was required.
In MSE, the logic is split. MSE checks table cache in one section and routing manager in another section. So the function had to be broken down. You will notice a similar break up in ImplicitHybridTableRouteProvider and LogicalTableRouteProvider
pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
Show resolved
Hide resolved
...runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
Show resolved
Hide resolved
| String tableType = offlineTableRouteInfoList.isEmpty() ? TableType.REALTIME.name() : TableType.OFFLINE.name(); | ||
| if (tableType.equals(TableType.OFFLINE.name())) { | ||
| Preconditions.checkNotNull(logicalTableContext.getRefOfflineTableConfig()); | ||
| String offlineTableName = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(logicalTableName); |
There was a problem hiding this comment.
Why do we need to suffix the logical table name with OFFLINE?
There was a problem hiding this comment.
There cant be one broker request per table in the list of TableSegmentInfo because too many objects will be created. If that wasnt a constraint, then the broker request would point to a OFFLINE or REALTIME table each. I kept the same semantics that an offline or realtime broker request will at least point to the expected table type. OTOH I looked and could not find code that used the table name from the broker request.
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
I am not able to answer inline. Your understanding is correct. For logical tables, both table name and table type is required. Both the pieces of information is available in a single string. |
| List<TableConfig> tableConfigList = getListTableConfigs(tableNames, database); | ||
| if (tableConfigList == null || tableConfigList.isEmpty()) { | ||
| List<LogicalTableConfig> logicalTableConfigList = getListLogicalTableConfigs(tableNames, database); | ||
| if ((tableConfigList == null || tableConfigList.isEmpty()) && logicalTableConfigList.isEmpty()) { |
There was a problem hiding this comment.
tableConfigList can only be empty right?
There was a problem hiding this comment.
in getListTableConfig, the following if condition is triggered if no offline or realtime table is found for one of the items in the list. Thats why a null check is required. I have added @Nullable annotation to getListTableConfigs. I dont know the reason for this behaviour.
if (tableConfigList.isEmpty()) {
return null;
}
Hmm - I should do the same for logical tables ? This code needs work. The code has to throw if no table or logical table config is found for a specific entry.
There was a problem hiding this comment.
This change was more complicated than I imagined. However I have added tests in BaseLogicalTableIntegrationTest. Tell me if you are more comfortable if I remove the diff for controller submit. I can create a separate PR for that.
There was a problem hiding this comment.
Thanks for the additional tests. I think the logic looks fine, and I'm okay with merging it with the rest of the changes here.
...t-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
Outdated
Show resolved
Hide resolved
...st/java/org/apache/pinot/integration/tests/logicaltable/BaseLogicalTableIntegrationTest.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Yash Mayya <yash.mayya@gmail.com>
This PR adds support to execute queries on logical tables in MSE. It uses classes introduced in #15634.
The high-level workflow is:
PinotCatalogalso exposes logical tables as tables.DispatchPlanVisitorchecks if theTableScanNodepoints to a logical table. If yes, addsLogicalTableRouteInfotoDispatchablePlanMetadata.WorkerManagercreates aworkerId -> {tableType -> {tableName -> segments}}. This is different from physical tables which requireworkerId -> {tableType -> segments}ServerPlanRequestUtilsuses this map to createInstanceRequestwith the field forTableSegmentsInfo.The last step triggers SSE engine to process segments of all physical tables in the logical table.
Closes #15749