1
1
using System ;
2
+ using System . Collections . Concurrent ;
2
3
using System . Collections . Generic ;
4
+ using System . Linq ;
5
+ using System . Linq . Expressions ;
3
6
using System . Runtime . Caching ;
4
7
using System . Threading . Tasks ;
5
8
using NUnit . Framework ;
@@ -21,43 +24,85 @@ public async Task Show()
21
24
Resolve . WhenEqualToHandlerMessageType ( new Projection ( ) . Handlers ) ) .
22
25
ProjectAsync ( cache , new object [ ]
23
26
{
24
- new Envelope < PortfolioAdded > (
27
+ new Envelope (
25
28
new PortfolioAdded { Id = portfolioId , Name = "My portfolio" } ,
26
29
new Dictionary < string , object >
27
30
{
28
31
{ "Position" , 0L }
29
- } ) ,
30
- new Envelope < PortfolioRenamed > (
32
+ } ) . ToGenericEnvelope ( ) ,
33
+ new Envelope (
31
34
new PortfolioRenamed { Id = portfolioId , Name = "Your portfolio" } ,
32
35
new Dictionary < string , object >
33
36
{
34
37
{ "Position" , 1L }
35
- } ) ,
36
- new Envelope < PortfolioRemoved > (
38
+ } ) . ToGenericEnvelope ( ) ,
39
+ new Envelope (
37
40
new PortfolioRemoved { Id = portfolioId } ,
38
41
new Dictionary < string , object >
39
42
{
40
43
{ "Position" , 2L }
41
- } )
44
+ } ) . ToGenericEnvelope ( )
42
45
} ) ;
43
46
}
44
47
}
45
48
46
- class Envelope < TMessage >
49
+ class Envelope < TMessage > //Used by handlers
47
50
{
48
- public Envelope ( TMessage message , IDictionary < string , object > metadata )
51
+ private readonly Envelope _envelope ;
52
+
53
+ public Envelope ( Envelope envelope )
54
+ {
55
+ if ( envelope == null )
56
+ throw new ArgumentNullException ( nameof ( envelope ) ) ;
57
+ _envelope = envelope ;
58
+ }
59
+
60
+ public TMessage Message => ( TMessage ) _envelope . Message ;
61
+ public long Position => ( long ) _envelope . Metadata [ "Position" ] ;
62
+ }
63
+
64
+ class Envelope //Used by dispatchers
65
+ {
66
+ //Note we could precompute these factories for all known message types.
67
+ private static readonly ConcurrentDictionary < Type , Func < Envelope , object > > Factories =
68
+ new ConcurrentDictionary < Type , Func < Envelope , object > > ( ) ;
69
+
70
+ public Envelope ( object message , IReadOnlyDictionary < string , object > metadata )
49
71
{
72
+ if ( message == null )
73
+ throw new ArgumentNullException ( nameof ( message ) ) ;
50
74
if ( metadata == null )
51
75
throw new ArgumentNullException ( nameof ( metadata ) ) ;
52
76
Message = message ;
53
77
Metadata = metadata ;
54
78
}
55
79
56
- public TMessage Message { get ; }
57
- public IDictionary < string , object > Metadata { get ; }
58
- public long Position => ( long ) Metadata [ "Position" ] ;
80
+ public object Message { get ; }
81
+ public IReadOnlyDictionary < string , object > Metadata { get ; }
82
+
83
+ public object ToGenericEnvelope ( )
84
+ {
85
+ var factory = Factories
86
+ . GetOrAdd ( Message . GetType ( ) , typeOfMessage =>
87
+ {
88
+ var parameter = Expression
89
+ . Parameter ( typeof ( Envelope ) , "envelope" ) ;
90
+ return Expression
91
+ . Lambda < Func < Envelope , object > > (
92
+ Expression . New (
93
+ typeof ( Envelope < > )
94
+ . MakeGenericType ( typeOfMessage )
95
+ . GetConstructors ( )
96
+ . Single ( ) ,
97
+ parameter ) ,
98
+ parameter )
99
+ . Compile ( ) ;
100
+ } ) ;
101
+ return factory ( this ) ;
102
+ }
59
103
}
60
104
105
+
61
106
class Projection : ConnectedProjection < MemoryCache >
62
107
{
63
108
public Projection ( )
@@ -79,7 +124,7 @@ public Projection()
79
124
} ) ;
80
125
When < Envelope < PortfolioRemoved > > ( ( cache , envelope ) =>
81
126
{
82
- cache . Remove ( envelope . Message . Id . ToString ( ) ) ;
127
+ ' cache . Remove ( envelope . Message . Id . ToString ( ) ) ;
83
128
} ) ;
84
129
When < Envelope < PortfolioRenamed > > ( ( cache , envelope ) =>
85
130
{
0 commit comments