
[FEATURE] Multivalue Statistics Functions for PPL Calcite Engine (v3) #4026

@ps48


Summary

This RFC proposes the addition of two new multivalue statistical functions to the PPL (Piped Processing Language) Calcite engine (v3): list() and values(). These functions will enhance PPL's analytical capabilities by providing efficient aggregation of multiple values into structured collections, enabling users to perform advanced data analysis and visualization tasks.

Motivation

Current PPL statistics commands provide scalar aggregation functions (count, sum, avg, min, max, etc.) that reduce multiple values to single results. However, many analytical use cases require collecting and preserving multiple values within groups, such as:

  • Building lists of unique values per category for data exploration
  • Creating collections of all values for downstream processing
  • Generating structured data for visualization and reporting
  • Maintaining relationships between grouped data points

The proposed multivalue functions will fill this gap by providing native support for collecting values during aggregation operations.

Detailed Design

1. Function Specifications

1.1 list() Function

Purpose: Collects all values of a field into an ordered array, preserving duplicates and maintaining insertion order.

Syntax: list(field)
Parameters:

  • field: Expression or field name to collect values from

Return Type: ARRAY of the input field's data type
Behavior:

  • Collects all non-null values in the order they are processed
  • Preserves duplicate values
  • Returns empty array if no non-null values exist
  • Null values are ignored (not included in the result)
  • If a field contains more than 100 values, only the first 100 are returned.
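The collection semantics above (nulls filtered, duplicates preserved, first-100 cap) can be illustrated with a small self-contained Java sketch; the class name, method name, and explicit cap constant are illustrative, not actual engine code:

```java
import java.util.ArrayList;
import java.util.List;

public class ListSemantics {
    // Assumed cap from the behavior spec above
    static final int MAX_VALUES = 100;

    public static <T> List<T> collect(List<T> input) {
        List<T> out = new ArrayList<>();
        for (T v : input) {
            if (v == null) continue;             // nulls are ignored
            if (out.size() >= MAX_VALUES) break; // only the first 100 values are kept
            out.add(v);                          // duplicates and order are preserved
        }
        return out;
    }
}
```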

Examples:

source=logs | stats list(user_id) by status
source=events | stats list(timestamp) by event_type
source=products | stats count(*), list(category) by brand

1.2 values() Function

Purpose: Collects unique values of a field into an ordered array, removing duplicates while maintaining order of first occurrence.

Syntax: values(field)
Parameters:

  • field: Expression or field name to collect unique values from

Return Type: ARRAY of the input field's data type
Behavior:

  • Collects unique non-null values in order of first occurrence
  • Removes duplicate values
  • Returns empty array if no non-null values exist
  • Null values are ignored (not included in the result)
  • By default there is no limit to the number of values returned. (In other similar solutions, users with the appropriate permissions can set a limit via the maxvalues setting in the [stats | sistats] stanza of limits.conf.)
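The deduplication semantics above can be sketched in plain Java; a LinkedHashSet both removes duplicates and preserves first-occurrence order. Names here are illustrative, not engine code:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class ValuesSemantics {
    public static <T> List<T> collectUnique(List<T> input) {
        // LinkedHashSet keeps only the first occurrence of each value,
        // in the order values were first seen.
        LinkedHashSet<T> seen = new LinkedHashSet<>();
        for (T v : input) {
            if (v != null) { // nulls are ignored
                seen.add(v);
            }
        }
        return new ArrayList<>(seen);
    }
}
```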

Examples:

source=logs | stats values(user_id) by status
source=events | stats values(source_ip) by hour
source=products | stats count(*), values(category) by brand

2. Implementation Architecture

2.1 Parser Integration

Grammar Changes (OpenSearchPPLParser.g4):

statsFunctionName
   : AVG
   | COUNT
   | SUM
   | MIN
   | MAX
   | VAR_SAMP
   | VAR_POP
   | STDDEV_SAMP
   | STDDEV_POP
   | PERCENTILE
   | LIST        // New
   | VALUES      // New
   ;

Lexer Changes (OpenSearchPPLLexer.g4):

LIST: 'list';
VALUES: 'values';

2.2 Core Expression System

Function Registration (BuiltinFunctionName.java):

// Aggregation Functions
LIST(FunctionName.of("list")),
VALUES(FunctionName.of("values")),

Aggregator Functions (AggregatorFunctions.java):

public static void register(BuiltinFunctionRepository repository) {
    // ... existing registrations ...
    repository.register(list());
    repository.register(values());
}

private static DefaultFunctionResolver list() {
    ImmutableMap.Builder<FunctionSignature, FunctionBuilder> signatures = ImmutableMap.builder();
    // One signature per supported element type; the aggregator is type-parameterized.
    for (ExprCoreType type :
        ImmutableList.of(
            ExprCoreType.STRING, ExprCoreType.INTEGER, ExprCoreType.LONG,
            ExprCoreType.FLOAT, ExprCoreType.DOUBLE, ExprCoreType.BOOLEAN,
            ExprCoreType.TIMESTAMP, ExprCoreType.DATE, ExprCoreType.TIME)) {
        signatures.put(
            new FunctionSignature(BuiltinFunctionName.LIST, ImmutableList.of(type)),
            arguments -> new ListAggregator(arguments.get(0), type));
    }
    return DefaultFunctionResolver.builder()
        .functionName(BuiltinFunctionName.LIST)
        .functionBundle(new FunctionBundle(signatures.build()))
        .build();
}

private static DefaultFunctionResolver values() {
    ImmutableMap.Builder<FunctionSignature, FunctionBuilder> signatures = ImmutableMap.builder();
    // Same set of supported element types as list()
    for (ExprCoreType type :
        ImmutableList.of(
            ExprCoreType.STRING, ExprCoreType.INTEGER, ExprCoreType.LONG,
            ExprCoreType.FLOAT, ExprCoreType.DOUBLE, ExprCoreType.BOOLEAN,
            ExprCoreType.TIMESTAMP, ExprCoreType.DATE, ExprCoreType.TIME)) {
        signatures.put(
            new FunctionSignature(BuiltinFunctionName.VALUES, ImmutableList.of(type)),
            arguments -> new ValuesAggregator(arguments.get(0), type));
    }
    return DefaultFunctionResolver.builder()
        .functionName(BuiltinFunctionName.VALUES)
        .functionBundle(new FunctionBundle(signatures.build()))
        .build();
}

2.3 Aggregator Implementation

ListAggregator Class:

public class ListAggregator extends Aggregator<ListAggregator.ListState> {
    
    public static class ListState implements AggregatorState {
        private final List<ExprValue> values;
        private final ExprCoreType elementType;
        
        public ListState(ExprCoreType elementType) {
            this.values = new ArrayList<>();
            this.elementType = elementType;
        }
        
        public void accumulate(ExprValue value) {
            if (!value.isNull()) {
                values.add(value);
            }
        }
        
        public ExprValue result() {
            return new ExprCollectionValue(values);
        }
    }
    
    // Implementation methods for iterate, result, etc.
} 

ValuesAggregator Class:

public class ValuesAggregator extends Aggregator<ValuesAggregator.ValuesState> {
    
    public static class ValuesState implements AggregatorState {
        // LinkedHashSet deduplicates while preserving first-occurrence order,
        // so a separate ordered list is unnecessary.
        private final Set<ExprValue> uniqueValues;
        private final ExprCoreType elementType;
        
        public ValuesState(ExprCoreType elementType) {
            this.uniqueValues = new LinkedHashSet<>();
            this.elementType = elementType;
        }
        
        public void accumulate(ExprValue value) {
            if (!value.isNull()) {
                uniqueValues.add(value);
            }
        }
        
        public ExprValue result() {
            return new ExprCollectionValue(new ArrayList<>(uniqueValues));
        }
    }
    
    // Implementation methods for iterate, result, etc.
}

2.4 Calcite Integration (Simplified Approach)

Leveraging Calcite Built-in Functions:
The implementation can be significantly simplified by leveraging Apache Calcite's existing SQL library operators:

For list() Function - Using ARRAY_AGG:

// In CalciteAggCallVisitor or similar integration point
private static final Map<String, SqlAggFunction> PPL_TO_CALCITE_AGG_MAPPING =
    new ImmutableMap.Builder<String, SqlAggFunction>()
        .put("list", SqlLibraryOperators.ARRAY_AGG)  // Built-in Calcite function
        .build();

For values() Function - Using ARRAY_AGG with DISTINCT:

// Custom wrapper for distinct array aggregation
public class ValuesAggFunction extends SqlAggFunction {
    public ValuesAggFunction() {
        super("VALUES",
              null,
              SqlKind.OTHER_FUNCTION,
              ReturnTypes.TO_ARRAY,  // sketch: returns ARRAY of the input type
              null,
              OperandTypes.ANY,
              SqlFunctionCategory.SYSTEM,
              false,
              false,
              Optionality.FORBIDDEN);
    }
    
    // Delegates to ARRAY_AGG with DISTINCT modifier
}

Benefits of This Approach:

  • Reduced Implementation Complexity: Leverages mature, tested Calcite functions
  • Better Performance: Calcite's optimized aggregation implementations
  • SQL Standard Compliance: ARRAY_AGG is part of SQL standard (ISO/IEC 9075-2)
  • Automatic Optimization: Benefits from Calcite's query optimization rules
  • Reduced Maintenance: Less custom code to maintain and debug
  • Better Integration: Natural fit with Calcite v3 engine architecture

3. Input/Output Type Specifications

3.1 Supported Input Types

Both list() and values() functions support all core PPL data types:

Numeric Types:

  • INTEGER: 32-bit signed integers
  • LONG: 64-bit signed integers
  • FLOAT: 32-bit floating point
  • DOUBLE: 64-bit floating point

String Types:

  • STRING: Variable-length text
  • TEXT: Analyzed text fields (treated as STRING)

Boolean Type:

  • BOOLEAN: True/false values

Date/Time Types:

  • DATE: Date values (YYYY-MM-DD)
  • TIME: Time values (HH:MM:SS)
  • TIMESTAMP: Date and time values (YYYY-MM-DD HH:MM:SS)

Complex Types:

  • STRUCT: Nested object structures (collected as-is)
  • ARRAY: Nested arrays (creates array of arrays)

3.2 Output Type Specifications

Return Type: ARRAY<T> where T is the input field's data type

Array Characteristics:

  • Ordering: Preserves processing order for both functions
  • Nulls: Input nulls are filtered out (not included in result arrays)
  • Empty Groups: Return empty array [] when no non-null values exist
  • Type Consistency: All elements in returned array have same type as input field

3.3 Detailed Examples by Data Type

3.3.1 String Data
# Input data
source=logs 
| fields user_name, status
# Sample data: 
# user_name="alice", status="active"
# user_name="bob", status="active" 
# user_name="alice", status="active"
# user_name="charlie", status="inactive"

# list() preserves duplicates and order
source=logs | stats list(user_name) as all_users by status
# Result:
# status="active": all_users=["alice", "bob", "alice"]
# status="inactive": all_users=["charlie"]

# values() removes duplicates, preserves first occurrence order
source=logs | stats values(user_name) as unique_users by status  
# Result:
# status="active": unique_users=["alice", "bob"]
# status="inactive": unique_users=["charlie"]

3.3.2 Numeric Data
# Input data with mixed numeric types
source=transactions
| fields amount, category
# Sample data:
# amount=25.50, category="food"
# amount=100, category="food"
# amount=25.50, category="food" 
# amount=75.25, category="transport"

# list() with floating point numbers
source=transactions | stats list(amount) as all_amounts by category
# Result:
# category="food": all_amounts=[25.50, 100.0, 25.50]
# category="transport": all_amounts=[75.25]

# values() with numeric deduplication  
source=transactions | stats values(amount) as unique_amounts by category
# Result:
# category="food": unique_amounts=[25.50, 100.0]
# category="transport": unique_amounts=[75.25]

3.3.3 Boolean Data
# Input data with boolean flags
source=user_actions
| fields is_success, action_type
# Sample data:
# is_success=true, action_type="login"
# is_success=false, action_type="login"  
# is_success=true, action_type="login"
# is_success=true, action_type="logout"

source=user_actions | stats list(is_success) as success_flags by action_type
# Result:
# action_type="login": success_flags=[true, false, true]
# action_type="logout": success_flags=[true]

source=user_actions | stats values(is_success) as unique_flags by action_type  
# Result:
# action_type="login": unique_flags=[true, false]
# action_type="logout": unique_flags=[true]

3.3.4 Date/Time Data
# Input data with timestamps
source=events
| fields event_date, event_type  
# Sample data:
# event_date="2024-01-15", event_type="error"
# event_date="2024-01-16", event_type="error"
# event_date="2024-01-15", event_type="error"
# event_date="2024-01-17", event_type="warning"

source=events | stats list(event_date) as all_dates by event_type
# Result:
# event_type="error": all_dates=["2024-01-15", "2024-01-16", "2024-01-15"]  
# event_type="warning": all_dates=["2024-01-17"]

source=events | stats values(event_date) as unique_dates by event_type
# Result:  
# event_type="error": unique_dates=["2024-01-15", "2024-01-16"]
# event_type="warning": unique_dates=["2024-01-17"]

3.3.5 Complex/Nested Data
# Input data with struct fields
source=products
| fields product_info, category
# Sample data:
# product_info={"id": 123, "name": "laptop"}, category="electronics"
# product_info={"id": 456, "name": "mouse"}, category="electronics"  
# product_info={"id": 123, "name": "laptop"}, category="electronics"

source=products | stats list(product_info) as all_products by category
# Result:
# category="electronics": all_products=[{"id": 123, "name": "laptop"}, {"id": 456, "name": "mouse"}, {"id": 123, "name": "laptop"}]

source=products | stats values(product_info) as unique_products by category
# Result (structs compared by content):
# category="electronics": unique_products=[{"id": 123, "name": "laptop"}, {"id": 456, "name": "mouse"}]

3.4 Edge Cases and Special Behaviors

3.4.1 Null Handling
# Input with null values
source=mixed_data
| fields value, group_id
# Sample data:
# value="a", group_id=1
# value=null, group_id=1
# value="b", group_id=1  
# value=null, group_id=2

source=mixed_data | stats list(value) as all_values by group_id
# Result (nulls filtered out):
# group_id=1: all_values=["a", "b"]
# group_id=2: all_values=[]

3.4.2 Empty Groups
# Group with no matching records
source=sales | where amount > 1000 | stats list(product) as products by region
# Result for regions with no sales > 1000:
# region="small_market": products=[]

3.4.3 Mixed Type Coercion
# Fields with mixed numeric types are promoted to a common type
source=mixed_numbers  
| fields number_value, category
# Sample data with mixed int/float:
# number_value=10, category="A"      # integer
# number_value=10.5, category="A"    # float

source=mixed_numbers | stats list(number_value) as numbers by category  
# Result (integers promoted to doubles):
# category="A": numbers=[10.0, 10.5]
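The promotion behavior above can be sketched in Java: mixed int/float inputs all widen to double before being collected. This is an illustrative sketch of the coercion rule, not engine code:

```java
public class NumericPromotion {
    // Widen any mix of numeric values to the common double type,
    // mirroring how collected arrays hold a single element type.
    public static double[] promote(Number... values) {
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i].doubleValue();
        }
        return out;
    }
}
```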

4. Usage Examples

4.1 Basic Usage

# Collect all user IDs for each status
source=access_logs 
| stats list(user_id) as all_users by response_status

# Get unique IP addresses per hour
source=network_logs 
| stats values(src_ip) as unique_ips by bin(timestamp, 1h)

4.2 Combined with Other Aggregations

# Multiple statistics with multivalue functions
source=ecommerce 
| stats count(*) as total_orders, 
        sum(amount) as revenue, 
        list(product_id) as all_products, 
        values(category) as categories 
  by customer_segment

4.3 Nested Usage with Evaluation

# Process collected values further
source=events 
| stats values(event_type) as event_types by user_id
| eval event_count = length(event_types)
| where event_count > 3

4.4 Complex Analysis Patterns

# User journey analysis
source=clickstream
| stats list(page_url) as page_sequence,
        values(page_category) as visited_categories,
        count(*) as total_clicks
  by session_id, user_id
| eval journey_length = length(page_sequence)
| where journey_length > 5

# Error pattern detection
source=application_logs 
| where log_level = "ERROR"
| stats list(error_message) as error_sequence,
        values(error_code) as unique_error_codes,
        min(timestamp) as first_error,
        max(timestamp) as last_error
  by service_name, bin(timestamp, 1h)
| eval error_diversity = length(unique_error_codes)
| where error_diversity > 3

Open Design Questions

Question 1: Type Handling and Sorting Behavior for values() Function

Background: Other similar solutions convert all field values to strings and return results in lexicographical order. However, this approach may not align with PPL user expectations and could limit functionality.

Behavior in other similar solutions

# Other similar solutions converts everything to strings and sorts lexicographically
source=data | stats values(score) by team
# Input: [1, 10, 2, 20, 3]
# result: ["1", "10", "2", "20", "3"] (strings in lexicographical order)

Proposed PPL Behavior:

# PPL preserves types and uses type-appropriate sorting
source=data | stats values(score) by team  
# Input: [1, 10, 2, 20, 3]
# PPL result: [1, 2, 3, 10, 20] (numbers in numerical order)
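The difference between the two orderings can be demonstrated directly in Java; this is an illustrative comparison, not engine code:

```java
import java.util.List;
import java.util.stream.Collectors;

public class SortComparison {
    // Option A: values converted to strings, sorted lexicographically
    public static List<String> lexicographic(List<Integer> in) {
        return in.stream().map(String::valueOf).sorted().collect(Collectors.toList());
    }

    // Option B: types preserved, sorted numerically
    public static List<Integer> numeric(List<Integer> in) {
        return in.stream().sorted().collect(Collectors.toList());
    }
}
```

For the input [1, 10, 2, 20, 3], lexicographic ordering yields ["1", "10", "2", "20", "3"] while numeric ordering yields [1, 2, 3, 10, 20].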

Options for PPL Implementation:

Option A: Compatible with Other Similar Solutions (String Conversion + Lexicographical Sort)

Pros:

  • Directly compatible with users' existing knowledge of similar solutions
  • Consistent behavior across all data types
  • Simpler implementation (single sort algorithm)

Cons:

  • Counter-intuitive for numeric data ("10" < "2" lexicographically)
  • Loss of type information prevents downstream numeric operations
  • Dates don't sort chronologically in many formats
  • Violates PPL's strong typing principles

Option B: PPL-Native (Type-Preserving + Type-Appropriate Sorting)

Pros:

  • Intuitive behavior: numbers sort numerically, dates chronologically
  • Preserves PPL's type system and enables downstream operations
  • Better user experience for analytics use cases
  • Aligns with modern analytics tool expectations

Cons:

  • Breaks compatibility with behavior of other solutions
  • Requires type-specific sorting logic
  • May surprise users familiar with other similar solutions

Option C: Configurable Behavior (Recommended)

Pros:

  • Defaults to the string-based behavior of other similar solutions
  • Allows users to choose based on use case
  • Provides migration path from other similar solutions
  • Maximum flexibility

Cons:

  • Adds complexity to function signature
  • May confuse users about default behavior
  • Increases testing and maintenance

Detailed Comparison:

| Data Type | Input Values | Similar Solutions Output (Strings) | PPL Option B Output | User Expectation |
| --- | --- | --- | --- | --- |
| INTEGER | [1, 10, 2, 20] | ["1", "10", "2", "20"] | [1, 2, 10, 20] | [1, 2, 10, 20] |
| FLOAT | [1.5, 10.2, 2.1] | ["1.5", "10.2", "2.1"] | [1.5, 2.1, 10.2] | [1.5, 2.1, 10.2] |
| DATE | [2024-01-10, 2024-01-02] | ["2024-01-02", "2024-01-10"] | [2024-01-02, 2024-01-10] | [2024-01-02, 2024-01-10] |
| STRING | ["banana", "apple", "cherry"] | ["apple", "banana", "cherry"] | ["apple", "banana", "cherry"] | ["apple", "banana", "cherry"] |
| BOOLEAN | [true, false, true] | ["false", "true"] | [false, true] | [false, true] |

Impact on Implementation:

  • Option A: Direct mapping to Calcite's ARRAY_AGG(DISTINCT CAST(field AS STRING)) ORDER BY CAST(field AS STRING)
  • Option B: Type-specific handling with appropriate ORDER BY clauses per data type
  • Option C: Additional parameter handling and conditional logic

Recommendation Needed: This design decision significantly impacts user experience and implementation complexity. Feedback is requested on which approach best serves PPL users:

  1. Prioritize compatibility with other similar solutions (Option A) for easier migration
  2. Prioritize user intuition (Option B) for better analytics experience
  3. Provide flexibility (Option C) at the cost of complexity

Question 2: Function Naming Alignment

Should PPL use exactly the same function names (list, values) as other similar solutions, or consider more descriptive names that better reflect PPL's type-aware behavior (e.g., collect, collect_unique, array_agg, distinct_array)?

Question 3: Do we need a maxvalues setting for stats?

In other similar solutions, users with the appropriate permissions can specify a limit in the limits.conf file, using the maxvalues setting in the [stats | sistats] stanza:

maxvalues = <integer>
* Maximum number of values for any field to keep track of.
* When set to "0": Specifies an unlimited number of values.
* Default: 0

Issue actions