Skip to content

Commit 8afa5c2

Browse files
authored
Merge pull request #74 from buildersoftio/v1/feature/37
[v1/feature/37]: Add support for FlatMap
2 parents e464c30 + 5761334 commit 8afa5c2

File tree

6 files changed

+344
-4
lines changed

6 files changed

+344
-4
lines changed

src/Cortex.Streams/Abstractions/IBranchStreamBuilder.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,16 @@ public interface IBranchStreamBuilder<TIn, TCurrent>
2525
/// <typeparam name="TNext">The type of data after the transformation.</typeparam>
2626
/// <param name="mapFunction">A function to transform data.</param>
2727
/// <returns>The branch stream builder with the new data type.</returns>
28-
IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);
29-
28+
IBranchStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);
3029

30+
/// <summary>
31+
/// Adds a FlatMap operator to the stream. For each input element, it produces zero or more output elements.
32+
/// </summary>
33+
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
34+
/// <typeparam name="TNext">The type of data emitted after flat-mapping.</typeparam>
35+
/// <param name="flatMapFunction">A function that maps an input element to zero or more output elements.</param>
36+
/// <returns>A stream builder emitting elements of type TNext.</returns>
37+
IBranchStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction);
3138

3239
/// <summary>
3340
/// Groups the stream data by a specified key selector.

src/Cortex.Streams/Abstractions/IStreamBuilder.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ public interface IStreamBuilder<TIn, TCurrent>
4040
/// <returns>The stream builder with the new data type.</returns>
4141
IStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction);
4242

43+
/// <summary>
44+
/// Adds a FlatMap operator to the stream. For each input element, it produces zero or more output elements.
45+
/// </summary>
46+
/// <typeparam name="TCurrent">The current type of data in the stream.</typeparam>
47+
/// <typeparam name="TNext">The type of data emitted after flat-mapping.</typeparam>
48+
/// <param name="flatMapFunction">A function that maps an input element to zero or more output elements.</param>
49+
/// <returns>A stream builder emitting elements of type TNext.</returns>
50+
IStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction);
51+
4352
/// <summary>
4453
/// Adds a sink function to the stream.
4554
/// </summary>

