Skip to content

Implement IDisposable method to Producer and Consumer Class #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 30, 2021

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Nov 18, 2021

Implement IDisposble method for Producer and Consumer class.

That's the idea:

    using var producer1 = await system.CreateProducer(
                     new ProducerConfig
                     {
                         Reference = Guid.NewGuid().ToString(),
                         Stream = stream,
                         ConfirmHandler = conf => { }
                     });
                await producer1.Send(1, message);
             

user can also call directly Close() to remove the producer from the server ( not mandatory btw)

for Consumer:

   using var consumer = await system.CreateConsumer(
                    new ConsumerConfig
                    {
                        Reference = "consumer",
                        Stream = stream,
                        MessageHandler = async (consumer, ctx, message) => { await Task.CompletedTask; }
                    });

user can also call directly Close() to remove the consumer from the server ( not mandatory btw)

@Gsantomaggio Gsantomaggio changed the title [DON'T Merge] Handle close Add Close() method to Producer and Consumer Class Nov 19, 2021
@Gsantomaggio Gsantomaggio changed the title Add Close() method to Producer and Consumer Class Implement IDisposable method to Producer and Consumer Class Nov 22, 2021
@Gsantomaggio
Copy link
Member Author

Gsantomaggio commented Nov 22, 2021

Hi @stebet
we are having this problem in the github pipeline.

  Passed Tests.SystemTests.CreateSystemThrowsWhenNoEndpointsAreReachable [40 ms]
The active test run was aborted. Reason: Test host process crashed : Unhandled exception. System.AggregateException: One or more errors occurred. (Operation is not valid due to the current state of the object.)
 ---> System.InvalidOperationException: Operation is not valid due to the current state of the object.
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.SignalCompletion()
   at System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore`1.SetException(Exception error)
   at RabbitMQ.Stream.Client.ManualResetValueTaskSource`1.SetException(Exception error) in /home/runner/work/rabbitmq-stream-dotnet-client/rabbitmq-stream-dotnet-client/RabbitMQ.Stream.Client/Client.cs:line 474
   at RabbitMQ.Stream.Client.Client.<>c__33`2.<Request>b__33_0(Object valueTaskSource) in /home/runner/work/rabbitmq-stream-dotnet-client/rabbitmq-stream-dotnet-client/RabbitMQ.Stream.Client/Client.cs:line 226
   at System.Threading.CancellationTokenSource.CallbackNode.<>c.<ExecuteCallback>b__9_0(Object s)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location ---
   at System.Threading.CancellationTokenSource.CallbackNode.ExecuteCallback()
   at System.Threading.CancellationTokenSource.ExecuteCallbackHandlers(Boolean throwOnFirstException)
   --- End of inner exception stack trace ---

See all the tests we did.

But we (I and @MarcialRosales) don't understand what going on there.

The problem seems related to the change we did to add the IDispose interface.
Locally the test works. We cannot reproduce the issue locally.

Wondering if we should remove Idispose and use a specific method to close the producer and consumer.

Thank you

p.s.
Should we add await here:

private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, int timeout = 10000) where TIn : struct, ICommand where TOut : struct, ICommand
        {
            var corr = NextCorrelationId();
            var tcs = PooledTaskSource<TOut>.Rent();
            requests.TryAdd(corr, tcs);
            await Publish(request(corr));
            using (CancellationTokenSource cts = new CancellationTokenSource(timeout))
            {
  /// Add await here???
                await using (

@Gsantomaggio
Copy link
Member Author

@stebet FYI: Ok I got the problem.
See: #25

Add Close/0 method as in Java and Go.
Implement IDisposable interface. Dispose internally calls Close/0.
The Close/0 method removes the Producer/Consumer from the server list and Close the TCP connection if there are no producers and consumers

Co-Authored-By: Ivan Maximov <sungam3r@yandex.ru>
@Gsantomaggio
Copy link
Member Author

per conversation with @kjnilsson

@Gsantomaggio Gsantomaggio deleted the handle_close branch November 30, 2021 15:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants