Skip to content

Commit dbaad59

Browse files
authored
Blocking raw message reader (#5)
* Subscribing with a blocking reader belongs to Messaging package. Kafka implements reader * Renamed raw message reader and options * renamed options * Kafka testing * Test BlockingReaderRawMessageHandlerSubscriber
1 parent adc556b commit dbaad59

34 files changed

+1238
-209
lines changed
File renamed without changes.

codecov.yml renamed to .codecov.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ coverage:
33
status:
44
project:
55
default:
6-
enabled: yes
6+
enabled: yes
7+
target: 75%

Messaging.sln

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messaging.DependencyInjecti
8282
EndProject
8383
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messaging.Testing", "test\Messaging.Testing\Messaging.Testing.csproj", "{CB19D65C-571C-41D3-828C-0BCC86DAC252}"
8484
EndProject
85-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.DependencyInjection.Serialization.MessagePack", "src\Messaging.DependencyInjection.Serialization.MessagePack\Messaging.DependencyInjection.Serialization.MessagePack.csproj", "{9E9BE3CF-619F-497E-8046-7401CB5E4CC8}"
85+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messaging.DependencyInjection.Serialization.MessagePack", "src\Messaging.DependencyInjection.Serialization.MessagePack\Messaging.DependencyInjection.Serialization.MessagePack.csproj", "{145DBF38-0B3A-451D-9491-8E2DE28C0440}"
8686
EndProject
87-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Serialization.MessagePack", "src\Serialization.MessagePack\Serialization.MessagePack.csproj", "{1A763FFD-03D7-4CEE-85CD-91A07CF1E056}"
87+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Serialization.MessagePack", "src\Serialization.MessagePack\Serialization.MessagePack.csproj", "{B2B223DB-5BB9-4335-B5D8-81A25CBECA25}"
8888
EndProject
89-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Messaging.DependencyInjection.Serialization.MessagePack.Tests", "test\Messaging.DependencyInjection.Serialization.MessagePack.Tests\Messaging.DependencyInjection.Serialization.MessagePack.Tests.csproj", "{11C133C7-FAA1-4650-9801-4EBF4A90E82D}"
89+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Messaging.DependencyInjection.Serialization.MessagePack.Tests", "test\Messaging.DependencyInjection.Serialization.MessagePack.Tests\Messaging.DependencyInjection.Serialization.MessagePack.Tests.csproj", "{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0}"
9090
EndProject
91-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Serialization.MessagePack.Tests", "test\Serialization.MessagePack.Tests\Serialization.MessagePack.Tests.csproj", "{B25FB4B9-AAD2-45E0-884F-61F86D2497AD}"
91+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Serialization.MessagePack.Tests", "test\Serialization.MessagePack.Tests\Serialization.MessagePack.Tests.csproj", "{216C7102-EF21-4F19-AB4B-781312C7FA2E}"
9292
EndProject
9393
Global
9494
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -220,22 +220,22 @@ Global
220220
{CB19D65C-571C-41D3-828C-0BCC86DAC252}.Debug|Any CPU.Build.0 = Debug|Any CPU
221221
{CB19D65C-571C-41D3-828C-0BCC86DAC252}.Release|Any CPU.ActiveCfg = Release|Any CPU
222222
{CB19D65C-571C-41D3-828C-0BCC86DAC252}.Release|Any CPU.Build.0 = Release|Any CPU
223-
{9E9BE3CF-619F-497E-8046-7401CB5E4CC8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
224-
{9E9BE3CF-619F-497E-8046-7401CB5E4CC8}.Debug|Any CPU.Build.0 = Debug|Any CPU
225-
{9E9BE3CF-619F-497E-8046-7401CB5E4CC8}.Release|Any CPU.ActiveCfg = Release|Any CPU
226-
{9E9BE3CF-619F-497E-8046-7401CB5E4CC8}.Release|Any CPU.Build.0 = Release|Any CPU
227-
{1A763FFD-03D7-4CEE-85CD-91A07CF1E056}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
228-
{1A763FFD-03D7-4CEE-85CD-91A07CF1E056}.Debug|Any CPU.Build.0 = Debug|Any CPU
229-
{1A763FFD-03D7-4CEE-85CD-91A07CF1E056}.Release|Any CPU.ActiveCfg = Release|Any CPU
230-
{1A763FFD-03D7-4CEE-85CD-91A07CF1E056}.Release|Any CPU.Build.0 = Release|Any CPU
231-
{11C133C7-FAA1-4650-9801-4EBF4A90E82D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
232-
{11C133C7-FAA1-4650-9801-4EBF4A90E82D}.Debug|Any CPU.Build.0 = Debug|Any CPU
233-
{11C133C7-FAA1-4650-9801-4EBF4A90E82D}.Release|Any CPU.ActiveCfg = Release|Any CPU
234-
{11C133C7-FAA1-4650-9801-4EBF4A90E82D}.Release|Any CPU.Build.0 = Release|Any CPU
235-
{B25FB4B9-AAD2-45E0-884F-61F86D2497AD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
236-
{B25FB4B9-AAD2-45E0-884F-61F86D2497AD}.Debug|Any CPU.Build.0 = Debug|Any CPU
237-
{B25FB4B9-AAD2-45E0-884F-61F86D2497AD}.Release|Any CPU.ActiveCfg = Release|Any CPU
238-
{B25FB4B9-AAD2-45E0-884F-61F86D2497AD}.Release|Any CPU.Build.0 = Release|Any CPU
223+
{145DBF38-0B3A-451D-9491-8E2DE28C0440}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
224+
{145DBF38-0B3A-451D-9491-8E2DE28C0440}.Debug|Any CPU.Build.0 = Debug|Any CPU
225+
{145DBF38-0B3A-451D-9491-8E2DE28C0440}.Release|Any CPU.ActiveCfg = Release|Any CPU
226+
{145DBF38-0B3A-451D-9491-8E2DE28C0440}.Release|Any CPU.Build.0 = Release|Any CPU
227+
{B2B223DB-5BB9-4335-B5D8-81A25CBECA25}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
228+
{B2B223DB-5BB9-4335-B5D8-81A25CBECA25}.Debug|Any CPU.Build.0 = Debug|Any CPU
229+
{B2B223DB-5BB9-4335-B5D8-81A25CBECA25}.Release|Any CPU.ActiveCfg = Release|Any CPU
230+
{B2B223DB-5BB9-4335-B5D8-81A25CBECA25}.Release|Any CPU.Build.0 = Release|Any CPU
231+
{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
232+
{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0}.Debug|Any CPU.Build.0 = Debug|Any CPU
233+
{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0}.Release|Any CPU.ActiveCfg = Release|Any CPU
234+
{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0}.Release|Any CPU.Build.0 = Release|Any CPU
235+
{216C7102-EF21-4F19-AB4B-781312C7FA2E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
236+
{216C7102-EF21-4F19-AB4B-781312C7FA2E}.Debug|Any CPU.Build.0 = Debug|Any CPU
237+
{216C7102-EF21-4F19-AB4B-781312C7FA2E}.Release|Any CPU.ActiveCfg = Release|Any CPU
238+
{216C7102-EF21-4F19-AB4B-781312C7FA2E}.Release|Any CPU.Build.0 = Release|Any CPU
239239
EndGlobalSection
240240
GlobalSection(SolutionProperties) = preSolution
241241
HideSolutionNode = FALSE
@@ -272,10 +272,10 @@ Global
272272
{616376C9-DFCA-460C-9D9E-F49C21152280} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
273273
{C407716D-DE61-4656-9D4B-A6CE3CCAF1EF} = {FD29A32D-EE38-4C99-B172-8F830AEE3E07}
274274
{CB19D65C-571C-41D3-828C-0BCC86DAC252} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
275-
{9E9BE3CF-619F-497E-8046-7401CB5E4CC8} = {FD29A32D-EE38-4C99-B172-8F830AEE3E07}
276-
{1A763FFD-03D7-4CEE-85CD-91A07CF1E056} = {FD29A32D-EE38-4C99-B172-8F830AEE3E07}
277-
{11C133C7-FAA1-4650-9801-4EBF4A90E82D} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
278-
{B25FB4B9-AAD2-45E0-884F-61F86D2497AD} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
275+
{145DBF38-0B3A-451D-9491-8E2DE28C0440} = {FD29A32D-EE38-4C99-B172-8F830AEE3E07}
276+
{B2B223DB-5BB9-4335-B5D8-81A25CBECA25} = {FD29A32D-EE38-4C99-B172-8F830AEE3E07}
277+
{184E2B8F-A6D0-48B1-AD81-B0F0A66F06E0} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
278+
{216C7102-EF21-4F19-AB4B-781312C7FA2E} = {454C1E74-61F0-4E54-9ABA-9BD1711B5A83}
279279
EndGlobalSection
280280
GlobalSection(ExtensibilityGlobals) = postSolution
281281
SolutionGuid = {ECC8E5D6-3F47-4B40-A007-E12C770C9F45}

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,19 @@ Example serialization setup:
8282
```csharp
8383
services.AddMessaging(builder =>
8484
{
85-
builder.AddProtoBuf();
85+
builder.AddMessagePack();
8686
// or
87-
builder.AddXml();
87+
builder.AddProtoBuf();
8888
// or
8989
builder.AddJson();
90+
// or
91+
builder.AddXml();
9092
});
9193
```
9294

9395
Each implementation has some additional settings
9496

95-
#### MessagePack
97+
##### MessagePack
9698

9799
Define a custom IFormatterResolver and compressiong LZ4:
98100

src/Messaging.DependencyInjection.Kafka/KafkaMessagingBuilderExtensions.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using Messaging;
23
using Messaging.DependencyInjection;
34
using Messaging.Kafka;
45
using Microsoft.Extensions.DependencyInjection.Extensions;
@@ -79,7 +80,8 @@ public static MessagingBuilder AddKafkaPublisher(this MessagingBuilder builder)
7980
public static MessagingBuilder AddKafkaSubscriber(this MessagingBuilder builder)
8081
{
8182
builder.Services.TryAddSingleton(c => c.GetRequiredService<IOptions<KafkaOptions>>().Value);
82-
builder.AddRawMessageHandlerSubscriber<KafkaRawMessageHandlerSubscriber>();
83+
builder.Services.TryAddSingleton<IBlockingRawMessageReaderFactory<KafkaOptions>, KafkaBlockingRawMessageReaderFactory>();
84+
builder.AddRawMessageHandlerSubscriber<BlockingReaderRawMessageHandlerSubscriber<KafkaOptions>>();
8385
return builder;
8486
}
8587
}

src/Messaging.DependencyInjection.Serialization.MessagePack/MessagePackMessagingBuilderExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
namespace Microsoft.Extensions.DependencyInjection
88
{
99
/// <summary>
10-
/// Extensions to <see cref="MessageBuilder"/> to add <see cref="MessagePackSerializer"/>
10+
/// Extensions to <see cref="MessagingBuilder"/> to add <see cref="MessagePackSerializer"/>
1111
/// </summary>
1212
public static class MessagePackMessagingBuilderExtensions
1313
{

src/Messaging.Kafka/IKafkaConsumer.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using Confluent.Kafka;
3+
4+
namespace Messaging.Kafka
5+
{
6+
internal interface IKafkaConsumer : IDisposable
7+
{
8+
Consumer<Null, byte[]> KafkaConsumer { get; }
9+
bool Consume(out Message<Null, byte[]> message, TimeSpan timeout);
10+
void Subscribe(string topic);
11+
}
12+
}

src/Messaging.Kafka/IKafkaProducer.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Confluent.Kafka;
4+
5+
namespace Messaging.Kafka
6+
{
7+
internal interface IKafkaProducer: IDisposable
8+
{
9+
Task<Message<Null, byte[]>> ProduceAsync(string topic, Null key, byte[] val);
10+
int Flush(TimeSpan timeout);
11+
Producer<Null, byte[]> KafkaProducer { get; }
12+
}
13+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
3+
namespace Messaging.Kafka
4+
{
5+
/// <summary>
6+
/// An Apache Kafka implementation of <see cref="T:Messaging.IBlockingRawMessageReader`1" />
7+
/// </summary>
8+
/// <inheritdoc cref="IBlockingRawMessageReader{KafkaOptions}" />
9+
/// <inheritdoc cref="IDisposable" />
10+
public class KafkaBlockingRawMessageReader : IBlockingRawMessageReader<KafkaOptions>, IDisposable
11+
{
12+
private readonly IKafkaConsumer _consumer;
13+
14+
/// <summary>
15+
/// Creates an new instance of <see cref="KafkaBlockingRawMessageReader"/>
16+
/// </summary>
17+
/// <param name="consumer"></param>
18+
internal KafkaBlockingRawMessageReader(IKafkaConsumer consumer) =>
19+
_consumer = consumer ?? throw new ArgumentNullException(nameof(consumer));
20+
21+
/// <summary>
22+
/// Tries to read a message from the inner Confluent.Consumer implementation
23+
/// </summary>
24+
/// <param name="message">The message read if true was returned</param>
25+
/// <param name="options">Kafka options</param>
26+
/// <returns></returns>
27+
/// <inheritdoc />
28+
public bool TryGetMessage(out byte[] message, KafkaOptions options)
29+
{
30+
if (options == null) throw new ArgumentNullException(nameof(options));
31+
32+
var read = _consumer.Consume(out var kafkaMessage, options.Subscriber.ConsumeTimeout);
33+
message = read ? kafkaMessage.Value : null;
34+
return read;
35+
}
36+
37+
/// <inheritdoc />
38+
public void Dispose()
39+
{
40+
_consumer.Dispose();
41+
}
42+
}
43+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using Confluent.Kafka;
3+
using Confluent.Kafka.Serialization;
4+
5+
namespace Messaging.Kafka
6+
{
7+
/// <summary>
8+
/// Creates instances of <see cref="T:Messaging.Kafka.KafkaBlockingRawMessageReader" /> which are already subscribed to the specified topic
9+
/// </summary>
10+
/// <inheritdoc />
11+
public class KafkaBlockingRawMessageReaderFactory : IBlockingRawMessageReaderFactory<KafkaOptions>
12+
{
13+
private static readonly ByteArrayDeserializer Deserializer = new ByteArrayDeserializer();
14+
15+
/// <summary>
16+
/// Creates a new <see cref="T:Messaging.Kafka.KafkaBlockingRawMessageReader" /> subscribed already to the specified <param name="topic" />
17+
/// </summary>
18+
/// <param name="topic">The topic to subscribe the reader</param>
19+
/// <param name="options">Kafka Options</param>
20+
/// <returns><see cref="T:Messaging.Kafka.KafkaBlockingRawMessageReader" /> subscribed to <param name="topic" /></returns>
21+
/// <inheritdoc />
22+
public IBlockingRawMessageReader<KafkaOptions> Create(string topic, KafkaOptions options)
23+
{
24+
IKafkaConsumer ConsumerFunc() =>
25+
new KafkaConsumerAdapter(new Consumer<Null, byte[]>(options.Properties, null, Deserializer));
26+
27+
return Create(ConsumerFunc, topic, options);
28+
}
29+
30+
internal IBlockingRawMessageReader<KafkaOptions> Create(
31+
Func<IKafkaConsumer> consumerFunc,
32+
string topic,
33+
KafkaOptions options)
34+
{
35+
if (topic == null) throw new ArgumentNullException(nameof(topic));
36+
if (options == null) throw new ArgumentNullException(nameof(options));
37+
38+
var consumer = consumerFunc();
39+
try
40+
{
41+
options.Subscriber.ConsumerCreatedCallback?.Invoke(consumer.KafkaConsumer);
42+
consumer.Subscribe(topic);
43+
}
44+
catch
45+
{
46+
consumer.Dispose();
47+
throw;
48+
}
49+
50+
return new KafkaBlockingRawMessageReader(consumer);
51+
}
52+
53+
private class ByteArrayDeserializer : IDeserializer<byte[]>
54+
{
55+
public byte[] Deserialize(byte[] data) => data;
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)