-
Notifications
You must be signed in to change notification settings - Fork 49
Single Active Consumer #152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov ReportBase: 92.45% // Head: 92.42% // Decreases project coverage by
Additional details and impacted files@@ Coverage Diff @@
## main #152 +/- ##
==========================================
- Coverage 92.45% 92.42% -0.04%
==========================================
Files 78 89 +11
Lines 6220 7138 +918
Branches 385 445 +60
==========================================
+ Hits 5751 6597 +846
- Misses 380 439 +59
- Partials 89 102 +13
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. ☔ View full report at Codecov. |
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Single Active Consumer implementation
aa6c7eb to
a59f131
Compare
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
|
We need to add tests for |
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
How to test the standard consumers:public static async Task Start()
{
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
var streamName = Guid.NewGuid().ToString();
await system.CreateStream(new StreamSpec(streamName));
// The application name must be the same for all consumers
const string applicationName = "myApplication";
var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
for (var i = 0; i < 100; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(Convert.ToUInt64(i), message);
}
var c1 = await system.CreateConsumer(new ConsumerConfig()
{
Stream = streamName,
Reference = applicationName,
// First consumer, it will use OffsetTypeFirst
OffsetSpec = new OffsetTypeFirst(),
// Single Active Consumer Active is mandatory
IsSingleActiveConsumer = true,
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine(
$" first received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
// ONLY the first consumer c1 is active and will receive messages
var c2 = await system.CreateConsumer(new ConsumerConfig()
{
Stream = streamName,
Reference = applicationName,
IsSingleActiveConsumer = true,
// ConsumerUpdateListener is implemented
// so the client will use ConsumerUpdateListener to start teh offset
ConsumerUpdateListener = async (reference, stream, isActive) =>
{
Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
var o = await system.QueryOffset(applicationName, stream);
return new OffsetTypeOffset(o);
},
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine(
$" second received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
// When the first consumer is closed, the second consumer will become active
// in this case the second consumer will start from the last offset: stem.QueryOffset(applicationName, stream)
Console.WriteLine("Press any key to close the consumer 1");
Console.ReadKey();
await c1.Close();
// c2 is now active
Thread.Sleep(3_000);
await system.DeleteStream(streamName);
}RSingleActiveConsumer: public static async Task Start()
{
var config = new StreamSystemConfig();
var system = await StreamSystem.Create(config);
var streamName = Guid.NewGuid().ToString();
await system.CreateStream(new StreamSpec(streamName));
const string applicationName = "myApplication";
var producer = await system.CreateProducer(new ProducerConfig() {Stream = streamName});
for (var i = 0; i < 100; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(Convert.ToUInt64(i), message);
}
var c1 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
{
StreamSystem = system,
Stream = streamName,
Reference = applicationName,
ClientProvidedName = "first",
OffsetSpec = new OffsetTypeFirst(),
// IsSingleActiveConsumer = true,
MessageHandler = async (c, context, arg3) =>
{
Thread.Sleep(1);
await c.StoreOffset(context.Offset);
Console.WriteLine(
$"first Consumer received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
var c2 = await ReliableConsumer.CreateReliableConsumer(new ReliableConsumerConfig()
{
StreamSystem = system,
Stream = streamName,
Reference = applicationName,
ClientProvidedName = "second",
IsSingleActiveConsumer = true,
OffsetSpec = new OffsetTypeFirst(),
ConsumerUpdateListener = async (reference, stream, isActive) =>
{
Console.WriteLine($"{reference} is {(isActive ? "active" : "inactive")}");
var o = await system.QueryOffset(applicationName, streamName);
return new OffsetTypeOffset(o);
},
MessageHandler = async (c, context, arg3) =>
{
await c.StoreOffset(context.Offset);
Console.WriteLine($" second received message: {Encoding.Default.GetString(arg3.Data.Contents)} offset {context.Offset}");
}
});
try
{
Console.ReadKey();
await c1.Close();
Thread.Sleep(2000);
await c2.Close();
Console.WriteLine("closing");
Thread.Sleep(10);
await c2.Close();
await system.DeleteStream(streamName);
}
catch (Exception e)
{
Console.WriteLine(e);
}
} |
- Closes #167 - Refactor the producer classes; now, there is a new Interface: IProducer . Standard Producer and Super Stream producer implement the interface - IProducer introduces breaking changes since the Producer, and SuperStream producers return the interface - Add a new Class SuperStreamProducer to handle the SuperStream - Implement Hash Routing Murmur Strategy as default Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Work on this feature with @Zerpet
Signed-off-by: Gabriele Santomaggio G.santomaggio@gmail.com
Closes #146
Note:
SacTestswill fail until the 3.11 is released