File tree Expand file tree Collapse file tree 2 files changed +19
-5
lines changed
projects/RabbitMQ.Client/client/impl Expand file tree Collapse file tree 2 files changed +19
-5
lines changed Original file line number Diff line number Diff line change @@ -9,13 +9,20 @@ namespace RabbitMQ.Client.Impl
99 internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1010 {
1111 private readonly ConcurrentDictionary < IModel , WorkPool > _workPools = new ConcurrentDictionary < IModel , WorkPool > ( ) ;
12+ private readonly Func < IModel , WorkPool > _startNewWorkPoolFunc = model => StartNewWorkPool ( model ) ;
1213
1314 public void Schedule < TWork > ( ModelBase model , TWork work ) where TWork : Work
1415 {
15- _workPools . GetOrAdd ( model , StartNewWorkPool ) . Enqueue ( work ) ;
16+ /*
17+ * rabbitmq/rabbitmq-dotnet-client#841
18+ * https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd
19+ * Note that the value delegate is not atomic but the Schedule method will not be called concurrently.
20+ */
21+ WorkPool workPool = _workPools . GetOrAdd ( model , _startNewWorkPoolFunc ) ;
22+ workPool . Enqueue ( work ) ;
1623 }
1724
18- private WorkPool StartNewWorkPool ( IModel model )
25+ private static WorkPool StartNewWorkPool ( IModel model )
1926 {
2027 var newWorkPool = new WorkPool ( model as ModelBase ) ;
2128 newWorkPool . Start ( ) ;
Original file line number Diff line number Diff line change 55
66namespace RabbitMQ . Client . Impl
77{
8- class ConsumerWorkService
8+ internal class ConsumerWorkService
99 {
1010 private readonly ConcurrentDictionary < IModel , WorkPool > _workPools = new ConcurrentDictionary < IModel , WorkPool > ( ) ;
11+ private readonly Func < IModel , WorkPool > _startNewWorkPoolFunc = model => StartNewWorkPool ( model ) ;
1112
1213 public void AddWork ( IModel model , Action fn )
1314 {
14- _workPools . GetOrAdd ( model , StartNewWorkPool ) . Enqueue ( fn ) ;
15+ /*
16+ * rabbitmq/rabbitmq-dotnet-client#841
17+ * https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd
18+ * Note that the value delegate is not atomic but the AddWork method will not be called concurrently.
19+ */
20+ WorkPool workPool = _workPools . GetOrAdd ( model , _startNewWorkPoolFunc ) ;
21+ workPool . Enqueue ( fn ) ;
1522 }
1623
17- private WorkPool StartNewWorkPool ( IModel model )
24+ private static WorkPool StartNewWorkPool ( IModel model )
1825 {
1926 var newWorkPool = new WorkPool ( ) ;
2027 newWorkPool . Start ( ) ;
You can’t perform that action at this time.
0 commit comments