src/Cortex.Streams/BranchStreamBuilder.cs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public IBranchStreamBuilder<TIn, TCurrent> Filter(Func<TCurrent, bool> predicate
5353
/// <typeparam name="TNext">The type of data after the transformation.</typeparam>
5454
/// <param name="mapFunction">A function to transform data.</param>
5555
/// <returns>The branch stream builder with the new data type.</returns>
56-
public IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction)
56+
public IBranchStreamBuilder<TIn, TNext> Map<TNext>(Func<TCurrent, TNext> mapFunction)
5757
{
5858
var mapOperator = new MapOperator<TCurrent, TNext>(mapFunction);
5959

@@ -68,7 +68,7 @@ public IBranchStreamBuilder<TCurrent, TNext> Map<TNext>(Func<TCurrent, TNext> ma
6868
_lastOperator = mapOperator;
6969
}
7070

71-
return new BranchStreamBuilder<TCurrent, TNext>(_name)
71+
return new BranchStreamBuilder<TIn, TNext>(_name)
7272
{
7373
_firstOperator = _firstOperator,
7474
_lastOperator = _lastOperator
@@ -246,5 +246,27 @@ public IBranchStreamBuilder<TIn, TCurrent> AggregateSilently<TKey, TAggregate>(F
246246
};
247247
}
248248

249+
public IBranchStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction)
250+
{
251+
var flatMapOperator = new FlatMapOperator<TCurrent, TNext>(flatMapFunction);
252+
253+
if (_firstOperator == null)
254+
{
255+
_firstOperator = flatMapOperator;
256+
_lastOperator = flatMapOperator;
257+
}
258+
else
259+
{
260+
_lastOperator.SetNext(flatMapOperator);
261+
_lastOperator = flatMapOperator;
262+
}
263+
264+
return new BranchStreamBuilder<TIn, TNext>(_name)
265+
{
266+
_firstOperator = _firstOperator,
267+
_lastOperator = _lastOperator,
268+
_sourceAdded = _sourceAdded
269+
};
270+
}
249271
}
250272
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
using Cortex.Telemetry;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Diagnostics;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
9+
namespace Cortex.Streams.Operators
10+
{
11+
/// <summary>
12+
/// The FlatMapOperator takes each input element, applies a function to produce zero or more output elements,
13+
/// and emits each output element individually into the stream.
14+
/// </summary>
15+
/// <typeparam name="TInput">The type of the input element.</typeparam>
16+
/// <typeparam name="TOutput">The type of the output element(s) produced.</typeparam>
17+
public class FlatMapOperator<TInput, TOutput> : IOperator, IHasNextOperators, ITelemetryEnabled
18+
{
19+
private readonly Func<TInput, IEnumerable<TOutput>> _flatMapFunction;
20+
private IOperator _nextOperator;
21+
22+
// Telemetry fields
23+
private ITelemetryProvider _telemetryProvider;
24+
private ICounter _processedCounter;
25+
private ICounter _emittedCounter;
26+
private IHistogram _processingTimeHistogram;
27+
private ITracer _tracer;
28+
private Action _incrementProcessedCounter;
29+
private Action _incrementEmittedCounter;
30+
private Action<double> _recordProcessingTime;
31+
32+
public FlatMapOperator(Func<TInput, IEnumerable<TOutput>> flatMapFunction)
33+
{
34+
_flatMapFunction = flatMapFunction ?? throw new ArgumentNullException(nameof(flatMapFunction));
35+
}
36+
37+
public void SetTelemetryProvider(ITelemetryProvider telemetryProvider)
38+
{
39+
_telemetryProvider = telemetryProvider;
40+
41+
if (_telemetryProvider != null)
42+
{
43+
var metrics = _telemetryProvider.GetMetricsProvider();
44+
_processedCounter = metrics.CreateCounter($"flatmap_operator_processed_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Number of items processed by FlatMapOperator");
45+
_emittedCounter = metrics.CreateCounter($"flatmap_operator_emitted_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Number of items emitted by FlatMapOperator");
46+
_processingTimeHistogram = metrics.CreateHistogram($"flatmap_operator_processing_time_{typeof(TInput).Name}_to_{typeof(TOutput).Name}", "Processing time for FlatMapOperator");
47+
_tracer = _telemetryProvider.GetTracingProvider().GetTracer($"FlatMapOperator_{typeof(TInput).Name}_to_{typeof(TOutput).Name}");
48+
49+
// Cache delegates
50+
_incrementProcessedCounter = () => _processedCounter.Increment();
51+
_incrementEmittedCounter = () => _emittedCounter.Increment();
52+
_recordProcessingTime = value => _processingTimeHistogram.Record(value);
53+
}
54+
else
55+
{
56+
_incrementProcessedCounter = null;
57+
_incrementEmittedCounter = null;
58+
_recordProcessingTime = null;
59+
}
60+
61+
// Propagate telemetry to next operator
62+
if (_nextOperator is ITelemetryEnabled telemetryEnabled)
63+
{
64+
telemetryEnabled.SetTelemetryProvider(telemetryProvider);
65+
}
66+
}
67+
68+
public void Process(object input)
69+
{
70+
if (input == null)
71+
throw new ArgumentNullException(nameof(input));
72+
73+
if (!(input is TInput typedInput))
74+
throw new ArgumentException($"Expected input of type {typeof(TInput).Name}, but received {input.GetType().Name}", nameof(input));
75+
76+
IEnumerable<TOutput> outputs;
77+
78+
if (_telemetryProvider != null)
79+
{
80+
var stopwatch = Stopwatch.StartNew();
81+
using (var span = _tracer.StartSpan("FlatMapOperator.Process"))
82+
{
83+
try
84+
{
85+
outputs = _flatMapFunction(typedInput) ?? Array.Empty<TOutput>();
86+
span.SetAttribute("status", "success");
87+
span.SetAttribute("input_type", typeof(TInput).Name);
88+
span.SetAttribute("output_type", typeof(TOutput).Name);
89+
}
90+
catch (Exception ex)
91+
{
92+
span.SetAttribute("status", "error");
93+
span.SetAttribute("exception", ex.ToString());
94+
throw;
95+
}
96+
finally
97+
{
98+
stopwatch.Stop();
99+
_recordProcessingTime?.Invoke(stopwatch.Elapsed.TotalMilliseconds);
100+
_incrementProcessedCounter?.Invoke();
101+
}
102+
}
103+
}
104+
else
105+
{
106+
outputs = _flatMapFunction(typedInput) ?? Array.Empty<TOutput>();
107+
}
108+
109+
// Emit each output element
110+
foreach (var output in outputs)
111+
{
112+
_incrementEmittedCounter?.Invoke();
113+
_nextOperator?.Process(output);
114+
}
115+
}
116+
117+
public void SetNext(IOperator nextOperator)
118+
{
119+
_nextOperator = nextOperator;
120+
121+
// Propagate telemetry
122+
if (_nextOperator is ITelemetryEnabled nextTelemetryEnabled && _telemetryProvider != null)
123+
{
124+
nextTelemetryEnabled.SetTelemetryProvider(_telemetryProvider);
125+
}
126+
}
127+
128+
public IEnumerable<IOperator> GetNextOperators()
129+
{
130+
if (_nextOperator != null)
131+
yield return _nextOperator;
132+
}
133+
}
134+
}

src/Cortex.Streams/StreamBuilder.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,5 +561,23 @@ public IStreamBuilder<TIn, TCurrent> SetNext(IOperator customOperator)
561561

