@thomasqueirozb thomasqueirozb commented Dec 9, 2025

Summary

Fixes two memory leaks in the aggregate transform that caused unbounded memory growth with high-cardinality metrics. The first fix eliminates an unnecessary HashMap clone on every flush. The second prevents prev_map from accumulating stale metric series indefinitely.
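To illustrate the second leak, here is a toy Python model of a Diff-mode aggregator. It is not Vector's Rust implementation; the class name `DiffAggregator`, the `prune_stale` switch, and the helper `prev_map_size_after_churn` are hypothetical and exist only for this sketch. It assumes a series with no previous value is emitted unchanged.

```python
from typing import Dict, Hashable


class DiffAggregator:
    """Toy model of a Diff-mode aggregator (hypothetical, not Vector's code)."""

    def __init__(self, prune_stale: bool) -> None:
        self.prune_stale = prune_stale
        self.map: Dict[Hashable, float] = {}       # latest value per series in this interval
        self.prev_map: Dict[Hashable, float] = {}  # value per series at the last flush

    def record(self, series: Hashable, value: float) -> None:
        self.map[series] = value

    def flush(self) -> Dict[Hashable, float]:
        # Take ownership of the interval map instead of copying it.
        current, self.map = self.map, {}
        # Assumption: a series without a previous value is emitted as-is.
        diffs = {k: v - self.prev_map.get(k, 0.0) for k, v in current.items()}
        if self.prune_stale:
            # Keep only series seen in this interval; stale ones are dropped.
            self.prev_map = current
        else:
            # Leaky variant: entries for dead series are never removed.
            self.prev_map.update(current)
        return diffs


def prev_map_size_after_churn(prune_stale: bool, flushes: int = 3,
                              series_per_flush: int = 500) -> int:
    """Feed completely new series before every flush and report prev_map size."""
    agg = DiffAggregator(prune_stale)
    for batch in range(flushes):
        for i in range(series_per_flush):
            agg.record(f"container_{batch}_{i}", float(i))
        agg.flush()
    return len(agg.prev_map)


if __name__ == "__main__":
    print(prev_map_size_after_churn(prune_stale=False))  # 1500, grows with every flush
    print(prev_map_size_after_churn(prune_stale=True))   # 500, bounded by live series
```

The trade-off in this sketch is that a series which skips one interval loses its previous value. Whether the actual fix behaves that way is not stated in this description.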

Vector configuration

[sources.metrics_input]
type = "exec"
mode = "streaming"
command = ["python3", "/tmp/metric_generator.py"]
decoding.codec = "json"

[transforms.to_metric]
type = "log_to_metric"
inputs = ["metrics_input"]

[[transforms.to_metric.metrics]]
type = "gauge"
field = "value"
name = "{{ name }}"
tags.container = "{{ tags.container }}"
tags.batch = "{{ tags.batch }}"

[transforms.aggregate_diff]
type = "aggregate"
inputs = ["to_metric"]
interval_ms = 500  # Flush every 500ms
mode = "Diff"  # This mode uses prev_map heavily!

[sinks.blackhole]
type = "blackhole"
inputs = ["aggregate_diff"]
print_interval_secs = 10

How did you test this PR?

Created a Python script to generate high-cardinality changing metrics (500 unique series per batch, constantly changing to simulate pod churn). Ran both the old and fixed versions with Diff aggregation mode for 10 minutes each, monitoring memory usage every 10 seconds.
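For a sense of scale, the workload parameters above put an upper bound on how many series the aggregator can see. The sketch below is back-of-the-envelope arithmetic only: the function name `churn_upper_bounds` is hypothetical, and the figures are upper bounds because the generator's 1 ms sleep is a minimum delay per metric.

```python
def churn_upper_bounds(series_per_batch: int = 500,
                       sleep_per_metric_ms: int = 1,
                       test_duration_s: int = 600,
                       flush_interval_ms: int = 500) -> dict:
    """Upper bounds on series churn for the test workload (integer arithmetic)."""
    # A batch cannot finish faster than the sum of its per-metric sleeps.
    min_batch_ms = series_per_batch * sleep_per_metric_ms
    max_batches = test_duration_s * 1000 // min_batch_ms
    return {
        "min_batch_ms": min_batch_ms,
        "max_batches": max_batches,
        # Every batch uses fresh container IDs, so no series is ever reused.
        "max_unique_series": max_batches * series_per_batch,
        # At most one new series arrives per sleep period.
        "max_new_series_per_flush": flush_interval_ms // sleep_per_metric_ms,
    }


if __name__ == "__main__":
    for name, value in churn_upper_bounds().items():
        print(f"{name}: {value}")
    # min_batch_ms: 500
    # max_batches: 1200
    # max_unique_series: 600000
    # max_new_series_per_flush: 500
```

Under these assumptions, each 500 ms flush sees up to 500 series that never appear again. That is the pattern in which stale prev_map entries would pile up.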

Metric generator script (metric_generator.py)
#!/usr/bin/env python3
"""
Generate high-cardinality changing metrics to simulate production workload
This mimics container/pod metrics that constantly change (pods being created/destroyed)
"""

import json
import time
import random
import sys

def generate_metrics():
    """Generate metrics with changing series (simulating ephemeral containers)"""
    metric_batch = 0

    while True:
        # Generate a batch of metrics with unique IDs (simulating containers)
        # Old containers "die" and new ones are created
        num_metrics = 500  # 500 unique series per batch

        for i in range(num_metrics):
            # Use batch number to create changing series
            # This simulates containers being created/destroyed
            container_id = f"container_{metric_batch}_{i}"

            metric = {
                "timestamp": time.time(),
                "name": f"cpu_usage_{container_id}",
                "value": random.uniform(0, 100),
                "tags": {
                    "container": container_id,
                    "batch": str(metric_batch)
                }
            }

            print(json.dumps(metric), flush=True)
            time.sleep(0.001)  # 1ms sleep between metrics: at most ~1000 metrics/sec

        metric_batch += 1

        # Every 10 batches, log progress to stderr (series are never reused across batches)
        if metric_batch % 10 == 0:
            sys.stderr.write(f"Generated {metric_batch * num_metrics} total unique series\n")
            sys.stderr.flush()

if __name__ == "__main__":
    generate_metrics()

Test script (test_10min.sh)
#!/bin/bash

# 10 minute test to show the memory leak accumulation over time

TEST_DURATION=600  # 10 minutes
SAMPLE_INTERVAL=10  # Sample every 10 seconds

test_extended() {
    local BINARY=$1
    local LABEL=$2

    echo "========================================"
    echo "$LABEL - 10 MINUTE TEST"
    echo "========================================"
    echo "Start time: $(date)"
    echo ""

    $BINARY -c /tmp/vector-diff-mode.toml > /tmp/vector_10min_$$.log 2>&1 &
    local PID=$!

    sleep 5

    if ! ps -p $PID > /dev/null; then
        echo "Failed to start! Check /tmp/vector_10min_$$.log"
        cat /tmp/vector_10min_$$.log
        return 1
    fi

    echo "PID: $PID"
    echo "Generating high-cardinality changing metrics in Diff mode..."
    echo ""
    echo "Time(m:s) | Memory(KB) | Growth(KB) | Rate(KB/s)"
    echo "----------|------------|------------|------------"

    local START_MEM=$(ps -o rss= -p $PID | tr -d ' ')
    local PREV_MEM=$START_MEM
    local SAMPLES=0

    for i in $(seq $SAMPLE_INTERVAL $SAMPLE_INTERVAL $TEST_DURATION); do
        if ps -p $PID > /dev/null; then
            MEM=$(ps -o rss= -p $PID | tr -d ' ')
            GROWTH=$((MEM - START_MEM))
            DELTA=$((MEM - PREV_MEM))
            RATE=$((DELTA / SAMPLE_INTERVAL))

            # Format time as minutes:seconds
            MINS=$((i / 60))
            SECS=$((i % 60))

            if [ $SAMPLES -eq 0 ]; then
                printf "%5d:%02d | %10d | %10s | %10s\n" $MINS $SECS "$MEM" "baseline" "-"
            else
                printf "%5d:%02d | %10d | %+10d | %+10d\n" $MINS $SECS "$MEM" "$GROWTH" "$RATE"
            fi

            PREV_MEM=$MEM
            SAMPLES=$((SAMPLES + 1))
            sleep $SAMPLE_INTERVAL
        else
            echo "Process died! Check /tmp/vector_10min_$$.log"
            tail -30 /tmp/vector_10min_$$.log
            return 1
        fi
    done

    local END_MEM=$MEM
    local TOTAL=$((END_MEM - START_MEM))
    local AVG=$((TOTAL / TEST_DURATION))
    local PERCENT=$((TOTAL * 100 / START_MEM))

    echo ""
    echo "========================================="
    echo "FINAL RESULTS for $LABEL:"
    echo "========================================="
    echo "  Start memory:   ${START_MEM} KB"
    echo "  End memory:     ${END_MEM} KB"
    echo "  Total growth:   ${TOTAL} KB"
    echo "  Average rate:   ${AVG} KB/s"
    echo "  Growth %:       ${PERCENT}%"
    echo ""
    echo "  Duration:       10 minutes"
    echo "  End time:       $(date)"
    echo ""

    # Extract series generation stats
    echo "Metric generation stats:"
    grep "Generated" /tmp/vector_10min_$$.log 2>/dev/null | tail -1 || echo "  (stats not available)"
    echo ""

    kill $PID 2>/dev/null
    wait $PID 2>/dev/null

    echo "$TOTAL" > /tmp/growth_10min_${LABEL// /_}.txt
}

echo "============================================="
echo "10 MINUTE MEMORY LEAK TEST"
echo "============================================="
echo "This extended test will show the accumulation"
echo "of memory over a longer period, simulating"
echo "the production issue from #23093"
echo ""
echo "Test configuration:"
echo "  - Diff mode (uses prev_map heavily)"
echo "  - 500 unique series per batch"
echo "  - Series constantly changing (simulating pod churn)"
echo "  - Duration: 10 minutes per version"
echo ""
echo "This will take approximately 20 minutes total..."
echo "============================================="
echo ""

echo "Phase 1: Testing OLD Vector (with memory leak)..."
test_extended "../vector/target/release/vector" "OLD"

echo ""
echo "Pausing 5 seconds between tests..."
sleep 5
echo ""

echo "Phase 2: Testing NEW Vector (with fix)..."
test_extended "./target/release/vector" "NEW"

echo ""
echo "============================================="
echo "FINAL COMPARISON - 10 MINUTE TEST"
echo "============================================="

OLD_GROWTH=$(cat /tmp/growth_10min_OLD.txt 2>/dev/null || echo "0")
NEW_GROWTH=$(cat /tmp/growth_10min_NEW.txt 2>/dev/null || echo "0")

echo "OLD version total growth: ${OLD_GROWTH} KB"
echo "NEW version total growth: ${NEW_GROWTH} KB"
echo ""

if [ "$OLD_GROWTH" -gt "$NEW_GROWTH" ]; then
    DIFF=$((OLD_GROWTH - NEW_GROWTH))
    PERCENT=$((DIFF * 100 / OLD_GROWTH))

    echo "✓ MEMORY LEAK FIX CONFIRMED!"
    echo ""
    echo "  Reduction:    ${PERCENT}%"
    echo "  Saved:        ${DIFF} KB in 10 minutes"
    echo ""
    echo "Projected savings over time:"
    echo "  1 hour:       $((DIFF * 6)) KB (~$((DIFF * 6 / 1024)) MB)"
    echo "  1 day:        $((DIFF * 144)) KB (~$((DIFF * 144 / 1024)) MB)"
    echo "  1 week:       $((DIFF * 1008)) KB (~$((DIFF * 1008 / 1024)) MB)"
    echo "  1 month:      $((DIFF * 4320)) KB (~$((DIFF * 4320 / 1024)) MB)"
else
    echo "Both versions showed similar growth."
    echo "The leak may require even longer durations to manifest,"
    echo "or the workload may not be triggering it."
fi

echo ""
echo "Test logs saved in /tmp/vector_10min_*.log"
rm -f /tmp/growth_10min_*.txt

Results:

  • Old version: 16,464 KB growth (27 KB/s)
  • Fixed version: 14,384 KB growth (23 KB/s)
  • Improvement: 12.6% reduction in memory growth
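The reported figures can be re-derived from the two growth totals. The sketch below mirrors the test script's arithmetic (integer division for the rates, linear extrapolation for the projections); the function name `summarize` is hypothetical, and the projections assume growth stays linear, which a 10-minute run does not establish.

```python
def summarize(old_growth_kb: int = 16_464,
              new_growth_kb: int = 14_384,
              test_duration_s: int = 600) -> dict:
    """Recompute the reported rates, reduction, and linear projections."""
    saved_kb = old_growth_kb - new_growth_kb
    return {
        # Bash integer division truncates, so 23.97 KB/s is reported as 23.
        "old_rate_kb_s": old_growth_kb // test_duration_s,
        "new_rate_kb_s": new_growth_kb // test_duration_s,
        "new_rate_exact_kb_s": round(new_growth_kb / test_duration_s, 2),
        "saved_kb": saved_kb,
        "reduction_pct": round(saved_kb * 100 / old_growth_kb, 1),
        # Same multipliers as the script: 6 ten-minute windows per hour, 144 per day.
        "saved_per_hour_mb": saved_kb * 6 // 1024,
        "saved_per_day_mb": saved_kb * 144 // 1024,
    }


if __name__ == "__main__":
    for name, value in summarize().items():
        print(f"{name}: {value}")
    # old_rate_kb_s: 27
    # new_rate_kb_s: 23
    # new_rate_exact_kb_s: 23.97
    # saved_kb: 2080
    # reduction_pct: 12.6
    # saved_per_hour_mb: 12
    # saved_per_day_mb: 292
```

The fixed version's rate is closer to 24 KB/s than 23 KB/s. The lower figure comes from truncation in the script's integer division.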

All 14 existing aggregate transform unit tests pass.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

  • Closes: #23093

@github-actions github-actions bot added the "domain: transforms" label (anything related to Vector's transform components) Dec 9, 2025
@julienlau

This is a bugfix, please integrate
