@@ -41,6 +41,7 @@ public enum Key {
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
CALCITE_FALLBACK_ALLOWED("plugins.calcite.fallback.allowed"),
CALCITE_PUSHDOWN_ENABLED("plugins.calcite.pushdown.enabled"),
+CALCITE_UDAF_PUSHDOWN_ENABLED("plugins.calcite.udaf_pushdown.enabled"),
CALCITE_PUSHDOWN_ROWCOUNT_ESTIMATION_FACTOR(
"plugins.calcite.pushdown.rowcount.estimation.factor"),
CALCITE_SUPPORT_ALL_JOIN_TYPES("plugins.calcite.all_join_types.allowed"),
@@ -3259,74 +3259,161 @@ private RexNode explicitMapType(
return new RexInputRef(((RexInputRef) origin).getIndex(), newMapType);
}

+/**
+* Flattens the parsed pattern result into individual fields for projection.
+*
+* <p>This method handles two scenarios:
+*
+* <ul>
+* <li>Label mode: extracts pattern (and optionally tokens) from parsedNode
+* <li>Aggregation mode: extracts pattern, pattern_count, tokens (optional), and sample_logs
+* </ul>
+*
+* <p>When both flattenPatternAggResult and showNumberedToken are true, the pattern and tokens
+* need transformation via evalAggSamples (converting wildcards to numbered tokens).
+*
+* @param originalPatternResultAlias alias for the pattern field
+* @param parsedNode the source RexNode containing parsed pattern data
+* @param context the Calcite plan context
+* @param flattenPatternAggResult true if in aggregation mode (includes pattern_count,
+* sample_logs)
+* @param showNumberedToken true if tokens should be extracted and pattern transformed
+*/
private void flattenParsedPattern(
String originalPatternResultAlias,
RexNode parsedNode,
CalcitePlanContext context,
boolean flattenPatternAggResult,
-Boolean showNumberedToken) {
-List<RexNode> fattenedNodes = new ArrayList<>();
+boolean showNumberedToken) {
+List<RexNode> flattenedNodes = new ArrayList<>();
List<String> projectNames = new ArrayList<>();
// Flatten map struct fields

+RelDataType varcharType =
+context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);

+// For aggregation mode with numbered tokens, we need to compute tokens locally
+// using evalAggSamples. The UDAF returns pattern with wildcards and sample_logs,
+// but NOT tokens (to avoid XContent serialization issues with nested Maps).
+// The transformed result contains: pattern (with numbered tokens) and tokens map.
+RexNode transformedPatternResult = null;
+if (flattenPatternAggResult && showNumberedToken) {
+transformedPatternResult = buildEvalAggSamplesCall(parsedNode, context);
+}

+// Determine source for pattern and tokens:
+// - When transformedPatternResult exists, use it (pattern/tokens need transformation)
+// - pattern_count and sample_logs always come from the original parsedNode
+RexNode patternAndTokensSource =
+transformedPatternResult != null ? transformedPatternResult : parsedNode;

// 1. Always add pattern field
RexNode patternExpr =
context.rexBuilder.makeCast(
-context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR),
+varcharType,
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder,
BuiltinFunctionName.INTERNAL_ITEM,
-parsedNode,
+patternAndTokensSource,
context.rexBuilder.makeLiteral(PatternUtils.PATTERN)),
true,
true);
-fattenedNodes.add(context.relBuilder.alias(patternExpr, originalPatternResultAlias));
+flattenedNodes.add(context.relBuilder.alias(patternExpr, originalPatternResultAlias));
projectNames.add(originalPatternResultAlias);

// 2. Add pattern_count when in aggregation mode (from original parsedNode)
if (flattenPatternAggResult) {
+RelDataType bigintType =
+context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT);
RexNode patternCountExpr =
context.rexBuilder.makeCast(
-context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT),
+bigintType,
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder,
BuiltinFunctionName.INTERNAL_ITEM,
parsedNode,
context.rexBuilder.makeLiteral(PatternUtils.PATTERN_COUNT)),
true,
true);
-fattenedNodes.add(context.relBuilder.alias(patternCountExpr, PatternUtils.PATTERN_COUNT));
+flattenedNodes.add(context.relBuilder.alias(patternCountExpr, PatternUtils.PATTERN_COUNT));
projectNames.add(PatternUtils.PATTERN_COUNT);
}

// 3. Add tokens when showNumberedToken is enabled
if (showNumberedToken) {
+RelDataType tokensType =
+context
+.rexBuilder
+.getTypeFactory()
+.createMapType(
+varcharType,
+context.rexBuilder.getTypeFactory().createArrayType(varcharType, -1));
RexNode tokensExpr =
context.rexBuilder.makeCast(
-UserDefinedFunctionUtils.tokensMap,
+tokensType,
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder,
BuiltinFunctionName.INTERNAL_ITEM,
-parsedNode,
+patternAndTokensSource,
context.rexBuilder.makeLiteral(PatternUtils.TOKENS)),
true,
true);
-fattenedNodes.add(context.relBuilder.alias(tokensExpr, PatternUtils.TOKENS));
+flattenedNodes.add(context.relBuilder.alias(tokensExpr, PatternUtils.TOKENS));
projectNames.add(PatternUtils.TOKENS);
}

// 4. Add sample_logs when in aggregation mode (from original parsedNode)
if (flattenPatternAggResult) {
+RelDataType sampleLogsArrayType =
+context.rexBuilder.getTypeFactory().createArrayType(varcharType, -1);
RexNode sampleLogsExpr =
context.rexBuilder.makeCast(
-context
-.rexBuilder
-.getTypeFactory()
-.createArrayType(
-context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1),
+sampleLogsArrayType,
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder,
BuiltinFunctionName.INTERNAL_ITEM,
explicitMapType(context, parsedNode, SqlTypeName.VARCHAR),
context.rexBuilder.makeLiteral(PatternUtils.SAMPLE_LOGS)),
true,
true);
-fattenedNodes.add(context.relBuilder.alias(sampleLogsExpr, PatternUtils.SAMPLE_LOGS));
+flattenedNodes.add(context.relBuilder.alias(sampleLogsExpr, PatternUtils.SAMPLE_LOGS));
projectNames.add(PatternUtils.SAMPLE_LOGS);
}
-projectPlusOverriding(fattenedNodes, projectNames, context);

+projectPlusOverriding(flattenedNodes, projectNames, context);
}
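
Reviewer note: to make the projection contract above concrete, here is a minimal, self-contained sketch (not part of the PR) of which fields the flatten step emits in each mode, following the field order of the numbered steps above.

import java.util.ArrayList;
import java.util.List;

// Toy illustration of flattenParsedPattern's output columns:
// pattern always; pattern_count and sample_logs only in aggregation mode;
// tokens only when numbered tokens are requested.
class FlattenOrderSketch {
  static List<String> projectedFields(boolean aggMode, boolean numberedTokens) {
    List<String> names = new ArrayList<>();
    names.add("pattern");
    if (aggMode) names.add("pattern_count");
    if (numberedTokens) names.add("tokens");
    if (aggMode) names.add("sample_logs");
    return names;
  }

  public static void main(String[] args) {
    System.out.println(projectedFields(false, true)); // label mode: [pattern, tokens]
    System.out.println(projectedFields(true, true));  // agg mode: [pattern, pattern_count, tokens, sample_logs]
  }
}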

+/**
+* Builds the evalAggSamples call to transform pattern with wildcards to numbered tokens and
+* compute the tokens map from sample logs.
+*
+* @param parsedNode The UDAF result containing pattern and sample_logs
+* @param context The Calcite plan context
+* @return RexNode representing the evalAggSamples call result
+*/
+private RexNode buildEvalAggSamplesCall(RexNode parsedNode, CalcitePlanContext context) {
+// Extract pattern string (with wildcards) from UDAF result
+RexNode patternStr =
+PPLFuncImpTable.INSTANCE.resolve(
+context.rexBuilder,
+BuiltinFunctionName.INTERNAL_ITEM,
+parsedNode,
+context.rexBuilder.makeLiteral(PatternUtils.PATTERN));

+// Extract sample_logs from UDAF result
+RexNode sampleLogs =
+PPLFuncImpTable.INSTANCE.resolve(
+context.rexBuilder,
+BuiltinFunctionName.INTERNAL_ITEM,
+explicitMapType(context, parsedNode, SqlTypeName.VARCHAR),
+context.rexBuilder.makeLiteral(PatternUtils.SAMPLE_LOGS));

+// Call evalAggSamples to transform pattern (wildcards -> numbered tokens) and compute tokens
+return PPLFuncImpTable.INSTANCE.resolve(
+context.rexBuilder,
+BuiltinFunctionName.INTERNAL_PATTERN_PARSER,
+patternStr,
+sampleLogs,
+context.rexBuilder.makeLiteral(true));
+}
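
Reviewer note: the wildcard-to-numbered-token rewrite that INTERNAL_PATTERN_PARSER performs here can be illustrated with a toy sketch. The "<tokenN>" naming and the regex-based extraction below are illustrative assumptions; the real logic lives in the pattern parser UDF and PatternUtils.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Toy model of the transformation: "<*>" wildcards become numbered tokens,
// and token values are recovered by matching sample logs back against the pattern.
class EvalAggSamplesSketch {
  public static void main(String[] args) {
    String pattern = "user <*> logged in from <*>";
    List<String> sampleLogs = List.of(
        "user alice logged in from 10.0.0.1",
        "user bob logged in from 10.0.0.2");

    String[] parts = pattern.split(Pattern.quote("<*>"), -1);
    int tokenCount = parts.length - 1;

    // 1. Rewrite each wildcard to a numbered token.
    StringBuilder numbered = new StringBuilder(parts[0]);
    for (int i = 1; i < parts.length; i++) {
      numbered.append("<token").append(i).append('>').append(parts[i]);
    }

    // 2. Recover per-token values from every sample log.
    Pattern extractor = Pattern.compile(
        Arrays.stream(parts).map(Pattern::quote).collect(Collectors.joining("(.+?)")));
    Map<String, List<String>> tokens = new LinkedHashMap<>();
    for (String log : sampleLogs) {
      Matcher m = extractor.matcher(log);
      if (!m.matches()) continue;
      for (int i = 1; i <= tokenCount; i++) {
        tokens.computeIfAbsent("<token" + i + ">", k -> new ArrayList<>()).add(m.group(i));
      }
    }

    System.out.println(numbered); // user <token1> logged in from <token2>
    System.out.println(tokens);   // {<token1>=[alice, bob], <token2>=[10.0.0.1, 10.0.0.2]}
  }
}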

private void buildExpandRelNode(
@@ -5,23 +5,24 @@

package org.opensearch.sql.calcite.udf.udaf;

-import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.stream.Collectors;
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;
import org.opensearch.sql.calcite.udf.udaf.LogPatternAggFunction.LogParserAccumulator;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.common.patterns.BrainLogParser;
-import org.opensearch.sql.common.patterns.PatternUtils;
-import org.opensearch.sql.common.patterns.PatternUtils.ParseResult;

+import org.opensearch.sql.common.patterns.PatternAggregationHelpers;

+/**
+* User-defined aggregate function for log pattern extraction using the Brain algorithm. This UDAF
+* is used for in-memory pattern aggregation in Calcite. For OpenSearch scripted metric pushdown,
+* see {@link PatternAggregationHelpers} which provides the same logic with Map-based state.
+*
+* <p>Both implementations share the same underlying logic through {@link PatternAggregationHelpers}
+* to ensure consistency.
+*/
public class LogPatternAggFunction implements UserDefinedAggFunction<LogParserAccumulator> {
private int bufferLimit = 100000;
private int maxSampleCount = 10;
@@ -36,10 +37,9 @@ public LogParserAccumulator init() {

@Override
public Object result(LogParserAccumulator acc) {
-if (acc.size() == 0 && acc.logSize() == 0) {
+if (acc.isEmpty()) {
return null;
}

return acc.value(
maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken);
}
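
Reviewer note: as a quick contract check, a sketch that assumes this PR's classes are on the classpath — an accumulator that never received a row yields null rather than an empty result.

import org.opensearch.sql.calcite.udf.udaf.LogPatternAggFunction;

// Sketch: result() returns null for an empty accumulator (see isEmpty() below).
class ResultContractSketch {
  public static void main(String[] args) {
    LogPatternAggFunction udaf = new LogPatternAggFunction();
    System.out.println(udaf.result(udaf.init())); // prints: null
  }
}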
@@ -83,17 +83,16 @@ public LogParserAccumulator add(
if (Objects.isNull(field)) {
return acc;
}
// Store parameters for result() phase
this.bufferLimit = bufferLimit;
this.maxSampleCount = maxSampleCount;
this.showNumberedToken = showNumberedToken;
this.variableCountThreshold = variableCountThreshold;
this.thresholdPercentage = thresholdPercentage;
-acc.evaluate(field);
-if (bufferLimit > 0 && acc.logSize() == bufferLimit) {
-acc.partialMerge(
-maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken);
-acc.clearBuffer();
-}

+// Delegate to shared helper logic
+PatternAggregationHelpers.addLogToPattern(
+acc.state, field, maxSampleCount, bufferLimit, variableCountThreshold, thresholdPercentage);
return acc;
}
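
Reviewer note: the bufferLimit threaded through to the helper keeps memory bounded — logs are buffered and periodically folded into pattern groups rather than retained raw until result(). A toy model of that strategy (the real merging is in PatternAggregationHelpers):

import java.util.ArrayList;
import java.util.List;

// Toy model of the buffer-then-merge strategy behind bufferLimit.
class BufferMergeSketch {
  static final int BUFFER_LIMIT = 3; // the real default above is 100000
  static final List<String> buffer = new ArrayList<>();
  static int mergedBatches = 0;

  static void add(String log) {
    buffer.add(log);
    if (buffer.size() >= BUFFER_LIMIT) {
      mergedBatches++; // stand-in for parsing the batch and merging pattern groups
      buffer.clear();
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 10; i++) add("log " + i);
    System.out.println(mergedBatches + " merges, " + buffer.size() + " still buffered");
    // -> 3 merges, 1 still buffered
  }
}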

@@ -147,82 +146,50 @@ public LogParserAccumulator add(
this.variableCountThreshold);
}

+/**
+* Accumulator for log pattern aggregation. This is a thin wrapper around the Map-based state used
+* by {@link PatternAggregationHelpers}, providing type safety for Calcite UDAF while reusing the
+* same underlying logic.
+*/
public static class LogParserAccumulator implements Accumulator {
-private final List<String> logMessages;
-public Map<String, Map<String, Object>> patternGroupMap = new HashMap<>();

-public int size() {
-return patternGroupMap.size();
-}

-public int logSize() {
-return logMessages.size();
-}
+/** The underlying state map, compatible with PatternAggregationHelpers */
+final Map<String, Object> state;

public LogParserAccumulator() {
-this.logMessages = new ArrayList<>();
-}

-public void evaluate(String value) {
-logMessages.add(value);
+this.state = PatternAggregationHelpers.initPatternAccumulator();
}

-public void clearBuffer() {
-logMessages.clear();
-}

-public void partialMerge(Object... argList) {
-if (logMessages.isEmpty()) {
-return;
-}
-assert argList.length == 4 : "partialMerge of LogParserAccumulator requires 4 parameters";
-int maxSampleCount = (int) argList[0];
-BrainLogParser logParser =
-new BrainLogParser((int) argList[1], ((Double) argList[2]).floatValue());
-Map<String, Map<String, Object>> partialPatternGroupMap =
-logParser.parseAllLogPatterns(logMessages, maxSampleCount);
-patternGroupMap =
-PatternUtils.mergePatternGroups(patternGroupMap, partialPatternGroupMap, maxSampleCount);
+@SuppressWarnings("unchecked")
+public boolean isEmpty() {
+List<String> logMessages = (List<String>) state.get("logMessages");
+Map<String, ?> patternGroupMap = (Map<String, ?>) state.get("patternGroupMap");
+return (logMessages == null || logMessages.isEmpty())
+&& (patternGroupMap == null || patternGroupMap.isEmpty());
+}
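
Reviewer note: the shape of the Map-based state, as far as isEmpty() reveals it, looks roughly like the following. The exact contents are owned by PatternAggregationHelpers.initPatternAccumulator(), so treat this as an inferred sketch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Inferred state layout: a raw-log buffer plus merged pattern groups. A plain
// Map keeps the state portable for scripted-metric pushdown, which is why the
// accumulator wraps a Map instead of typed fields.
class PatternStateSketch {
  public static void main(String[] args) {
    Map<String, Object> state = new LinkedHashMap<>();
    state.put("logMessages", new ArrayList<String>()); // buffered raw logs
    state.put("patternGroupMap", new HashMap<String, Map<String, Object>>()); // pattern -> group fields
    System.out.println(state.keySet()); // [logMessages, patternGroupMap]
  }
}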

@Override
public Object value(Object... argList) {
-partialMerge(argList);
-clearBuffer();

-Boolean showToken = (Boolean) argList[3];
-return patternGroupMap.values().stream()
-.sorted(
-Comparator.comparing(
-m -> (Long) m.get(PatternUtils.PATTERN_COUNT),
-Comparator.nullsLast(Comparator.reverseOrder())))
-.map(
-m -> {
-String pattern = (String) m.get(PatternUtils.PATTERN);
-Long count = (Long) m.get(PatternUtils.PATTERN_COUNT);
-List<String> sampleLogs = (List<String>) m.get(PatternUtils.SAMPLE_LOGS);
-Map<String, List<String>> tokensMap = new HashMap<>();
-ParseResult parseResult = null;
-if (showToken) {
-parseResult = PatternUtils.parsePattern(pattern, PatternUtils.WILDCARD_PATTERN);
-for (String sampleLog : sampleLogs) {
-PatternUtils.extractVariables(
-parseResult, sampleLog, tokensMap, PatternUtils.WILDCARD_PREFIX);
-}
-}
-return ImmutableMap.of(
-PatternUtils.PATTERN,
-showToken
-? parseResult.toTokenOrderString(PatternUtils.WILDCARD_PREFIX)
-: pattern,
-PatternUtils.PATTERN_COUNT,
-count,
-PatternUtils.TOKENS,
-showToken ? tokensMap : Collections.EMPTY_MAP,
-PatternUtils.SAMPLE_LOGS,
-sampleLogs);
-})
-.collect(Collectors.toList());
+// Return the current state for use by LogPatternAggFunction.result()
+// The argList contains [maxSampleCount, variableCountThreshold, thresholdPercentage,
+// showNumberedToken]
+if (isEmpty()) {
+return null;
+}
+int maxSampleCount =
+argList.length > 0 && argList[0] != null ? ((Number) argList[0]).intValue() : 10;
+int variableCountThreshold =
+argList.length > 1 && argList[1] != null
+? ((Number) argList[1]).intValue()
+: BrainLogParser.DEFAULT_VARIABLE_COUNT_THRESHOLD;
+double thresholdPercentage =
+argList.length > 2 && argList[2] != null
+? ((Number) argList[2]).doubleValue()
+: BrainLogParser.DEFAULT_FREQUENCY_THRESHOLD_PERCENTAGE;
+boolean showNumberedToken =
+argList.length > 3 && argList[3] != null && Boolean.TRUE.equals(argList[3]);

+return PatternAggregationHelpers.producePatternResult(
+state, maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken);
}
}
}
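
Reviewer note: the argList defaulting in value() can be exercised directly. Per the comment above, the order is [maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken], and nulls fall back to the BrainLogParser defaults. A usage sketch assuming this PR's classes:

import org.opensearch.sql.calcite.udf.udaf.LogPatternAggFunction.LogParserAccumulator;

// Sketch: nulls in argList fall back to defaults (10 samples, BrainLogParser's
// variable-count and frequency thresholds).
class ValueDefaultsSketch {
  public static void main(String[] args) {
    LogParserAccumulator acc = new LogParserAccumulator();
    System.out.println(acc.value(null, null, null, null)); // empty state -> null
    // With accumulated logs, the result is a list of maps keyed by
    // pattern / pattern_count / tokens / sample_logs (see the pipeline above).
  }
}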