562562
return this; // Returns the current builder for method chaining
563563
}
564+
565+
public IStreamBuilder<TIn, TNext> FlatMap<TNext>(Func<TCurrent, IEnumerable<TNext>> flatMapFunction)
566+
{
567+
var flatMapOperator = new FlatMapOperator<TCurrent, TNext>(flatMapFunction);
568+
569+
if (_firstOperator == null)
570+
{
571+
_firstOperator = flatMapOperator;
572+
_lastOperator = flatMapOperator;
573+
}
574+
else
575+
{
576+
_lastOperator.SetNext(flatMapOperator);
577+
_lastOperator = flatMapOperator;
578+
}
579+
580+
return new StreamBuilder<TIn, TNext>(_name, _firstOperator, _lastOperator, _sourceAdded);
581+
}
564582
}
565583
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
using Cortex.Streams;
2+
using Cortex.Streams.Operators;
3+
4+
namespace Cortex.Tests.Streams.Tests
5+
{
6+
public class CollectingSink<TInput> : ISinkOperator<TInput>
7+
{
8+
private readonly List<TInput> _collected = new List<TInput>();
9+
public IReadOnlyList<TInput> Collected => _collected;
10+
11+
public void Start() { }
12+
public void Stop() { }
13+
14+
public void Process(TInput input)
15+
{
16+
_collected.Add(input);
17+
}
18+
}
19+
20+
21+
public class FlatMapOperatorTests
22+
{
23+
[Fact]
24+
public void Stream_FlatMap_SplitsInputIntoMultipleOutputs()
25+
{
26+
// Arrange
27+
var collectingSink = new CollectingSink<string>();
28+
29+
// Build the stream:
30+
// Start a stream without a dedicated source, we will just Emit into it.
31+
var stream = StreamBuilder<string, string>
32+
.CreateNewStream("TestStream")
33+
.Stream()
34+
.FlatMap(line => line.Split(' ')) // Use FlatMap to split a sentence into words
35+
.Sink(collectingSink)
36+
.Build();
37+
38+
stream.Start();
39+
40+
// Act
41+
stream.Emit("Hello world from stream");
42+
43+
// Assert
44+
Assert.Equal(4, collectingSink.Collected.Count);
45+
Assert.Contains("Hello", collectingSink.Collected);
46+
Assert.Contains("world", collectingSink.Collected);
47+
Assert.Contains("from", collectingSink.Collected);
48+
Assert.Contains("stream", collectingSink.Collected);
49+
50+
stream.Stop();
51+
}
52+
53+
[Fact]
54+
public void Stream_FlatMap_EmptyResult_EmitsNoOutput()
55+
{
56+
// Arrange
57+
var collectingSink = new CollectingSink<int>();
58+
59+
var stream = StreamBuilder<int, int>
60+
.CreateNewStream("EmptyResultStream")
61+
.Stream()
62+
.FlatMap(num => new int[0]) // Always empty
63+
.Sink(collectingSink)
64+
.Build();
65+
66+
stream.Start();
67+
68+
// Act
69+
stream.Emit(42);
70+
71+
// Assert
72+
Assert.Empty(collectingSink.Collected);
73+
74+
stream.Stop();
75+
}
76+
77+
[Fact]
78+
public void Stream_FlatMap_NullResult_TreatedAsEmpty()
79+
{
80+
// Arrange
81+
var collectingSink = new CollectingSink<int>();
82+
83+
var stream = StreamBuilder<int, int>
84+
.CreateNewStream("NullResultStream")
85+
.Stream()
86+
.FlatMap<int>(num => null) // Always null
87+
.Sink(collectingSink)
88+
.Build();
89+
90+
stream.Start();
91+
92+
// Act
93+
stream.Emit(10);
94+
95+
// Assert
96+
Assert.Empty(collectingSink.Collected);
97+
98+
stream.Stop();
99+
}
100+
101+
[Fact]
102+
public void Stream_FlatMap_ExceptionInFunction_BubblesUp()
103+
{
104+
// Arrange
105+
var collectingSink = new CollectingSink<int>();
106+
107+
var stream = StreamBuilder<int, int>
108+
.CreateNewStream("ExceptionStream")
109+
.Stream()
110+
.FlatMap<int>(num => throw new InvalidOperationException("Test exception"))
111+
.Sink(collectingSink)
112+
.Build();
113+
114+
stream.Start();
115+
116+
// Act & Assert
117+
var ex = Assert.Throws<InvalidOperationException>(() => stream.Emit(5));
118+
Assert.Equal("Test exception", ex.Message);
119+
120+
stream.Stop();
121+
}
122+
123+
[Fact]
124+
public void Stream_FlatMap_SingleOutputEmittedForEachInput()
125+
{
126+
// Arrange
127+
var collectingSink = new CollectingSink<string>();
128+
129+
var stream = StreamBuilder<string, string>
130+
.CreateNewStream("SingleOutputStream")
131+
.Stream()
132+
.FlatMap(line => new[] { line.ToUpper() }) // One-to-one mapping but via flatmap
133+
.Sink(collectingSink)
134+
.Build();
135+
136+
stream.Start();
137+
138+
// Act
139+
stream.Emit("hello");
140+
stream.Emit("world");
141+
142+
// Assert
143+
Assert.Equal(2, collectingSink.Collected.Count);
144+
Assert.Contains("HELLO", collectingSink.Collected);
145+
Assert.Contains("WORLD", collectingSink.Collected);
146+
147+
stream.Stop();
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)