Skip to content

Conversation

@Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Sep 7, 2022

Fixes: #150
Reconnect ReliableProducer and/or ReliableConsumer in case the leader is down and if the Stream changes the topology.

Signed-off-by: Gabriele Santomaggio G.santomaggio@gmail.com

Refactor The Reliable part

How to test:

using a cluster with 3 nodes and:

 using System.Net;
using System.Text;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace example;

public static class MultiEndPoints
{
    public static async Task Start()
    {

        var config = new StreamSystemConfig()
        {
            UserName = "test",
            Password = "test",
            Endpoints = new EndPoint[]
            {
                new DnsEndPoint("node0", 5552),
                new DnsEndPoint("node1", 5552),
                new DnsEndPoint("node2", 5552),
            }
        };

        var system = await StreamSystem.Create(config);
        var streamName = Guid.NewGuid().ToString();
        await system.CreateStream(new StreamSpec(streamName));

        var producer = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
        {
            Stream = streamName,
            StreamSystem = system,
            ConfirmationHandler = confirmation =>
            {
                Console.WriteLine($"Confirmation: {confirmation.Status} {confirmation.PublishingId}");
                return Task.CompletedTask;
            },
        });

        var t= Task.Run(async () =>
        {
            for (int i = 0; i < 1000; i++)
            {
                if (!producer.IsOpen())
                {
                    Console.WriteLine("Producer is not connected");
                    return;
                }
                await producer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
                Console.WriteLine($"Sent message {i}");
                Thread.Sleep(1 * 1000);
            }
        });

        var consumer = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
        {
            Stream = streamName,
            StreamSystem = system,
            MessageHandler = (_, _, message) =>
            {
                Console.WriteLine($"Consumed message {Encoding.UTF8.GetString(message.Data.Contents)}");
                return Task.CompletedTask;
            },
        });
        Console.WriteLine("Press enter to stop");
        Console.ReadKey();
        t.Dispose();
        await producer.Close();
        await consumer.Close();
        await system.DeleteStream(streamName);
        await system.Close();
        Console.WriteLine("Closed");
    }
}

Then Identify the leader node for the queue:
Screen Shot 2022-09-07 at 19 20 11

And stop the node; in this case, it is the node0. The producer should reconnect to another node.

Note:

Locator connection can be in the same leader stream node. The locator connection should be able to reconnect to another node too.

@Gsantomaggio Gsantomaggio marked this pull request as draft September 7, 2022 14:41
@Gsantomaggio
Copy link
Member Author

@ricsiLT when you have time, can you please try it?

still work in progress but it should work :) !

@Gsantomaggio Gsantomaggio changed the title Fix Reconnect ReliableProducer in case leader is down Fix Reconnect ReliableProducer/ ReliableConsumer in case leader is down Sep 7, 2022
@ricsiLT
Copy link
Contributor

ricsiLT commented Sep 7, 2022

Had this issue today, would have been perfect :D Will try to deploy tomorrow and cause and issue with a forced restart ^^

@codecov
Copy link

codecov bot commented Sep 8, 2022

Codecov Report

Merging #163 (a454f5e) into main (56c0e68) will decrease coverage by 0.43%.
The diff coverage is 93.29%.

@@            Coverage Diff             @@
##             main     #163      +/-   ##
==========================================
- Coverage   92.29%   91.85%   -0.44%     
==========================================
  Files          78       78              
  Lines        6122     6201      +79     
  Branches      378      385       +7     
==========================================
+ Hits         5650     5696      +46     
- Misses        382      412      +30     
- Partials       90       93       +3     
Impacted Files Coverage Δ
...bitMQ.Stream.Client/Reliable/IReconnectStrategy.cs 78.94% <42.85%> (-21.06%) ⬇️
RabbitMQ.Stream.Client/StreamSystem.cs 91.75% <73.33%> (-1.78%) ⬇️
RabbitMQ.Stream.Client/MetaData.cs 91.58% <83.33%> (ø)
Tests/ReliableTests.cs 99.27% <93.18%> (-0.73%) ⬇️
RabbitMQ.Stream.Client/Reliable/ReliableBase.cs 98.63% <97.77%> (-1.37%) ⬇️
RabbitMQ.Stream.Client/Client.cs 89.76% <100.00%> (-1.03%) ⬇️
RabbitMQ.Stream.Client/ClientExceptions.cs 65.71% <100.00%> (+3.21%) ⬆️
...abbitMQ.Stream.Client/Reliable/ReliableConsumer.cs 100.00% <100.00%> (+1.31%) ⬆️
...abbitMQ.Stream.Client/Reliable/ReliableProducer.cs 86.50% <100.00%> (-0.41%) ⬇️
RabbitMQ.Stream.Client/CreditResponse.cs 0.00% <0.00%> (-85.00%) ⬇️
... and 8 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

@Gsantomaggio Gsantomaggio marked this pull request as ready for review September 9, 2022 09:48
@Gsantomaggio Gsantomaggio added this to the 1.0.0-rc.6 milestone Sep 10, 2022
@Gsantomaggio Gsantomaggio force-pushed the fix_reconnect_leader_down branch from 3b9c9bd to da7321c Compare September 12, 2022 14:34
Fixes: #150
Reconnect ReliableProducer and/or ReliableConsumer in case the leader is down and if the Stream changes the topology.
Refactor the Reliable part

