Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ public interface IBranchStreamBuilder<TIn, TCurrent>
/// <typeparam name="TNext">The type of data after the transformation.</typeparam>
/// <param name="mapFunction">A function to transform data.</param>
/// <returns>The branch stream builder with the new data type.</returns>
IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);

IBranchStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);

/// <summary>
/// Adds a FlatMap operator to the stream. For each input element, it produces zero or more output elements.
/// </summary>
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
/// <typeparam name="TNext">The type of data emitted after flat-mapping.</typeparam>
/// <param name="flatMapFunction">A function that maps an input element to zero or more output elements.</param>
/// <returns>A stream builder emitting elements of type TNext.</returns>
IBranchStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction);

/// <summary>
/// Groups the stream data by a specified key selector.
Expand Down
9 changes: 9 additions & 0 deletions src/Cortex.Streams/Abstractions/IStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public interface IStreamBuilder<TIn, TCurrent>
/// <returns>The stream builder with the new data type.</returns>
IStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);

/// <summary>
/// Adds a FlatMap operator to the stream. For each input element, it produces zero or more output elements.
/// </summary>
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
/// <typeparam name="TNext">The type of data emitted after flat-mapping.</typeparam>
/// <param name="flatMapFunction">A function that maps an input element to zero or more output elements.</param>
/// <returns>A stream builder emitting elements of type TNext.</returns>
IStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction);

/// <summary>
/// Adds a sink function to the stream.
/// </summary>
Expand Down
26 changes: 24 additions & 2 deletions src/Cortex.Streams/BranchStreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public IBranchStreamBuilder<TIn, TCurrent> Filter(Func<TCurrent, bool> predicate
/// <typeparam name="TNext">The type of data after the transformation.</typeparam>
/// <param name="mapFunction">A function to transform data.</param>
/// <returns>The branch stream builder with the new data type.</returns>
public IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction)
public IBranchStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction)
{
var mapOperator = new MapOperator<TCurrent, TNext>(mapFunction);

Expand All @@ -68,7 +68,7 @@ public IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> ma
_lastOperator = mapOperator;
}

return new BranchStreamBuilder<TCurrent, TNext>(_name)
return new BranchStreamBuilder<TIn, TNext>(_name)
{
_firstOperator = _firstOperator,
_lastOperator = _lastOperator
Expand Down Expand Up @@ -246,5 +246,27 @@ public IBranchStreamBuilder<TIn, TCurrent> AggregateSilently<TKey, TAggregate>(F
};
}

public IBranchStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction)
{
var flatMapOperator = new FlatMapOperator<TCurrent, TNext>(flatMapFunction);

if (_firstOperator == null)
{
_firstOperator = flatMapOperator;
_lastOperator = flatMapOperator;
}
else
{
_lastOperator.SetNext(flatMapOperator);
_lastOperator = flatMapOperator;
}

return new BranchStreamBuilder<TIn, TNext>(_name)
{
_firstOperator = _firstOperator,
_lastOperator = _lastOperator,
_sourceAdded = _sourceAdded
};
}
}
}
134 changes: 134 additions & 0 deletions src/Cortex.Streams/Operators/FlatMapOperator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
using Cortex.Telemetry;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Cortex.Streams.Operators
{
/// <summary>
/// The FlatMapOperator takes each input element, applies a function to produce zero or more output elements,
/// and emits each output element individually into the stream.
/// </summary>
/// <typeparam name="TInput">The type of the input element.</typeparam>
/// <typeparam name="TOutput">The type of the output element(s) produced.</typeparam>
public class FlatMapOperator<TInput, TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled
{
private readonly Func<TInput, IEnumerable<TOutput>> _flatMapFunction;
private IOperator _nextOperator;

// Telemetry fields
private ITelemetryProvider _telemetryProvider;
private ICounter _processedCounter;
private ICounter _emittedCounter;
private IHistogram _processingTimeHistogram;
private ITracer _tracer;
private Action _incrementProcessedCounter;
private Action _incrementEmittedCounter;
private Action<double> _recordProcessingTime;

public FlatMapOperator(Func<TInput, IEnumerable<TOutput>> flatMapFunction)
{
_flatMapFunction = flatMapFunction ?? throw new ArgumentNullException(nameof(flatMapFunction));
}

public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
{
_telemetryProvider = telemetryProvider;

if (_telemetryProvider != null)
{
var metrics = _telemetryProvider.GetMetricsProvider();
_processedCounter = metrics.CreateCounter($"flatmap_operator_processed_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Number of items processed by FlatMapOperator");
_emittedCounter = metrics.CreateCounter($"flatmap_operator_emitted_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Number of items emitted by FlatMapOperator");
_processingTimeHistogram = metrics.CreateHistogram($"flatmap_operator_processing_time_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Processing time for FlatMapOperator");
_tracer = _telemetryProvider.GetTracingProvider().GetTracer($"FlatMapOperator_{typeof(TInput).Name}_to_{typeof(TOutput).Name}");

// Cache delegates
_incrementProcessedCounter = () => _processedCounter.Increment();
_incrementEmittedCounter = () => _emittedCounter.Increment();
_recordProcessingTime = value => _processingTimeHistogram.Record(value);
}
else
{
_incrementProcessedCounter = null;
_incrementEmittedCounter = null;
_recordProcessingTime = null;
}

// Propagate telemetry to next operator
if (_nextOperator is ITelemetryEnabled telemetryEnabled)
{
telemetryEnabled.SetTelemetryProvider(telemetryProvider);
}
}

public void Process(object input)
{
if (input == null)
throw new ArgumentNullException(nameof(input));

if (!(input is TInput typedInput))
throw new ArgumentException($"Expected input of type {typeof(TInput).Name}, but received {input.GetType().Name}", nameof(input));

IEnumerable<TOutput> outputs;

if (_telemetryProvider != null)
{
var stopwatch = Stopwatch.StartNew();
using (var span = _tracer.StartSpan("FlatMapOperator.Process"))
{
try
{
outputs = _flatMapFunction(typedInput) ?? Array.Empty<TOutput>();
span.SetAttribute("status", "success");
span.SetAttribute("input_type", typeof(TInput).Name);
span.SetAttribute("output_type", typeof(TOutput).Name);
}
catch (Exception ex)
{
span.SetAttribute("status", "error");
span.SetAttribute("exception", ex.ToString());
throw;
}
finally
{
stopwatch.Stop();
_recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds);
_incrementProcessedCounter?.Invoke();
}
}
}
else
{
outputs = _flatMapFunction(typedInput) ?? Array.Empty<TOutput>();
}

// Emit each output element
foreach (var output in outputs)
{
_incrementEmittedCounter?.Invoke();
_nextOperator?.Process(output);
}
}

public void SetNext(IOperator nextOperator)
{
_nextOperator = nextOperator;

// Propagate telemetry
if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null)
{
nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider);
}
}

