Skip to content

Commit 564ef92

Browse files
committed
Add ICrc32 interface
If a library user wishes to have Crc32 checking, they must implement the `ICrc32` interface and set an implementation on the configuration. References: * #19 * #285
1 parent c819931 commit 564ef92

13 files changed

+55
-22
lines changed

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,5 @@ public ushort InitialCredits
6969
// enables the check of the crc on the delivery.
7070
// the server will send the crc for each chunk and the client will check it.
7171
// It is not enabled by default because it is could reduce the performance.
72-
public bool CheckCrcOnDelivery { get; set; } = false;
72+
public ICrc32 Crc32 { get; set; } = null;
7373
}

RabbitMQ.Stream.Client/ICrc32.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client
6+
{
7+
/// <summary>
8+
/// ICrc32
9+
/// </summary>
10+
public interface ICrc32
11+
{
12+
byte[] Hash(byte[] data);
13+
}
14+
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
2323
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
2424
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
2525
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
26-
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.get -> bool
27-
RabbitMQ.Stream.Client.IConsumerConfig.CheckCrcOnDelivery.set -> void
26+
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
27+
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
2828
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
2929
RabbitMQ.Stream.Client.CommandVersions
3030
RabbitMQ.Stream.Client.CommandVersions.Command.get -> ushort
@@ -50,6 +50,8 @@ RabbitMQ.Stream.Client.ICommandVersions.MinVersion.get -> ushort
5050
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.set -> void
5151
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
5252
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
53+
RabbitMQ.Stream.Client.ICrc32
54+
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
5355
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
5456
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
5557
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -69,8 +71,8 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
6971
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
7072
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
7173
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
72-
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.get -> bool
73-
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.CheckCrcOnDelivery.set -> void
74+
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
75+
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
7476
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
7577
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.set -> void
7678
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort

RabbitMQ.Stream.Client/RabbitMQ.Stream.Client.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
4646
<PrivateAssets>all</PrivateAssets>
4747
</PackageReference>
48-
<PackageReference Include="System.IO.Hashing" />
4948
<PackageReference Include="System.IO.Pipelines" />
5049
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
5150
</ItemGroup>

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
using System.Buffers;
77
using System.Collections.Generic;
88
using System.Diagnostics;
9-
using System.IO.Hashing;
109
using System.Runtime.CompilerServices;
1110
using System.Threading.Channels;
1211
using System.Threading.Tasks;
@@ -467,10 +466,10 @@ private async Task Init()
467466
if (Token.IsCancellationRequested)
468467
return;
469468

470-
if (_config.CheckCrcOnDelivery)
469+
if (_config.Crc32 is not null)
471470
{
472471
var crcCalculated = BitConverter.ToUInt32(
473-
Crc32.Hash(deliver.Chunk.Data.ToArray())
472+
_config.Crc32.Hash(deliver.Chunk.Data.ToArray())
474473
);
475474
if (crcCalculated != deliver.Chunk.Crc)
476475
{

RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
7070
IsSingleActiveConsumer = _config.IsSingleActiveConsumer,
7171
ConsumerUpdateListener = _config.ConsumerUpdateListener,
7272
ConsumerFilter = _config.ConsumerFilter,
73-
CheckCrcOnDelivery = _config.CheckCrcOnDelivery,
73+
Crc32 = _config.Crc32,
7474
ConnectionClosedHandler = async (string s) =>
7575
{
7676
// if the stream is still in the consumer list

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,12 +113,12 @@ public record ConsumerConfig : ReliableConfig
113113
public ConsumerFilter Filter { get; set; } = null;
114114

115115
/// <summary>
116-
/// CheckCrcOnDelivery enables the check of the crc on the delivery.
117-
/// the server will send the crc for each chunk and the client will check it.
116+
/// Eenables the check of the crc on the delivery when set to an implementation
117+
/// of <see cref="ICrc32"><code>ICrc32</code></see>.
118+
/// >he server will send the crc for each chunk and the client will check it.
118119
/// It is not enabled by default because it is could reduce the performance.
119-
/// Default value is false.
120120
/// </summary>
121-
public bool CheckCrcOnDelivery { get; set; } = false;
121+
public ICrc32 Crc32 { get; set; } = null;
122122

123123
public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
124124
{

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
5151
InitialCredits = _consumerConfig.InitialCredits,
5252
OffsetSpec = offsetSpec,
5353
ConsumerFilter = _consumerConfig.Filter,
54-
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
54+
Crc32 = _consumerConfig.Crc32,
5555
ConnectionClosedHandler = async _ =>
5656
{
5757
await TryToReconnect(_consumerConfig.ReconnectStrategy).ConfigureAwait(false);
@@ -111,7 +111,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
111111
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
112112
InitialCredits = _consumerConfig.InitialCredits,
113113
ConsumerFilter = _consumerConfig.Filter,
114-
CheckCrcOnDelivery = _consumerConfig.CheckCrcOnDelivery,
114+
Crc32 = _consumerConfig.Crc32,
115115
OffsetSpec = offsetSpecs,
116116
MessageHandler = async (stream, consumer, ctx, message) =>
117117
{

Tests/Crc32.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
using RabbitMQ.Stream.Client;
6+
7+
namespace Tests;
8+
9+
public class Crc32 : ICrc32
10+
{
11+
public byte[] Hash(byte[] data)
12+
{
13+
return System.IO.Hashing.Crc32.Hash(data);
14+
}
15+
}

Tests/RawConsumerSystemTests.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace Tests
1919
{
2020
public class ConsumerSystemTests
2121
{
22+
private readonly ICrc32 _crc32 = new Crc32();
2223
private readonly ITestOutputHelper testOutputHelper;
2324

2425
public ConsumerSystemTests(ITestOutputHelper testOutputHelper)
@@ -416,7 +417,7 @@ public async void Amqp091MessagesConsumer()
416417
var rawConsumer = await system.CreateRawConsumer(
417418
new RawConsumerConfig(stream)
418419
{
419-
CheckCrcOnDelivery = true,
420+
Crc32 = _crc32,
420421
Reference = "consumer",
421422
MessageHandler = async (consumer, ctx, message) =>
422423
{
@@ -463,7 +464,7 @@ public async void ConsumerQueryOffset()
463464
var rawConsumer = await system.CreateRawConsumer(
464465
new RawConsumerConfig(stream)
465466
{
466-
CheckCrcOnDelivery = true,
467+
Crc32 = _crc32,
467468
Reference = reference,
468469
OffsetSpec = new OffsetTypeOffset(),
469470
MessageHandler = async (consumer, ctx, message) =>
@@ -534,7 +535,7 @@ public async void ShouldConsumeFromStoredOffset()
534535
var rawConsumer = await system.CreateRawConsumer(
535536
new RawConsumerConfig(stream)
536537
{
537-
CheckCrcOnDelivery = false,
538+
Crc32 = _crc32,
538539
Reference = Reference,
539540
OffsetSpec = new OffsetTypeOffset(),
540541
MessageHandler = async (consumer, ctx, _) =>

0 commit comments

Comments
 (0)