- Reduce the code to the consumer/producer part
- Add a base record for the configuration
- Add CreateNewEntity as abstract method
- Add tests for known and unknown exception
- Document methods

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio force-pushed the fix_reconnect_leader_down branch from da7321c to a454f5e Compare September 12, 2022 14:37
Copy link
Contributor

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I see the correct behavior. I do see a reconnection being made, but the "Stream" page in the management UI only shows one entry, even after I restart the node I stopped:

image

In addition, the page starts returning a 500 error (not good), and I see some log messages like this which seem fishy:

docker-node0-1  | 2022-09-12 20:53:47.105297+00:00 [debug] <0.611.0> Marking RabbitMQ as running
docker-node0-1  | 2022-09-12 20:53:47.105320+00:00 [debug] <0.611.0> Change boot state to `ready`
docker-node0-1  | 2022-09-12 20:53:47.105401+00:00 [debug] <0.133.0> Boot state/systemd: notifying of state `ready`
docker-node0-1  | 2022-09-12 20:53:47.230426+00:00 [debug] <0.9.0> Time to start RabbitMQ: 28938365 us
docker-node0-1  | 2022-09-12 20:53:47.426391+00:00 [debug] <0.554.0> rabbit_stream_coordinator: pre_vote election called for in term 1
docker-node0-1  | 2022-09-12 20:53:47.426585+00:00 [debug] <0.554.0> rabbit_stream_coordinator: follower -> pre_vote in term: 1 machine version: 3
docker-node0-1  | 2022-09-12 20:53:47.426747+00:00 [debug] <0.554.0> rabbit_stream_coordinator: pre_vote granted #Ref<0.2376076381.66060290.251237> for term 1 votes 1
docker-node1-1  | 2022-09-12 20:53:47.427289+00:00 [debug] <0.1019.0> rabbit_stream_coordinator declining pre-vote to {rabbit_stream_coordinator,rabbit@node0} for term 1, current term 2
docker-node0-1  | 2022-09-12 20:53:47.434834+00:00 [debug] <0.554.0> rabbit_stream_coordinator: pre_vote -> follower in term: 2 machine version: 3
docker-node0-1  | 2022-09-12 20:53:47.435044+00:00 [debug] <0.554.0> rabbit_stream_coordinator: is not new, setting election timeout.
docker-node0-1  | 2022-09-12 20:53:47.435193+00:00 [info] <0.554.0> rabbit_stream_coordinator: follower did not have entry at 29 in 2. Requesting {rabbit_stream_coordinator,rabbit@node2} from 11
docker-node0-1  | 2022-09-12 20:53:47.435425+00:00 [info] <0.554.0> rabbit_stream_coordinator: detected a new leader {rabbit_stream_coordinator,rabbit@node2} in term 2
docker-node0-1  | 2022-09-12 20:53:47.435564+00:00 [debug] <0.554.0> rabbit_stream_coordinator: follower -> await_condition in term: 2 machine version: 3
docker-node2-1  | 2022-09-12 20:53:47.435878+00:00 [debug] <0.1364.0> rabbit_stream_coordinator: setting last index to 10,  next_index 11 for {rabbit_stream_coordinator,rabbit@node0}
docker-node0-1  | 2022-09-12 20:53:47.437594+00:00 [debug] <0.554.0> rabbit_stream_coordinator: await_condition -> follower in term: 2 machine version: 3
docker-node0-1  | 2022-09-12 20:53:47.437785+00:00 [debug] <0.554.0> rabbit_stream_coordinator: is not new, setting election timeout.
docker-node0-1  | 2022-09-12 20:53:47.438991+00:00 [debug] <0.554.0> rabbit_stream_coordinator: enabling ra cluster changes in 2
docker-node0-1  | 2022-09-12 20:53:47.439137+00:00 [debug] <0.554.0> rabbit_stream_coordinator: unknown command {nodedown,rabbit@node0}
docker-node1-1  | 2022-09-12 20:54:18.814975+00:00 [debug] <0.1232.0> User 'guest' authenticated successfully by backend rabbit_auth_backend_internal
docker-node1-1  | 2022-09-12 20:54:18.836385+00:00 [debug] <0.1234.0> User 'guest' authenticated successfully by backend rabbit_auth_ba

unknown command makes me nervous!

This repo contains a docker compose environment to bring up a cluster, and a version of your test program -

https://github.com/lukebakken/rabbitmq-stream-dotnet-client-163

In order to use a locally built NuGet package, here's what I did:

mkdir ~/development/nuget-packages
cd ~/development/rabbitmq/rabbitmq-stream-dotnet-client
git pull && git checkout fix_reconnect_leader_down
make build
dotnet nuget push --source ~/development/nuget-packages ./packages/RabbitMQ.Stream.Client.1.0.0-rc.5.2.nupkg
cd ~/development/lukebakken/rabbitmq-stream-dotnet-client-163/test
git checkout lukebakken/local-nupkg
dotnet add package --prerelease --source ~/development/nuget-packages RabbitMQ.Stream.Client

@Gsantomaggio
Copy link
Member Author

I think the problem is that you are using only two nodes; with one node, it doesn't reach the quorum. I will investigate

@lukebakken
Copy link
Contributor

lukebakken commented Sep 13, 2022

I'm starting out with three nodes, then stopping one, then re-starting it. There are three nodes in that log output 😄

  • docker-node0-1
  • docker-node1-1
  • docker-node2-1

Copy link
Contributor

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @Gsantomaggio

@lukebakken lukebakken merged commit 36fa175 into main Sep 13, 2022
@lukebakken lukebakken deleted the fix_reconnect_leader_down branch September 13, 2022 16:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

ReliableProducer not reconnecting after leader goes down

4 participants