Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.parser.utils.ParserUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.exception.DatabaseConflictException;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.exception.QueryErrorMessage;
Expand Down Expand Up @@ -240,12 +241,30 @@ private List<String> getInstanceIds(String query, List<String> tableNames, Strin
List<String> instanceIds;
if (!tableNames.isEmpty()) {
List<TableConfig> tableConfigList = getListTableConfigs(tableNames, database);
if (tableConfigList == null || tableConfigList.isEmpty()) {
throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException("Unable to find table in cluster, table does not exist");
List<LogicalTableConfig> logicalTableConfigList = null;
// First check for table configs, if not found, check for logical table configs.
if (tableConfigList.size() != tableNames.size()) {
logicalTableConfigList = getListLogicalTableConfigs(tableNames, database);
// If config is not found for all tables, then find the tables that are not found.
if ((tableConfigList.size() + logicalTableConfigList.size()) != tableNames.size()) {
Set<String> tableNamesFoundSet = new HashSet<>();
for (TableConfig tableConfig : tableConfigList) {
tableNamesFoundSet.add(tableConfig.getTableName());
}
for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
tableNamesFoundSet.add(logicalTableConfig.getTableName());
}

List<String> tablesNotFound = tableNames.stream().filter(name -> !tableNamesFoundSet.contains(name))
.collect(Collectors.toList());

throw QueryErrorCode.TABLE_DOES_NOT_EXIST.asException(
"Unable to find table in cluster, table does not exist for tables: " + tablesNotFound);
}
}

// find the unions of all the broker tenant tags of the queried tables.
Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList);
Set<String> brokerTenantsUnion = getBrokerTenantsUnion(tableConfigList, logicalTableConfigList);
if (brokerTenantsUnion.isEmpty()) {
throw QueryErrorCode.BROKER_REQUEST_SEND.asException("Unable to find broker tenant for tables: " + tableNames);
}
Expand Down Expand Up @@ -324,14 +343,28 @@ private List<TableConfig> getListTableConfigs(List<String> tableNames, String da
if (_pinotHelixResourceManager.hasOfflineTable(actualTableName)) {
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(actualTableName)));
}
if (tableConfigList.isEmpty()) {
return null;
// If no table configs found for the table, skip it.
if (!tableConfigList.isEmpty()) {
allTableConfigList.addAll(tableConfigList);
}
allTableConfigList.addAll(tableConfigList);
}
return allTableConfigList;
}

private List<LogicalTableConfig> getListLogicalTableConfigs(List<String> tableNames, String database) {
List<LogicalTableConfig> allLogicalTableConfigList = new ArrayList<>();
for (String tableName : tableNames) {
String actualTableName = _pinotHelixResourceManager.getActualLogicalTableName(tableName, database);
LogicalTableConfig logicalTableConfig =
_pinotHelixResourceManager.getLogicalTableConfig(actualTableName);
if (logicalTableConfig != null) {
allLogicalTableConfigList.add(logicalTableConfig);
}
}
return allLogicalTableConfigList;
}


