Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,27 +151,16 @@ public class MutableSegmentImpl implements MutableSegment {
private final File _consumerDir;

private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>();
private boolean _indexCapacityThresholdBreached;

private final IdMap<FixedIntArray> _recordIdMap;

private volatile int _numDocsIndexed = 0;
private final int _numKeyColumns;

// Cache the physical (non-virtual) field specs
private final Collection<FieldSpec> _physicalFieldSpecs;
private final Collection<DimensionFieldSpec> _physicalDimensionFieldSpecs;
private final Collection<MetricFieldSpec> _physicalMetricFieldSpecs;
private final Collection<String> _physicalTimeColumnNames;
private final Collection<ComplexFieldSpec> _physicalComplexFieldSpecs;

// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;

private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _dedupTimeColumn;

private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final List<String> _upsertComparisonColumns;
private final String _deleteRecordColumn;
Expand All @@ -189,6 +178,11 @@ public class MutableSegmentImpl implements MutableSegment {
// the valid doc ids won't be updated.
private final ThreadSafeMutableRoaringBitmap _validDocIds;
private final ThreadSafeMutableRoaringBitmap _queryableDocIds;
/**
 * Single flag shared by every mutable index and dictionary of this segment. It is set to {@code true} by the
 * {@code updateIndexCapacityThresholdBreached} helpers as soon as any one of them returns {@code false} from
 * {@code canAddMore()}.
 */
private boolean _indexCapacityThresholdBreached;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this threshold check for all indexes/dictionaries in one segment?

I suggest renaming it to _indexThresholdReached and adding Javadoc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is a single check that covers all of them. This PR only extracts the variable into an instance field; the name is unchanged, but I am open to renaming it.

private volatile int _numDocsIndexed = 0;
// default message metadata
private volatile long _lastIndexedTimeMs = Long.MIN_VALUE;
private volatile long _latestIngestionTimeMs = Long.MIN_VALUE;

public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
_serverMetrics = serverMetrics;
Expand Down Expand Up @@ -427,6 +421,54 @@ public boolean isMutableSegment() {
}
}

/**
 * Resolves the ingestion-time metric aggregators for a realtime segment.
 *
 * <p>When {@code aggregateMetrics()} is enabled the aggregators come from {@code fromAggregateMetrics}; otherwise,
 * when ingestion aggregation configs are present, they come from {@code fromAggregationConfig}; otherwise the
 * result is an empty map.
 *
 * @param segmentConfig realtime segment config to read the aggregation settings from
 * @return map from column name to a pair of (source column name, aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
  if (segmentConfig.aggregateMetrics()) {
    return fromAggregateMetrics(segmentConfig);
  }
  if (CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
    return Collections.emptyMap();
  }
  return fromAggregationConfig(segmentConfig);
}

/**
 * Builds one {@code SUM} aggregator per metric column of the schema, each metric aggregating into itself.
 *
 * <p>Fails the state check if ingestion aggregation configs are also set, since the two options are mutually
 * exclusive.
 *
 * @param segmentConfig realtime segment config providing the schema
 * @return map from metric name to a pair of (same metric name, SUM aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> fromAggregateMetrics(RealtimeSegmentConfig segmentConfig) {
  Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()),
      "aggregateMetrics cannot be enabled if AggregationConfig is set");

  Map<String, Pair<String, ValueAggregator>> aggregatorsByColumn = new HashMap<>();
  for (String metric : segmentConfig.getSchema().getMetricNames()) {
    ValueAggregator sumAggregator =
        ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.emptyList());
    aggregatorsByColumn.put(metric, Pair.of(metric, sumAggregator));
  }
  return aggregatorsByColumn;
}

/**
 * Builds the aggregators described by the ingestion aggregation configs.
 *
 * <p>Each config's aggregation function is parsed into an expression that must be a function whose first argument
 * is an identifier (the source column). Any remaining arguments are handed to the aggregator factory. Fails the
 * state check if {@code aggregateMetrics()} is enabled as well.
 *
 * @param segmentConfig realtime segment config providing the ingestion aggregation configs
 * @return map from the configured column name to a pair of (source column identifier, aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(RealtimeSegmentConfig segmentConfig) {
  Preconditions.checkState(!segmentConfig.aggregateMetrics(),
      "aggregateMetrics cannot be enabled if AggregationConfig is set");

  Map<String, Pair<String, ValueAggregator>> aggregatorsByColumn = new HashMap<>();
  for (AggregationConfig aggregationConfig : segmentConfig.getIngestionAggregationConfigs()) {
    ExpressionContext expression = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
    // The same validation runs when the table is created; this is only a sanity check.
    Preconditions.checkState(expression.getType() == ExpressionContext.Type.FUNCTION,
        "aggregation function must be a function: %s", aggregationConfig);

    FunctionContext function = expression.getFunction();
    AggregationFunctionType functionType =
        AggregationFunctionType.getAggregationFunctionType(function.getFunctionName());
    TableConfigUtils.validateIngestionAggregation(functionType);

    ExpressionContext firstArgument = function.getArguments().get(0);
    Preconditions.checkState(firstArgument.getType() == ExpressionContext.Type.IDENTIFIER,
        "aggregator function argument must be a identifier: %s", aggregationConfig);

    String destinationColumn = aggregationConfig.getColumnName();
    String sourceColumn = firstArgument.getIdentifier();
    // Everything after the first argument is passed through to the aggregator factory.
    ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(functionType,
        function.getArguments().subList(1, function.getArguments().size()));
    aggregatorsByColumn.put(destinationColumn, Pair.of(sourceColumn, aggregator));
  }
  return aggregatorsByColumn;
}

/**
 * Tells whether the given field is treated as nullable: the field spec decides when the schema enables
 * column-based null handling, otherwise the segment-level default applies.
 */
private boolean isNullable(FieldSpec fieldSpec) {
  if (_schema.isEnableColumnBasedNullHandling()) {
    return fieldSpec.isNullable();
  }
  return _defaultNullHandlingEnabled;
}
Expand Down Expand Up @@ -659,7 +701,8 @@ private Comparable getComparisonValue(GenericRow row) {
* @throws UnsupportedOperationException if the length of an MV column would exceed the
* capacity of a chunk in the ForwardIndex
*/
private void validateLengthOfMVColumns(GenericRow row) throws UnsupportedOperationException {
private void validateLengthOfMVColumns(GenericRow row)
throws UnsupportedOperationException {
for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
IndexContainer indexContainer = entry.getValue();
FieldSpec fieldSpec = indexContainer._fieldSpec;
Expand All @@ -681,7 +724,6 @@ private void validateLengthOfMVColumns(GenericRow row) throws UnsupportedOperati
}
}


private void updateDictionary(GenericRow row) {
for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
IndexContainer indexContainer = entry.getValue();
Expand All @@ -704,6 +746,7 @@ private void updateDictionary(GenericRow row) {
indexContainer._minValue = dictionary.getMinVal();
indexContainer._maxValue = dictionary.getMaxVal();
}
updateIndexCapacityThresholdBreached(dictionary, entry.getKey());
}
}

Expand Down Expand Up @@ -790,7 +833,9 @@ private void addNewRow(int docId, GenericRow row) {
int dictId = indexContainer._dictId;
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) {
try {
indexEntry.getValue().add(value, dictId, docId);
MutableIndex mutableIndex = indexEntry.getValue();
mutableIndex.add(value, dictId, docId);
updateIndexCapacityThresholdBreached(mutableIndex, indexEntry.getKey(), column);
} catch (Exception e) {
recordIndexingError(indexEntry.getKey(), e);
}
Expand Down Expand Up @@ -831,18 +876,7 @@ private void addNewRow(int docId, GenericRow row) {
try {
MutableIndex mutableIndex = indexEntry.getValue();
mutableIndex.add(values, dictIds, docId);
// Few of the Immutable version of the mutable index are bounded by size like FixedBitMVForwardIndex.
// If num of values overflows or size is above limit, A mutable index is unable to convert to
// an immutable index and segment build fails causing the realtime consumption to stop.
// Hence, The below check is a temporary measure to avoid such scenarios until immutable index
// implementations are changed.
if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore()) {
_logger.info(
"Index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true",
indexEntry.getKey(), column
);
_indexCapacityThresholdBreached = true;
}
updateIndexCapacityThresholdBreached(mutableIndex, indexEntry.getKey(), column);
} catch (Exception e) {
recordIndexingError(indexEntry.getKey(), e);
}
Expand All @@ -852,6 +886,36 @@ private void addNewRow(int docId, GenericRow row) {
}
}

/**
 * Marks the segment as having breached its index capacity once the given mutable index reports that it cannot
 * accept more values.
 *
 * <p>Some immutable counterparts of the mutable indexes are bounded by size, e.g.
 * {@code VarByteChunkForwardIndexWriterV4#putBytes(byte[])} and {@code FixedBitMVForwardIndex}. If the number of
 * values or the size goes above the limit, the mutable index cannot be converted to an immutable index and the
 * segment build fails, which stops realtime consumption. This check is a temporary measure to avoid such
 * scenarios until the immutable index implementations are changed.
 *
 * @param mutableIndex index that was just written to
 * @param indexType type of the index, used only in the log message
 * @param column name of the column the index belongs to, used only in the log message
 */
private void updateIndexCapacityThresholdBreached(MutableIndex mutableIndex, IndexType<?, ?, ?> indexType,
    String column) {
  // The flag is checked first so that canAddMore() is not queried again once the threshold has been breached.
  if (!_indexCapacityThresholdBreached && !mutableIndex.canAddMore()) {
    _logger.info(
        "Index: {} for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true",
        indexType, column);
    _indexCapacityThresholdBreached = true;
  }
}

/**
 * Marks the segment as having breached its index capacity once the given mutable dictionary reports that it
 * cannot accept more values.
 *
 * <p>With optimizeDictionary enabled, the immutable version of a mutable dictionary may become a raw forward
 * index, and some of those are bounded by size, e.g. {@code VarByteChunkForwardIndexWriterV4#putBytes(byte[])}
 * and {@code FixedBitMVForwardIndex}. Going above the limit makes the conversion to the immutable form fail, so
 * the segment build fails and realtime consumption stops. This check is a temporary measure to avoid such
 * scenarios until the immutable index implementations are changed.
 *
 * @param dictionary dictionary that was just updated
 * @param column name of the column the dictionary belongs to, used only in the log message
 */
private void updateIndexCapacityThresholdBreached(MutableDictionary dictionary, String column) {
  // Nothing to do when the flag is already set or the dictionary still has room.
  if (_indexCapacityThresholdBreached || dictionary.canAddMore()) {
    return;
  }
  _logger.info(
      "Dictionary for column: {} cannot consume more rows, marking _indexCapacityThresholdBreached as true", column);
  _indexCapacityThresholdBreached = true;
}

private void recordIndexingError(IndexType<?, ?, ?> indexType, Exception exception) {
_logger.error("failed to index value with {}", indexType, exception);
if (_serverMetrics != null) {
Expand Down Expand Up @@ -1341,54 +1405,6 @@ void updateVarByteMVMaxRowLengthInBytes(Object entry, DataType dataType) {
}
}

/**
 * Resolves the ingestion-time metric aggregators for a realtime segment.
 *
 * <p>When {@code aggregateMetrics()} is enabled the aggregators come from {@code fromAggregateMetrics}; otherwise,
 * when ingestion aggregation configs are present, they come from {@code fromAggregationConfig}; otherwise the
 * result is an empty map.
 *
 * @param segmentConfig realtime segment config to read the aggregation settings from
 * @return map from column name to a pair of (source column name, aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> getMetricsAggregators(RealtimeSegmentConfig segmentConfig) {
  if (segmentConfig.aggregateMetrics()) {
    return fromAggregateMetrics(segmentConfig);
  }
  if (CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs())) {
    return Collections.emptyMap();
  }
  return fromAggregationConfig(segmentConfig);
}

/**
 * Builds one {@code SUM} aggregator per metric column of the schema, each metric aggregating into itself.
 *
 * <p>Fails the state check if ingestion aggregation configs are also set, since the two options are mutually
 * exclusive.
 *
 * @param segmentConfig realtime segment config providing the schema
 * @return map from metric name to a pair of (same metric name, SUM aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> fromAggregateMetrics(RealtimeSegmentConfig segmentConfig) {
  Preconditions.checkState(CollectionUtils.isEmpty(segmentConfig.getIngestionAggregationConfigs()),
      "aggregateMetrics cannot be enabled if AggregationConfig is set");

  Map<String, Pair<String, ValueAggregator>> aggregatorsByColumn = new HashMap<>();
  for (String metric : segmentConfig.getSchema().getMetricNames()) {
    ValueAggregator sumAggregator =
        ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.emptyList());
    aggregatorsByColumn.put(metric, Pair.of(metric, sumAggregator));
  }
  return aggregatorsByColumn;
}

/**
 * Builds the aggregators described by the ingestion aggregation configs.
 *
 * <p>Each config's aggregation function is parsed into an expression that must be a function whose first argument
 * is an identifier (the source column). Any remaining arguments are handed to the aggregator factory. Fails the
 * state check if {@code aggregateMetrics()} is enabled as well.
 *
 * @param segmentConfig realtime segment config providing the ingestion aggregation configs
 * @return map from the configured column name to a pair of (source column identifier, aggregator)
 */
private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(RealtimeSegmentConfig segmentConfig) {
  Preconditions.checkState(!segmentConfig.aggregateMetrics(),
      "aggregateMetrics cannot be enabled if AggregationConfig is set");

  Map<String, Pair<String, ValueAggregator>> aggregatorsByColumn = new HashMap<>();
  for (AggregationConfig aggregationConfig : segmentConfig.getIngestionAggregationConfigs()) {
    ExpressionContext expression = RequestContextUtils.getExpression(aggregationConfig.getAggregationFunction());
    // The same validation runs when the table is created; this is only a sanity check.
    Preconditions.checkState(expression.getType() == ExpressionContext.Type.FUNCTION,
        "aggregation function must be a function: %s", aggregationConfig);

    FunctionContext function = expression.getFunction();
    AggregationFunctionType functionType =
        AggregationFunctionType.getAggregationFunctionType(function.getFunctionName());
    TableConfigUtils.validateIngestionAggregation(functionType);

    ExpressionContext firstArgument = function.getArguments().get(0);
    Preconditions.checkState(firstArgument.getType() == ExpressionContext.Type.IDENTIFIER,
        "aggregator function argument must be a identifier: %s", aggregationConfig);

    String destinationColumn = aggregationConfig.getColumnName();
    String sourceColumn = firstArgument.getIdentifier();
    // Everything after the first argument is passed through to the aggregator factory.
    ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator(functionType,
        function.getArguments().subList(1, function.getArguments().size()));
    aggregatorsByColumn.put(destinationColumn, Pair.of(sourceColumn, aggregator));
  }
  return aggregatorsByColumn;
}

private class IndexContainer implements Closeable {
final FieldSpec _fieldSpec;
final PartitionFunction _partitionFunction;
Expand Down
Loading
Loading