Search before asking
- I had searched in the issues and found no similar feature requirement.
Description
Current Architecture
The Func[N] interface (In, Val, Reset) is used as a monolithic accumulator everywhere:
- Data node aggregation plans (via Analyze)
- Liaison node aggregation plans (via DistributedAnalyze)
- TopN post-processor
The distributed push-down is handled with hacks:
- needCompletePushDownAgg only supports MAX/MIN/SUM/COUNT (not MEAN)
- COUNT is silently converted to SUM at the liaison (measure_analyzer.go:185-186)
- The liaison merely deduplicates push-down results rather than properly reducing them
New Interface Design
Intermediate type in aggregation.go
```go
// Partial represents the intermediate result of a Map phase.
// For most functions, only Value is meaningful.
// For MEAN, both Value (sum) and Count are used.
type Partial[N Number] struct {
	Value N
	Count N
}
```

Map interface (replaces Func, used by data node, standalone, and TopN)
```go
// Map accumulates raw values and produces aggregation results.
// It serves as the local accumulator for raw data points.
type Map[N Number] interface {
	// In feeds a raw value into the accumulator.
	In(N)
	// Val returns the locally finalized aggregation result.
	// For MEAN, this computes sum/count. For others, same as Partial().Value.
	Val() N
	// Partial returns the intermediate result for the reduce phase.
	Partial() Partial[N]
	// Reset clears the accumulator for reuse.
	Reset()
}
```

Reduce interface (new, for liaison node use)
```go
// Reduce combines intermediate results from Map phases into a final value.
type Reduce[N Number] interface {
	// Combine feeds an intermediate result from a Map phase.
	Combine(Partial[N])
	// Val returns the final aggregated value.
	Val() N
	// Reset clears the accumulator for reuse.
	Reset()
}
```

Factory functions
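```go
// NewMap returns the Map implementation for the aggregation function af.
func NewMap[N Number](af modelv1.AggregationFunction) (Map[N], error)

// NewReduce returns the matching Reduce implementation for af.
func NewReduce[N Number](af modelv1.AggregationFunction) (Reduce[N], error)
```

Concrete Implementations in function.go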
SUM
- Map: Accumulates a running sum. Partial() returns {Value: sum}.
- Reduce: Sums the incoming Value fields. Val() returns the total sum.
COUNT
- Map: Increments a counter. Partial() returns {Value: count}.
- Reduce: Sums the incoming Value fields (same logic as the SUM reduce). Val() returns the total count.
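A minimal Go sketch of the SUM and COUNT implementations above, assuming Number covers int64 and float64 (the real constraint is not shown in this proposal). The lowercase type names are hypothetical; the method sets follow the Map and Reduce interfaces. It illustrates that COUNT differs from SUM only in its Map phase, so both can share one additive Reduce.

```go
package main

import "fmt"

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface {
	int64 | float64
}

// Partial mirrors the intermediate type proposed above.
type Partial[N Number] struct {
	Value N
	Count N
}

// sumMap accumulates a running sum of raw values.
type sumMap[N Number] struct{ sum N }

func (m *sumMap[N]) In(v N)              { m.sum += v }
func (m *sumMap[N]) Val() N              { return m.sum }
func (m *sumMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum} }
func (m *sumMap[N]) Reset()              { m.sum = 0 }

// countMap ignores the raw value itself and counts how many values arrived.
type countMap[N Number] struct{ count N }

func (m *countMap[N]) In(N)                { m.count++ }
func (m *countMap[N]) Val() N              { return m.count }
func (m *countMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.count} }
func (m *countMap[N]) Reset()              { m.count = 0 }

// sumReduce adds up the Value field of each partial. It serves both SUM and
// COUNT, because per-node sums and per-node counts both combine by addition.
type sumReduce[N Number] struct{ total N }

func (r *sumReduce[N]) Combine(p Partial[N]) { r.total += p.Value }
func (r *sumReduce[N]) Val() N               { return r.total }
func (r *sumReduce[N]) Reset()               { r.total = 0 }

func main() {
	// Raw values held by two hypothetical data nodes.
	nodes := [][]int64{{3, 5, 7}, {10, 20}}

	var sumR, countR sumReduce[int64]
	for _, raw := range nodes {
		var s sumMap[int64]
		var c countMap[int64]
		for _, v := range raw {
			s.In(v)
			c.In(v)
		}
		sumR.Combine(s.Partial())
		countR.Combine(c.Partial())
	}
	fmt.Println("sum:", sumR.Val(), "count:", countR.Val()) // sum: 45 count: 5
}
```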
MAX
- Map: Tracks the local maximum. Partial() returns {Value: max}.
- Reduce: Tracks the maximum of the incoming Value fields. Val() returns the global max.
MIN
- Map: Tracks the local minimum. Partial() returns {Value: min}.
- Reduce: Tracks the minimum of the incoming Value fields. Val() returns the global min.
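The MAX and MIN pair can be sketched the same way, again assuming an int64 | float64 Number constraint and hypothetical type names. Here the two functions share one accumulator and differ only in the comparator, and the Reduce simply reapplies the Map logic to each partial's Value. The seen flag is an addition not in the proposal: without it, a zero-initialized accumulator would report 0 as the maximum of all-negative inputs. The sketch also assumes that a data node with no points sends no partial, since the proposal does not say how empty groups are reported.

```go
package main

import "fmt"

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface {
	int64 | float64
}

// Partial mirrors the intermediate type proposed above.
type Partial[N Number] struct {
	Value N
	Count N
}

// extremeMap keeps the best value seen so far. better(a, b) reports whether a
// should replace b, so MAX and MIN differ only in the comparator.
type extremeMap[N Number] struct {
	best   N
	seen   bool // assumption: avoids treating the zero value as a real input
	better func(a, b N) bool
}

func (m *extremeMap[N]) In(v N) {
	if !m.seen || m.better(v, m.best) {
		m.best, m.seen = v, true
	}
}
func (m *extremeMap[N]) Val() N              { return m.best }
func (m *extremeMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.best} }
func (m *extremeMap[N]) Reset() {
	var zero N
	m.best, m.seen = zero, false
}

// extremeReduce applies the same comparison to the Value of each incoming partial.
type extremeReduce[N Number] struct{ acc extremeMap[N] }

func (r *extremeReduce[N]) Combine(p Partial[N]) { r.acc.In(p.Value) }
func (r *extremeReduce[N]) Val() N               { return r.acc.Val() }
func (r *extremeReduce[N]) Reset()               { r.acc.Reset() }

func greater[N Number](a, b N) bool { return a > b }
func less[N Number](a, b N) bool    { return a < b }

func newMaxMap[N Number]() *extremeMap[N] { return &extremeMap[N]{better: greater[N]} }
func newMinMap[N Number]() *extremeMap[N] { return &extremeMap[N]{better: less[N]} }
func newMaxReduce[N Number]() *extremeReduce[N] {
	return &extremeReduce[N]{acc: extremeMap[N]{better: greater[N]}}
}
func newMinReduce[N Number]() *extremeReduce[N] {
	return &extremeReduce[N]{acc: extremeMap[N]{better: less[N]}}
}

func main() {
	nodes := [][]int64{{4, -2, 9}, {-7, 3}} // two hypothetical data nodes
	maxR, minR := newMaxReduce[int64](), newMinReduce[int64]()
	for _, raw := range nodes {
		mx, mn := newMaxMap[int64](), newMinMap[int64]()
		for _, v := range raw {
			mx.In(v)
			mn.In(v)
		}
		maxR.Combine(mx.Partial())
		minR.Combine(mn.Partial())
	}
	fmt.Println("max:", maxR.Val(), "min:", minR.Val()) // max: 9 min: -7
}
```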
MEAN (the key case this enables)
- Map: Tracks sum and count. Partial() returns {Value: sum, Count: count}.
- Reduce: Accumulates the total sum and total count from incoming partials. Val() returns totalSum / totalCount.
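For MEAN, the reason Partial carries Count is easiest to see numerically. The sketch below (same assumed Number constraint, hypothetical type names) runs two nodes holding {1, 2, 3} and {10}: averaging their locally finalized means gives 6, while reducing the (sum, count) partials gives the true mean 16 / 4 = 4. Returning 0 for an empty accumulator is an assumption; the proposal does not cover that case.

```go
package main

import "fmt"

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface {
	int64 | float64
}

// Partial mirrors the intermediate type proposed above.
type Partial[N Number] struct {
	Value N
	Count N
}

// meanMap tracks the local sum and count of raw values.
type meanMap[N Number] struct{ sum, count N }

func (m *meanMap[N]) In(v N) {
	m.sum += v
	m.count++
}

// Val returns the local mean; returning 0 for an empty accumulator is an
// assumption, the proposal does not specify it.
func (m *meanMap[N]) Val() N {
	if m.count == 0 {
		return 0
	}
	return m.sum / m.count
}
func (m *meanMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum, Count: m.count} }
func (m *meanMap[N]) Reset()              { m.sum, m.count = 0, 0 }

// meanReduce accumulates the total sum and total count across partials.
type meanReduce[N Number] struct{ totalSum, totalCount N }

func (r *meanReduce[N]) Combine(p Partial[N]) {
	r.totalSum += p.Value
	r.totalCount += p.Count
}
func (r *meanReduce[N]) Val() N {
	if r.totalCount == 0 { // same empty-input assumption as meanMap.Val
		return 0
	}
	return r.totalSum / r.totalCount
}
func (r *meanReduce[N]) Reset() { r.totalSum, r.totalCount = 0, 0 }

func main() {
	nodes := [][]float64{{1, 2, 3}, {10}} // two hypothetical data nodes
	var r meanReduce[float64]
	var meanOfMeans float64
	for _, raw := range nodes {
		var m meanMap[float64]
		for _, v := range raw {
			m.In(v)
		}
		// What a reducer would get if nodes shipped only finalized means.
		meanOfMeans += m.Val() / float64(len(nodes))
		r.Combine(m.Partial())
	}
	fmt.Println("mean of local means:", meanOfMeans) // 6
	fmt.Println("reduced mean:", r.Val())            // 4
}
```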
Serialization Helpers in aggregation.go
```go
// PartialToFieldValues converts a Partial to field values for wire transport.
func PartialToFieldValues[N Number](p Partial[N]) ([]*modelv1.FieldValue, error)

// FieldValuesToPartial converts field values from wire transport to a Partial.
func FieldValuesToPartial[N Number](fvs []*modelv1.FieldValue) (Partial[N], error)
```

For non-MEAN functions, PartialToFieldValues produces a single FieldValue. For MEAN, it produces two (sum and count).
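A round trip through the wire format might look like the following. fieldValue is a hypothetical stand-in for *modelv1.FieldValue (the real protobuf message is not reproduced), and the explicit isMean flag is an assumption: the proposed PartialToFieldValues takes only the Partial, so how it decides between one and two values is left open here.

```go
package main

import (
	"errors"
	"fmt"
)

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface {
	int64 | float64
}

// Partial mirrors the intermediate type proposed above.
type Partial[N Number] struct {
	Value N
	Count N
}

// fieldValue is a hypothetical stand-in for *modelv1.FieldValue.
type fieldValue struct {
	Int   *int64
	Float *float64
}

func toFieldValue[N Number](v N) fieldValue {
	switch x := any(v).(type) {
	case int64:
		return fieldValue{Int: &x}
	case float64:
		return fieldValue{Float: &x}
	}
	panic("unreachable: Number only admits int64 and float64")
}

// fromFieldValue converts to N; this sketch does not check that the wire type
// matches N (a float decoded as int64 would be truncated).
func fromFieldValue[N Number](fv fieldValue) (N, error) {
	switch {
	case fv.Int != nil:
		return N(*fv.Int), nil
	case fv.Float != nil:
		return N(*fv.Float), nil
	}
	var zero N
	return zero, errors.New("field value carries no number")
}

// partialToFieldValues emits one value, or two (sum, count) for MEAN.
func partialToFieldValues[N Number](p Partial[N], isMean bool) []fieldValue {
	if !isMean {
		return []fieldValue{toFieldValue(p.Value)}
	}
	return []fieldValue{toFieldValue(p.Value), toFieldValue(p.Count)}
}

// fieldValuesToPartial reverses the encoding: one value fills Value only,
// two values fill (Value, Count).
func fieldValuesToPartial[N Number](fvs []fieldValue) (Partial[N], error) {
	var p Partial[N]
	if len(fvs) != 1 && len(fvs) != 2 {
		return p, fmt.Errorf("expected 1 or 2 field values, got %d", len(fvs))
	}
	var err error
	if p.Value, err = fromFieldValue[N](fvs[0]); err != nil {
		return p, err
	}
	if len(fvs) == 2 {
		if p.Count, err = fromFieldValue[N](fvs[1]); err != nil {
			return p, err
		}
	}
	return p, nil
}

func main() {
	wire := partialToFieldValues(Partial[int64]{Value: 16, Count: 4}, true)
	p, err := fieldValuesToPartial[int64](wire)
	fmt.Println(len(wire), p, err) // 2 {16 4} <nil>
}
```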
Interface Usage by Context
| Context | Interface | Methods used |
|---|---|---|
| Standalone (single node via Analyze) | Map | In(), Val(), Reset() |
| Distributed - data node | Map | In(), Partial(), Reset() |
| Distributed - liaison node | Reduce | Combine(), Val(), Reset() |
| TopN post-processor | Map | In(), Val(), Reset() |
Usage Changes
Standalone / Data node: measure_plan_aggregation.go
The aggregationPlan and its iterators (aggGroupIterator, aggAllIterator) use Map[N] instead of Func[N]:
- Standalone (single node): Call mapFunc.In(v) for each raw value, then mapFunc.Val() for the final result. This is a drop-in replacement for the old Func, with the same In/Val/Reset contract.
- Distributed data node: Call mapFunc.In(v) for each raw value, then mapFunc.Partial() to produce the intermediate result. Convert it via PartialToFieldValues for the wire response.
Liaison node: measure_plan_aggregation.go
The liaison-side aggregation plan and iterators use Reduce[N]:
- Receive intermediate results from data nodes
- Call reduceFunc.Combine(partial) for each intermediate result
- Call reduceFunc.Val() to produce the final result
This likely means splitting the current aggregationPlan into two variants (map vs reduce) or parameterizing it with a mode flag, since the data node plan and liaison node plan serve different roles.
Distributed plan: measure_plan_distributed.go
- Remove the needCompletePushDownAgg flag (all aggregation functions can now be pushed down)
- Remove the COUNT-to-SUM conversion hack in measure_analyzer.go:185-186
- The distributed plan always pushes down the Map phase, and the liaison always runs Reduce
- The deduplication logic (deduplicateAggregatedDataPointsWithShard) remains for replica handling
TopN post-processor: topn_post_processor.go
Replace the int64Func aggregation.Func[int64] field on topNAggregatorItem with mapFunc aggregation.Map[int64]. TopN feeds raw values into the aggregation, so it needs Map semantics (not Reduce). For COUNT, Map.In() correctly increments a counter rather than summing values:
```go
type topNAggregatorItem struct {
	mapFunc aggregation.Map[int64]
	// ... other fields unchanged
}
```

Usage changes:
- exist.mapFunc.In(item.val) instead of exist.int64Func.In(item.val) (same signature)
- item.mapFunc.Val() instead of item.int64Func.Val() (same signature)
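A small sketch of the TopN change, assuming hypothetical rawItem and key fields in place of the real incoming item and the unchanged topNAggregatorItem fields. Each group owns a Map[int64] and receives raw values through In, so swapping the Map implementation switches between counting values and summing them.

```go
package main

import "fmt"

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface {
	int64 | float64
}

// Partial and Map mirror the proposed types.
type Partial[N Number] struct {
	Value N
	Count N
}

type Map[N Number] interface {
	In(N)
	Val() N
	Partial() Partial[N]
	Reset()
}

// countMap counts raw values; sumMap adds them up.
type countMap[N Number] struct{ count N }

func (m *countMap[N]) In(N)                { m.count++ }
func (m *countMap[N]) Val() N              { return m.count }
func (m *countMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.count} }
func (m *countMap[N]) Reset()              { m.count = 0 }

type sumMap[N Number] struct{ sum N }

func (m *sumMap[N]) In(v N)              { m.sum += v }
func (m *sumMap[N]) Val() N              { return m.sum }
func (m *sumMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum} }
func (m *sumMap[N]) Reset()              { m.sum = 0 }

// topNAggregatorItem holds the per-group Map; key is a hypothetical stand-in
// for the fields that stay unchanged.
type topNAggregatorItem struct {
	key     string
	mapFunc Map[int64]
}

// rawItem is a hypothetical incoming TopN value.
type rawItem struct {
	key string
	val int64
}

// aggregate groups items by key and feeds each raw value into that group's Map.
func aggregate(items []rawItem, newMap func() Map[int64]) map[string]int64 {
	groups := make(map[string]*topNAggregatorItem)
	for _, item := range items {
		exist, ok := groups[item.key]
		if !ok {
			exist = &topNAggregatorItem{key: item.key, mapFunc: newMap()}
			groups[item.key] = exist
		}
		exist.mapFunc.In(item.val)
	}
	out := make(map[string]int64, len(groups))
	for k, item := range groups {
		out[k] = item.mapFunc.Val()
	}
	return out
}

func main() {
	items := []rawItem{{"a", 5}, {"a", 7}, {"b", 9}}
	fmt.Println(aggregate(items, func() Map[int64] { return &countMap[int64]{} })) // map[a:2 b:1]
	fmt.Println(aggregate(items, func() Map[int64] { return &sumMap[int64]{} }))   // map[a:12 b:9]
}
```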
Execution Flow (distributed)
```mermaid
sequenceDiagram
participant Client
participant Liaison as Liaison_Node
participant Data1 as Data_Node_1
participant Data2 as Data_Node_2
Client->>Liaison: QueryRequest with Agg
Liaison->>Data1: InternalQueryRequest (push down agg)
Liaison->>Data2: InternalQueryRequest (push down agg)
Note over Data1: Map phase
Data1->>Data1: mapFunc.In(rawValue) per data point
Data1->>Data1: mapFunc.Partial() -> intermediate
Data1-->>Liaison: InternalQueryResponse with Partial results
Note over Data2: Map phase
Data2->>Data2: mapFunc.In(rawValue) per data point
Data2->>Data2: mapFunc.Partial() -> intermediate
Data2-->>Liaison: InternalQueryResponse with Partial results
Note over Liaison: Reduce phase
Liaison->>Liaison: reduceFunc.Combine(partial1)
Liaison->>Liaison: reduceFunc.Combine(partial2)
Liaison->>Liaison: reduceFunc.Val() -> final result
Liaison-->>Client: QueryResponse
```
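The distributed flow can be simulated end to end in a single process as a rough sanity check. In this sketch, goroutines stand in for data nodes and a buffered channel for the InternalQueryResponse path. aggFunc, newMap, newReduce and runDistributed are hypothetical names (the proposed factories are NewMap/NewReduce taking a modelv1.AggregationFunction), only MEAN is wired up, and the interfaces are trimmed to the methods each role uses per the table above.

```go
package main

import (
	"fmt"
	"sync"
)

// Number is an assumed stand-in for the package's numeric type constraint.
type Number interface{ int64 | float64 }

// Partial, Map and Reduce mirror the proposal, trimmed to the methods each
// role uses in the distributed flow (data node: In, Partial; liaison: Combine, Val).
type Partial[N Number] struct{ Value, Count N }

type Map[N Number] interface {
	In(N)
	Partial() Partial[N]
}

type Reduce[N Number] interface {
	Combine(Partial[N])
	Val() N
}

// aggFunc is a hypothetical stand-in for modelv1.AggregationFunction.
type aggFunc int

const (
	aggMean aggFunc = iota
	aggCount
)

type meanMap[N Number] struct{ sum, count N }

func (m *meanMap[N]) In(v N)              { m.sum, m.count = m.sum+v, m.count+1 }
func (m *meanMap[N]) Partial() Partial[N] { return Partial[N]{Value: m.sum, Count: m.count} }

type meanReduce[N Number] struct{ sum, count N }

func (r *meanReduce[N]) Combine(p Partial[N]) { r.sum, r.count = r.sum+p.Value, r.count+p.Count }
func (r *meanReduce[N]) Val() N {
	if r.count == 0 { // assumption: an empty result reports 0
		return 0
	}
	return r.sum / r.count
}

// newMap and newReduce are hypothetical stand-ins for NewMap and NewReduce;
// only MEAN is wired up here.
func newMap[N Number](af aggFunc) (Map[N], error) {
	if af == aggMean {
		return &meanMap[N]{}, nil
	}
	return nil, fmt.Errorf("aggregation function %d not covered by this sketch", af)
}

func newReduce[N Number](af aggFunc) (Reduce[N], error) {
	if af == aggMean {
		return &meanReduce[N]{}, nil
	}
	return nil, fmt.Errorf("aggregation function %d not covered by this sketch", af)
}

// runDistributed mirrors the diagram: one goroutine per "data node" runs the
// Map phase and sends its Partial; the "liaison" then runs the Reduce phase.
func runDistributed[N Number](af aggFunc, shards [][]N) (N, error) {
	reduceFunc, err := newReduce[N](af)
	if err != nil {
		return 0, err
	}
	partials := make(chan Partial[N], len(shards)) // buffered, so senders never block
	var wg sync.WaitGroup
	for _, shard := range shards {
		mapFunc, err := newMap[N](af)
		if err != nil {
			return 0, err
		}
		wg.Add(1)
		go func(shard []N) {
			defer wg.Done()
			for _, v := range shard {
				mapFunc.In(v)
			}
			partials <- mapFunc.Partial()
		}(shard)
	}
	wg.Wait()
	close(partials)
	for p := range partials { // arrival order varies; sums are order-independent up to float rounding
		reduceFunc.Combine(p)
	}
	return reduceFunc.Val(), nil
}

func main() {
	mean, err := runDistributed(aggMean, [][]float64{{1, 2, 3}, {10, 20}})
	fmt.Println(mean, err) // 7.2 <nil>
}
```

Because each data node ships only a fixed-size (sum, count) pair, the liaison's work is independent of how many raw points each node scanned.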
related to #13291
Use case
No response
Related issues
No response
Are you willing to submit a pull request to implement this on your own?
- Yes I am willing to submit a pull request on my own!
Code of Conduct
- I agree to follow this project's Code of Conduct