private String selectRandomInstanceId(List<String> instanceIds) {
if (instanceIds.isEmpty()) {
throw QueryErrorCode.BROKER_RESOURCE_MISSING.asException("No broker found for query");
Expand All @@ -356,11 +389,18 @@ private List<String> findCommonBrokerInstances(Set<String> brokerTenants) {
}

// return the union of brokerTenants from the tables list.
private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList) {
private Set<String> getBrokerTenantsUnion(List<TableConfig> tableConfigList,
@Nullable List<LogicalTableConfig> logicalTableConfigList) {
Set<String> tableBrokerTenants = new HashSet<>();
for (TableConfig tableConfig : tableConfigList) {
tableBrokerTenants.add(tableConfig.getTenantConfig().getBroker());
}

if (logicalTableConfigList != null) {
for (LogicalTableConfig logicalTableConfig : logicalTableConfigList) {
tableBrokerTenants.add(logicalTableConfig.getBrokerTenant());
}
}
return tableBrokerTenants;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,17 @@ public String getActualTableName(String tableName, @Nullable String databaseName
return actualTableName != null ? actualTableName : tableName;
}

/**
* Given a logical table name in any case, returns the logical table name as defined in Helix/Segment/Schema
* @param logicalTableName logical tableName in any case.
* @return logicalTableName actually defined in Pinot (matches case) and exists ,else, return the input value
*/
public String getActualLogicalTableName(String logicalTableName, @Nullable String databaseName) {
logicalTableName = DatabaseUtils.translateTableName(logicalTableName, databaseName, _tableCache.isIgnoreCase());
String actualTableName = _tableCache.getActualLogicalTableName(logicalTableName);
return actualTableName != null ? actualTableName : logicalTableName;
}

/**
* Table related APIs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,10 @@ void reloadSegments(String tableNameWithType, List<String> segmentNames, boolean
* Returns the instance data directory
*/
String getInstanceDataDir();

/**
* Returns the logical table config and schema for the given logical table name.
*/
@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider directly throw exception when some config is not available.
Please also add a TODO to remove this method because we should never read ZK at query path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a TODO.

re: exception, an exception is thrown using Preconditions.

    LogicalTableContext logicalTableContext = instanceDataManager.getLogicalTableContext(logicalTableName);
    Preconditions.checkNotNull(logicalTableContext,
        "LogicalTableManager not found for logical table name: " + logicalTableName);

This is similar to all the other gets from InstanceDataManager. Is this pattern OK or should I throw a TableNotFoundException or similar ?

LogicalTableContext getLogicalTableContext(String logicalTableName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.data.manager;

import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;


public class LogicalTableContext {
private final LogicalTableConfig _logicalTableConfig;
private final Schema _logicalTableSchema;
private final TableConfig _refOfflineTableConfig;
private final TableConfig _refRealtimeTableConfig;

public LogicalTableContext(LogicalTableConfig logicalTableConfig, Schema logicalTableSchema,
TableConfig refOfflineTableConfig, TableConfig refRealtimeTableConfig) {
_logicalTableConfig = logicalTableConfig;
_logicalTableSchema = logicalTableSchema;
_refOfflineTableConfig = refOfflineTableConfig;
_refRealtimeTableConfig = refRealtimeTableConfig;
}

public LogicalTableConfig getLogicalTableConfig() {
return _logicalTableConfig;
}

public Schema getLogicalTableSchema() {
return _logicalTableSchema;
}

public TableConfig getRefOfflineTableConfig() {
return _refOfflineTableConfig;
}

public TableConfig getRefRealtimeTableConfig() {
return _refRealtimeTableConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.integration.tests.QueryAssert;
import org.apache.pinot.integration.tests.QueryGenerator;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
Expand All @@ -48,6 +49,7 @@
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.intellij.lang.annotations.Language;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -67,6 +69,7 @@ public abstract class BaseLogicalTableIntegrationTest extends BaseClusterIntegra
private static final String DEFAULT_TENANT = "DefaultTenant";
private static final String DEFAULT_LOGICAL_TABLE_NAME = "mytable";
protected static final String DEFAULT_TABLE_NAME = "physicalTable";
protected static final String EMPTY_OFFLINE_TABLE_NAME = "empty_o";
protected static BaseLogicalTableIntegrationTest _sharedClusterTestSuite = null;
protected List<File> _avroFiles;

Expand Down Expand Up @@ -111,6 +114,7 @@ public void setUp()
_controllerRequestURLBuilder = _sharedClusterTestSuite._controllerRequestURLBuilder;
_helixResourceManager = _sharedClusterTestSuite._helixResourceManager;
_kafkaStarters = _sharedClusterTestSuite._kafkaStarters;
_controllerBaseApiUrl = _sharedClusterTestSuite._controllerBaseApiUrl;
}

_avroFiles = getAllAvroFiles();
Expand Down Expand Up @@ -164,6 +168,7 @@ public void setUp()

// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
createLogicalTableWithEmptyOfflineTable();
}

@AfterClass
Expand Down Expand Up @@ -250,6 +255,13 @@ protected String getBrokerTenant() {
return DEFAULT_TENANT;
}

// Setup H2 table with the same name as the logical table.
protected void setUpH2Connection(List<File> avroFiles)
throws Exception {
setUpH2Connection();
ClusterIntegrationTestUtils.setUpH2TableWithAvro(avroFiles, getLogicalTableName(), _h2Connection);
}

/**
* Creates a new OFFLINE table config.
*/
Expand Down Expand Up @@ -324,12 +336,35 @@ protected LogicalTableConfig getLogicalTableConfig(String logicalTableName)
return LogicalTableConfig.fromString(resp);
}

protected void deleteLogicalTable()
private void createLogicalTableWithEmptyOfflineTable()
throws IOException {
String deleteLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableDelete(getLogicalTableName());
// delete logical table
String deleteResponse = ControllerTest.sendDeleteRequest(deleteLogicalTableUrl, getHeaders());
assertEquals(deleteResponse, "{\"status\":\"" + getLogicalTableName() + " logical table successfully deleted.\"}");
Schema schema = createSchema(getSchemaFileName());
schema.setSchemaName(TableNameBuilder.extractRawTableName(EMPTY_OFFLINE_TABLE_NAME));
addSchema(schema);

Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
TableConfig offlineTableConfig = createOfflineTableConfig(EMPTY_OFFLINE_TABLE_NAME);
addTableConfig(offlineTableConfig);
physicalTableConfigMap.put(TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME),
new PhysicalTableConfig());
String refOfflineTableName = TableNameBuilder.OFFLINE.tableNameWithType(EMPTY_OFFLINE_TABLE_NAME);

String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";

String addLogicalTableUrl = _controllerRequestURLBuilder.forLogicalTableCreate();
Schema logicalTableSchema = createSchema(getSchemaFileName());
logicalTableSchema.setSchemaName(logicalTableName);
addSchema(logicalTableSchema);
LogicalTableConfigBuilder builder =
new LogicalTableConfigBuilder().setTableName(logicalTableName)
.setBrokerTenant(DEFAULT_TENANT)
.setRefOfflineTableName(refOfflineTableName)
.setPhysicalTableConfigMap(physicalTableConfigMap);

String resp =
ControllerTest.sendPostRequest(addLogicalTableUrl, builder.build().toSingleLineJsonString(), getHeaders());
assertEquals(resp, "{\"unrecognizedProperties\":{},\"status\":\"" + logicalTableName
+ " logical table successfully added.\"}");
}

@Override
Expand Down Expand Up @@ -414,22 +449,24 @@ public void verifyLogicalTableConfig()
assertEquals(new HashSet<>(getPhysicalTableNames()), logicalTableConfig.getPhysicalTableConfigMap().keySet());
}

@Test
public void testHardcodedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testHardcodedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
super.testHardcodedQueries();
}

