Skip to content

Conversation

@Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Jul 19, 2022

Work on this feature with @Zerpet

Signed-off-by: Gabriele Santomaggio G.santomaggio@gmail.com

Closes #146

Note: SacTests will fail until the 3.11 is released

@Gsantomaggio Gsantomaggio marked this pull request as draft July 19, 2022 13:55
@codecov
Copy link

codecov bot commented Jul 19, 2022

Codecov Report

Base: 92.45% // Head: 92.42% // Decreases project coverage by -0.03% ⚠️

Coverage data is based on head (7a1b408) compared to base (dae7e12).
Patch coverage: 92.61% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #152      +/-   ##
==========================================
- Coverage   92.45%   92.42%   -0.04%     
==========================================
  Files          78       89      +11     
  Lines        6220     7138     +918     
  Branches      385      445      +60     
==========================================
+ Hits         5751     6597     +846     
- Misses        380      439      +59     
- Partials       89      102      +13     
Impacted Files Coverage Δ
RabbitMQ.Stream.Client/Reliable/ReliableBase.cs 100.00% <ø> (ø)
RabbitMQ.Stream.Client/Hash/Extensions.cs 31.25% <31.25%> (ø)
Tests/Utils.cs 72.82% <58.33%> (-4.54%) ⬇️
RabbitMQ.Stream.Client/Producer.cs 79.31% <75.00%> (+1.36%) ⬆️
RabbitMQ.Stream.Client/StreamSystem.cs 89.37% <75.86%> (-2.39%) ⬇️
...bitMQ.Stream.Client/ConsumerUpdateQueryResponse.cs 85.00% <85.00%> (ø)
RabbitMQ.Stream.Client/Subscribe.cs 78.94% <85.71%> (+13.72%) ⬆️
RabbitMQ.Stream.Client/Consumer.cs 90.47% <88.37%> (-1.42%) ⬇️
RabbitMQ.Stream.Client/Client.cs 90.84% <90.00%> (-1.03%) ⬇️
RabbitMQ.Stream.Client/PartitionsQueryResponse.cs 91.30% <91.30%> (ø)
... and 18 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>

Single Active Consumer implementation
@Gsantomaggio Gsantomaggio force-pushed the sigle-active-consumer branch from aa6c7eb to a59f131 Compare July 28, 2022 15:10
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@lukebakken lukebakken added this to the 1.0.0-rc.5 milestone Aug 3, 2022
@lukebakken lukebakken self-requested a review August 3, 2022 22:58
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio
Copy link
Member Author

We need to add tests for ReliableConsumer and some Example

@lukebakken lukebakken modified the milestones: 1.0.0-rc.5, 1.0.0-rc.6 Sep 6, 2022
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio changed the title Work in progress Single Active Consumer Single Active Consumer Sep 30, 2022
@Gsantomaggio Gsantomaggio marked this pull request as ready for review September 30, 2022 15:34
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio
Copy link
Member Author

How to test the standard consumers:

public static async Task Start()
    {
        var config = new StreamSystemConfig();
        var system = await StreamSystem.Create(config);
        var streamName = Guid.NewGuid().ToString();
        await system.CreateStream(new StreamSpec(streamName));
        
        // The application name must be the same for all consumers
        const string applicationName = "myApplication";
        
        
        var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
        for (var i = 0; i < 100; i++)
        {
            var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
            await producer.Send(Convert.ToUInt64(i), message);
        }


        var c1 = await system.CreateConsumer(new ConsumerConfig()
        {
            Stream = streamName,
            Reference = applicationName,
            // First consumer, it will use OffsetTypeFirst
            OffsetSpec = new OffsetTypeFirst(),
            // Single Active Consumer Active is mandatory
            IsSingleActiveConsumer = true,
            MessageHandler = async (c, context, arg3) =>
            {
                await c.StoreOffset(context.Offset);
                Console.WriteLine(
                    $" first received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });
        // ONLY the first consumer c1 is active and will receive messages

        var c2 = await system.CreateConsumer(new ConsumerConfig()
        {
            Stream = streamName,
            Reference = applicationName,
            IsSingleActiveConsumer = true,
            // ConsumerUpdateListener is implemented
            // so the client will use ConsumerUpdateListener to start teh offset

            ConsumerUpdateListener = async (reference, stream, isActive) =>
            {
                Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
                var o = await system.QueryOffset(applicationName, stream);
                return new OffsetTypeOffset(o);
            },
            MessageHandler = async (c, context, arg3) =>
            {
                await c.StoreOffset(context.Offset);
                Console.WriteLine(
                    $" second received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        // When the first consumer is closed, the second consumer will become active
        // in this case the second consumer will start from the last offset: stem.QueryOffset(applicationName, stream)
        
        Console.WriteLine("Press any key to close the consumer 1");
        Console.ReadKey();
        await c1.Close();
        // c2 is now active
        Thread.Sleep(3_000);
        await system.DeleteStream(streamName);
    }

RSingleActiveConsumer:

  public static async Task Start()
    {
        var config = new StreamSystemConfig();
        var system = await StreamSystem.Create(config);
        var streamName = Guid.NewGuid().ToString();
        await system.CreateStream(new StreamSpec(streamName));
        const string applicationName = "myApplication";
        var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
        for (var i = 0; i < 100; i++)
        {
            var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
            await producer.Send(Convert.ToUInt64(i), message);
        }

        var c1 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
        {
            StreamSystem = system,
            Stream = streamName,
            Reference = applicationName,
            ClientProvidedName = "first",
            OffsetSpec = new OffsetTypeFirst(),
            // IsSingleActiveConsumer = true,
            MessageHandler = async (c, context, arg3) =>
            {
                Thread.Sleep(1);
                await c.StoreOffset(context.Offset);
                Console.WriteLine(
                    $"first Consumer received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        var c2 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
        {
            StreamSystem = system,
            Stream = streamName,
            Reference = applicationName,
            ClientProvidedName = "second",
            IsSingleActiveConsumer = true,
            OffsetSpec = new OffsetTypeFirst(),
            ConsumerUpdateListener = async (reference, stream, isActive) =>
            {
                Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
                var o = await system.QueryOffset(applicationName, streamName);
                return new OffsetTypeOffset(o);
            },
            MessageHandler = async (c, context, arg3) =>
            {

                await c.StoreOffset(context.Offset);
                Console.WriteLine($" second received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
            }
        });

        try
        {
            Console.ReadKey();
            await c1.Close();

            Thread.Sleep(2000);

            await c2.Close();
            Console.WriteLine("closing");
            Thread.Sleep(10);

            await c2.Close();
            await system.DeleteStream(streamName);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }

@Gsantomaggio Gsantomaggio requested a review from Zerpet October 3, 2022 08:28
- Closes #167
- Refactor the producer classes; now, there is a new  Interface: IProducer . Standard Producer and Super Stream producer implement the interface
- IProducer introduces breaking changes since the Producer, and SuperStream producers return the interface
- Add a new Class SuperStreamProducer to handle the SuperStream
- Implement Hash Routing Murmur Strategy as default  


Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio merged commit c3151db into main Oct 4, 2022
@Gsantomaggio Gsantomaggio deleted the sigle-active-consumer branch October 4, 2022 09:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature: Single-Active Consumer feature

4 participants