Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.api.RequesterIdentity;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
Expand Down Expand Up @@ -381,10 +383,12 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
offlineBrokerRequest = getOfflineBrokerRequest(serverBrokerRequest);
PinotQuery offlinePinotQuery = offlineBrokerRequest.getPinotQuery();
handleExpressionOverride(offlinePinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
handleTimestampIndexOverride(offlinePinotQuery, offlineTableConfig);
_queryOptimizer.optimize(offlinePinotQuery, offlineTableConfig, schema);
realtimeBrokerRequest = getRealtimeBrokerRequest(serverBrokerRequest);
PinotQuery realtimePinotQuery = realtimeBrokerRequest.getPinotQuery();
handleExpressionOverride(realtimePinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
_queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig, schema);
requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
Expand All @@ -393,6 +397,7 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
// OFFLINE only
setTableName(serverBrokerRequest, offlineTableName);
handleExpressionOverride(pinotQuery, _tableCache.getExpressionOverrideMap(offlineTableName));
handleTimestampIndexOverride(pinotQuery, offlineTableConfig);
_queryOptimizer.optimize(pinotQuery, offlineTableConfig, schema);
offlineBrokerRequest = serverBrokerRequest;
requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
Expand All @@ -401,6 +406,7 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
// REALTIME only
setTableName(serverBrokerRequest, realtimeTableName);
handleExpressionOverride(pinotQuery, _tableCache.getExpressionOverrideMap(realtimeTableName));
handleTimestampIndexOverride(pinotQuery, realtimeTableConfig);
_queryOptimizer.optimize(pinotQuery, realtimeTableConfig, schema);
realtimeBrokerRequest = serverBrokerRequest;
requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
Expand Down Expand Up @@ -563,6 +569,64 @@ private BrokerResponseNative handleSQLRequest(long requestId, String query, Json
return brokerResponse;
}

/**
 * Adds expression override hints to the query for columns that have a timestamp index,
 * covering the select, filter, having, group-by and order-by expressions.
 *
 * <p>No-op when the table config is missing, has no field configs, or the table has no
 * timestamp-index columns in the cache.
 */
private void handleTimestampIndexOverride(PinotQuery pinotQuery, @Nullable TableConfig tableConfig) {
  // Without field configs there can be no timestamp index on this table.
  if (tableConfig == null || tableConfig.getFieldConfigList() == null) {
    return;
  }
  Set<String> timestampColumns = _tableCache.getTimestampIndexColumns(tableConfig.getTableName());
  if (CollectionUtils.isEmpty(timestampColumns)) {
    return;
  }
  // Select list is always present on a parsed query.
  pinotQuery.getSelectList()
      .forEach(selectExpr -> setTimestampIndexExpressionOverrideHints(selectExpr, timestampColumns, pinotQuery));
  // Filter/having may be null; the callee handles null expressions.
  setTimestampIndexExpressionOverrideHints(pinotQuery.getFilterExpression(), timestampColumns, pinotQuery);
  setTimestampIndexExpressionOverrideHints(pinotQuery.getHavingExpression(), timestampColumns, pinotQuery);
  List<Expression> groupBys = pinotQuery.getGroupByList();
  if (CollectionUtils.isNotEmpty(groupBys)) {
    for (Expression groupByExpr : groupBys) {
      setTimestampIndexExpressionOverrideHints(groupByExpr, timestampColumns, pinotQuery);
    }
  }
  List<Expression> orderBys = pinotQuery.getOrderByList();
  if (CollectionUtils.isNotEmpty(orderBys)) {
    for (Expression orderByExpr : orderBys) {
      setTimestampIndexExpressionOverrideHints(orderByExpr, timestampColumns, pinotQuery);
    }
  }
}

/**
 * Recursively walks the expression tree and, for each {@code datetrunc} call that matches a
 * configured timestamp-index column, registers an override hint on the query mapping the
 * function expression to the pre-materialized column identifier.
 *
 * <p>Only the 2-argument form, or the 3-argument form whose output unit is MILLISECONDS, is
 * rewritten, and only when the granularity is a valid {@link TimestampIndexGranularity} and the
 * time argument is a plain column identifier.
 *
 * @param expression expression to inspect; may be {@code null} (no-op)
 * @param timestampIndexColumns column-with-granularity names that have a timestamp index
 * @param pinotQuery query to attach override hints to
 */
private void setTimestampIndexExpressionOverrideHints(@Nullable Expression expression,
    Set<String> timestampIndexColumns,
    PinotQuery pinotQuery) {
  if (expression == null || expression.getFunctionCall() == null) {
    return;
  }
  Function function = expression.getFunctionCall();
  switch (function.getOperator()) {
    case "datetrunc": {
      List<Expression> operands = function.getOperands();
      // Guard against malformed calls: the first operand must be a string literal granularity.
      // The previous code dereferenced getLiteral()/getStringValue() unconditionally and could
      // throw for inputs like datetrunc(col1, col2) instead of simply skipping the rewrite.
      if (operands == null || operands.size() < 2 || operands.get(0).getLiteral() == null
          || !operands.get(0).getLiteral().isSetStringValue()) {
        break;
      }
      String granularString = operands.get(0).getLiteral().getStringValue();
      Expression timeExpression = operands.get(1);
      if (((operands.size() == 2)
          || (operands.size() == 3 && operands.get(2).getLiteral() != null
              && operands.get(2).getLiteral().isSetStringValue()
              && "MILLISECONDS".equalsIgnoreCase(operands.get(2).getLiteral().getStringValue())))
          && TimestampIndexGranularity.isValidTimeGranularity(granularString)
          && timeExpression.getIdentifier() != null) {
        String timeColumn = timeExpression.getIdentifier().getName();
        String timeColumnWithGranularity = TimestampIndexGranularity.getColumnNameWithGranularity(timeColumn,
            TimestampIndexGranularity.valueOf(granularString));
        if (timestampIndexColumns.contains(timeColumnWithGranularity)) {
          pinotQuery.putToExpressionOverrideHints(expression,
              RequestUtils.getIdentifierExpression(timeColumnWithGranularity));
        }
      }
      break;
    }
    default:
      break;
  }
  // Recurse into operands; a function call with no operand list set yields nothing to visit.
  if (function.getOperands() != null) {
    function.getOperands()
        .forEach(operand -> setTimestampIndexExpressionOverrideHints(operand, timestampIndexColumns, pinotQuery));
  }
}

/** Set EXPLAIN PLAN query to route to only one segment on one server. */
private void setRoutingToOneSegment(Map<ServerInstance, List<String>> routingTable) {
Set<Map.Entry<ServerInstance, List<String>>> servers = routingTable.entrySet();
Expand Down Expand Up @@ -1795,6 +1859,9 @@ private static String getActualColumnName(String rawTableName, String columnName
return actualAlias;
}
}
if (columnName.charAt(0) == '$') {
return columnName;
}
throw new BadQueryRequestException("Unknown columnName '" + columnName + "' found in the query");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pinot.spi.config.provider.TableConfigChangeListener;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -174,6 +175,15 @@ public Map<Expression, Expression> getExpressionOverrideMap(String tableNameWith
return tableConfigInfo != null ? tableConfigInfo._expressionOverrideMap : null;
}

/**
 * Returns the timestamp-index column names (timestamp-with-granularity columns) cached for the
 * given table, or {@code null} if the table is not present in the cache.
 */
@Nullable
public Set<String> getTimestampIndexColumns(String tableNameWithType) {
  TableConfigInfo info = _tableConfigInfoMap.get(tableNameWithType);
  if (info == null) {
    return null;
  }
  return info._timestampIndexColumns;
}

/**
* Returns the table config for the given table, or {@code null} if it does not exist.
*/
Expand Down Expand Up @@ -485,6 +495,8 @@ public synchronized void handleDataDeleted(String path) {
private static class TableConfigInfo {
final TableConfig _tableConfig;
final Map<Expression, Expression> _expressionOverrideMap;
// All the timestamp-with-granularity column names
final Set<String> _timestampIndexColumns;

private TableConfigInfo(TableConfig tableConfig) {
_tableConfig = tableConfig;
Expand Down Expand Up @@ -513,6 +525,7 @@ private TableConfigInfo(TableConfig tableConfig) {
} else {
_expressionOverrideMap = null;
}
_timestampIndexColumns = TimestampIndexGranularity.extractTimestampIndexGranularityColumnNames(tableConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public static int millisecond(long millis, String timezoneId) {
* @return truncated timeValue in TimeUnit.MILLISECONDS
*/
@ScalarFunction
public long dateTrunc(String unit, long timeValue) {
// Truncates an epoch-milliseconds value to the given unit using the UTC ISO chronology,
// returning epoch milliseconds; delegates to the 5-arg overload with MILLISECONDS in/out.
// NOTE(review): made static — presumably so it can be invoked without an instance (e.g. from
// transform-function expressions such as the timestamp-index dateTrunc transforms); confirm
// against the scalar-function registry.
public static long dateTrunc(String unit, long timeValue) {
  return dateTrunc(unit, timeValue, TimeUnit.MILLISECONDS, ISOChronology.getInstanceUTC(), TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
Expand Down Expand Up @@ -105,6 +106,11 @@ public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadi
if (segmentMetadata.getTotalDocs() == 0) {
return new EmptyIndexSegment(segmentMetadata);
}
if (schema != null) {
schema = SegmentGeneratorConfig.updateSchemaWithTimestampIndexes(schema,
SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(indexLoadingConfig.getTableConfig()));
}

if (needPreprocess) {
preprocess(indexDir, indexLoadingConfig, schema);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.util.Set;
import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -65,6 +67,19 @@ public ExpressionTransformer(TableConfig tableConfig, Schema schema) {
}
}
}
// For fields with Timestamp indexes, also generate the corresponding values during record transformation.
if (tableConfig.getFieldConfigList() != null) {
for (FieldConfig fieldConfig : tableConfig.getFieldConfigList()) {
if (fieldConfig.getIndexTypes().contains(FieldConfig.IndexType.TIMESTAMP)) {
for (TimestampIndexGranularity granularity : fieldConfig.getTimestampConfig().getGranularities()) {
expressionEvaluators.put(
TimestampIndexGranularity.getColumnNameWithGranularity(fieldConfig.getName(), granularity),
FunctionEvaluatorFactory.getExpressionEvaluator(
String.format("dateTrunc(\'" + granularity + "\', " + fieldConfig.getName() + ")")));
}
}
}
}

// Carry out DFS traversal to topologically sort column names based on transform function dependencies. Throw
// exception if a cycle is discovered. When a name is first seen it is added to discoveredNames set. When a name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
Expand All @@ -39,6 +40,8 @@
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class IndexLoadingConfig {
private Set<String> _varLengthDictionaryColumns = new HashSet<>();
private Set<String> _onHeapDictionaryColumns = new HashSet<>();
private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
private Map<String, List<TimestampIndexGranularity>> _timestampIndexConfigs = new HashMap<>();
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private boolean _enableDefaultStarTree;
Expand Down Expand Up @@ -150,6 +154,24 @@ private void extractFromTableConfig(TableConfig tableConfig) {
extractTextIndexColumnsFromTableConfig(tableConfig);
extractFSTIndexColumnsFromTableConfig(tableConfig);
extractH3IndexConfigsFromTableConfig(tableConfig);
_timestampIndexConfigs.putAll(SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(tableConfig));

// Apply range index and transform functions for all Timestamp column with granularities columns.
for (String timestampColumn : _timestampIndexConfigs.keySet()) {
for (TimestampIndexGranularity granularity : _timestampIndexConfigs.get(timestampColumn)) {
// Apply range index
_rangeIndexColumns.add(TimestampIndexGranularity.getColumnNameWithGranularity(timestampColumn, granularity));

// Apply transform functions
TransformConfig transformConfig =
new TransformConfig(TimestampIndexGranularity.getColumnNameWithGranularity(timestampColumn, granularity),
TimestampIndexGranularity.getTransformExpression(timestampColumn, granularity));
List<TransformConfig> transformConfigs = tableConfig.getIngestionConfig().getTransformConfigs();
if (!transformConfigs.contains(transformConfig)) {
transformConfigs.add(transformConfig);
}
}
}

Map<String, String> noDictionaryConfig = indexingConfig.getNoDictionaryConfig();
if (noDictionaryConfig != null) {
Expand Down Expand Up @@ -318,6 +340,10 @@ public Map<String, H3IndexConfig> getH3IndexConfigs() {
return _h3IndexConfigs;
}

/**
 * Returns the timestamp-index configs extracted from the table config: a map from timestamp
 * column name to the granularities configured for it. NOTE(review): this exposes the internal
 * mutable map — callers could modify it; consider an unmodifiable view if that is unintended.
 */
public Map<String, List<TimestampIndexGranularity>> getTimestampIndexConfigs() {
  return _timestampIndexConfigs;
}

public Map<String, Map<String, String>> getColumnProperties() {
return _columnProperties;
}
Expand Down Expand Up @@ -378,6 +404,11 @@ public void setBloomFilterConfigs(Map<String, BloomFilterConfig> bloomFilterConf
_bloomFilterConfigs = bloomFilterConfigs;
}

// Test-only setter for the timestamp-index configs.
// NOTE(review): the name says "Columns" but it assigns _timestampIndexConfigs — consider
// renaming to setTimestampIndexConfigs for consistency with getTimestampIndexConfigs().
@VisibleForTesting
public void setTimestampIndexColumns(Map<String, List<TimestampIndexGranularity>> timestampIndexConfigs) {
  _timestampIndexConfigs = timestampIndexConfigs;
}

@VisibleForTesting
public void setOnHeapDictionaryColumns(Set<String> onHeapDictionaryColumns) {
_onHeapDictionaryColumns = onHeapDictionaryColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldCon
Preconditions.checkState(fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
"TEXT Index is only supported for string columns");
break;
case TIMESTAMP:
Preconditions.checkState(fieldConfigColSpec.getDataType() == DataType.TIMESTAMP,
"TIMESTAMP Index is only supported for timestamp columns");
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public void testValidateFieldConfig() {
.setNoDictionaryColumns(Arrays.asList("myCol1")).build();

try {
FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null);
FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null, null);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
} catch (Exception e) {
Expand Down
Loading