@Test
public void testQueriesFromQueryFile()
throws Exception {
setUseMultiStageQueryEngine(false);
super.testQueriesFromQueryFile();
}

@Test
public void testGeneratedQueries()
@Test(dataProvider = "useBothQueryEngines")
public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
super.testGeneratedQueries(true, false);
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
super.testGeneratedQueries(true, useMultiStageQueryEngine);
}

@Test
Expand Down Expand Up @@ -558,4 +595,60 @@ public void testQueryTimeOut()
exceptions = response.get("exceptions");
assertTrue(exceptions.isEmpty(), "Query should not throw exception");
}

@Test(dataProvider = "useBothQueryEngines")
public void testLogicalTableWithEmptyOfflineTable(boolean useMultiStageQueryEngine)
throws Exception {

setUseMultiStageQueryEngine(useMultiStageQueryEngine);

String logicalTableName = EMPTY_OFFLINE_TABLE_NAME + "_logical";
// Query should return empty result
JsonNode queryResponse = postQuery("SELECT count(*) FROM " + logicalTableName);
assertEquals(queryResponse.get("numDocsScanned").asInt(), 0);
assertEquals(queryResponse.get("numServersQueried").asInt(), useMultiStageQueryEngine ? 1 : 0);
assertTrue(queryResponse.get("exceptions").isEmpty());
}

@Test(dataProvider = "useBothQueryEngines")
void testControllerQuerySubmit(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
@Language("sql")
String query = "SELECT count(*) FROM " + getLogicalTableName();
JsonNode response = postQueryToController(query);
assertNoError(response);

query = "SELECT count(*) FROM " + getOfflineTableNames().get(0);
response = postQueryToController(query);
assertNoError(response);

query = "SELECT count(*) FROM unknown";
response = postQueryToController(query);
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
.containsMessage("TableDoesNotExistError");
}

@Test
void testControllerJoinQuerySubmit()
throws Exception {
setUseMultiStageQueryEngine(true);
@Language("sql")
String query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN " + getPhysicalTableNames().get(0)
+ " ON " + getLogicalTableName() + ".FlightNum = " + getPhysicalTableNames().get(0) + ".FlightNum";
JsonNode response = postQueryToController(query);
assertNoError(response);

query = "SELECT count(*) FROM unknown JOIN " + getPhysicalTableNames().get(0)
+ " ON unknown.FlightNum = " + getPhysicalTableNames().get(0) + ".FlightNum";
response = postQueryToController(query);
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
.containsMessage("TableDoesNotExistError");

query = "SELECT count(*) FROM " + getLogicalTableName() + " JOIN known ON "
+ getLogicalTableName() + ".FlightNum = unknown.FlightNum";
response = postQueryToController(query);
QueryAssert.assertThat(response).firstException().hasErrorCode(QueryErrorCode.TABLE_DOES_NOT_EXIST)
.containsMessage("TableDoesNotExistError");
}
}
Loading
Loading