public IEnumerable<IOperator> GetNextOperators()
{
if (_nextOperator != null)
yield return _nextOperator;
}
}
}
18 changes: 18 additions & 0 deletions src/Cortex.Streams/StreamBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -561,5 +561,23 @@ public IStreamBuilder<TIn, TCurrent> SetNext(IOperator customOperator)

return this; // Returns the current builder for method chaining
}

public IStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction)
{
var flatMapOperator = new FlatMapOperator<TCurrent, TNext>(flatMapFunction);

if (_firstOperator == null)
{
_firstOperator = flatMapOperator;
_lastOperator = flatMapOperator;
}
else
{
_lastOperator.SetNext(flatMapOperator);
_lastOperator = flatMapOperator;
}

return new StreamBuilder<TIn, TNext>(_name, _firstOperator, _lastOperator, _sourceAdded);
}
}
}
150 changes: 150 additions & 0 deletions src/Cortex.Tests/Streams/Tests/FlatMapOperatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
using Cortex.Streams;
using Cortex.Streams.Operators;

namespace Cortex.Tests.Streams.Tests
{
public class CollectingSink<TInput> : ISinkOperator<TInput>
{
private readonly List<TInput> _collected = new List<TInput>();
public IReadOnlyList<TInput> Collected => _collected;

public void Start() { }
public void Stop() { }

public void Process(TInput input)
{
_collected.Add(input);
}
}


public class FlatMapOperatorTests
{
[Fact]
public void Stream_FlatMap_SplitsInputIntoMultipleOutputs()
{
// Arrange
var collectingSink = new CollectingSink<string>();

// Build the stream:
// Start a stream without a dedicated source, we will just Emit into it.
var stream = StreamBuilder<string, string>
.CreateNewStream("TestStream")
.Stream()
.FlatMap(line => line.Split(' ')) // Use FlatMap to split a sentence into words
.Sink(collectingSink)
.Build();

stream.Start();

// Act
stream.Emit("Hello world from stream");

// Assert
Assert.Equal(4, collectingSink.Collected.Count);
Assert.Contains("Hello", collectingSink.Collected);
Assert.Contains("world", collectingSink.Collected);
Assert.Contains("from", collectingSink.Collected);
Assert.Contains("stream", collectingSink.Collected);

stream.Stop();
}

[Fact]
public void Stream_FlatMap_EmptyResult_EmitsNoOutput()
{
// Arrange
var collectingSink = new CollectingSink<int>();

var stream = StreamBuilder<int, int>
.CreateNewStream("EmptyResultStream")
.Stream()
.FlatMap(num => new int[0]) // Always empty
.Sink(collectingSink)
.Build();

stream.Start();

// Act
stream.Emit(42);

// Assert
Assert.Empty(collectingSink.Collected);

stream.Stop();
}

[Fact]
public void Stream_FlatMap_NullResult_TreatedAsEmpty()
{
// Arrange
var collectingSink = new CollectingSink<int>();

var stream = StreamBuilder<int, int>
.CreateNewStream("NullResultStream")
.Stream()
.FlatMap<int>(num => null) // Always null
.Sink(collectingSink)
.Build();

stream.Start();

// Act
stream.Emit(10);

// Assert
Assert.Empty(collectingSink.Collected);

stream.Stop();
}

[Fact]
public void Stream_FlatMap_ExceptionInFunction_BubblesUp()
{
// Arrange
var collectingSink = new CollectingSink<int>();

var stream = StreamBuilder<int, int>
.CreateNewStream("ExceptionStream")
.Stream()
.FlatMap<int>(num => throw new InvalidOperationException("Test exception"))
.Sink(collectingSink)
.Build();

stream.Start();

// Act & Assert
var ex = Assert.Throws<InvalidOperationException>(() => stream.Emit(5));
Assert.Equal("Test exception", ex.Message);

stream.Stop();
}

[Fact]
public void Stream_FlatMap_SingleOutputEmittedForEachInput()
{
// Arrange
var collectingSink = new CollectingSink<string>();

var stream = StreamBuilder<string, string>
.CreateNewStream("SingleOutputStream")
.Stream()
.FlatMap(line => new[] { line.ToUpper() }) // One-to-one mapping but via flatmap
.Sink(collectingSink)
.Build();

stream.Start();

// Act
stream.Emit("hello");
stream.Emit("world");

// Assert
Assert.Equal(2, collectingSink.Collected.Count);
Assert.Contains("HELLO", collectingSink.Collected);
Assert.Contains("WORLD", collectingSink.Collected);

stream.Stop();
}
}
}