Skip to content

BUG: Remove nullability from attribute properties #362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,33 +54,33 @@ public KafkaAttribute()
/// <summary>
/// Gets or sets the Maximum transmit message size. Default: 1MB
/// </summary>
public int? MaxMessageBytes { get; set; }
public int MaxMessageBytes { get; set; } = 1_000_000;

/// <summary>
/// Maximum number of messages batched in one MessageSet. default: 10000
/// </summary>
public int? BatchSize { get; set; }
public int BatchSize { get; set; } = 10_000;

/// <summary>
/// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
/// </summary>
public bool? EnableIdempotence { get; set; }
public bool EnableIdempotence { get; set; } = false;

/// <summary>
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
/// </summary>
public int? MessageTimeoutMs { get; set; }
public int MessageTimeoutMs { get; set; } = 300_000;

/// <summary>
/// The ack timeout of the producer request in milliseconds. default: 5000
/// </summary>
public int? RequestTimeoutMs { get; set; }
public int RequestTimeoutMs { get; set; } = 5_000;

/// <summary>
/// How many times to retry sending a failing Message. **Note:** default: 2
/// How many times to retry sending a failing Message. **Note:** default: 2147483647
/// </summary>
/// <remarks>Retrying may cause reordering unless <c>EnableIdempotence</c> is set to <c>true</c>.</remarks>
public int? MaxRetries { get; set; }
public int MaxRetries { get; set; } = int.MaxValue;

/// <summary>
/// SASL mechanism to use for authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
MessageSendMaxRetries = entity.Attribute.MaxRetries,
MessageTimeoutMs = entity.Attribute.MessageTimeoutMs,
RequestTimeoutMs = entity.Attribute.RequestTimeoutMs,
MessageMaxBytes = entity.Attribute.MaxMessageBytes,
SaslPassword = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.Password),
SaslUsername = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.Username),
SslKeyLocation = resolvedSslKeyLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.Tests;
using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -631,7 +633,7 @@ public async Task Produce_And_Consume_With_Headers()
return eventData;
});

var output = await ProduceAndConsumeAsync(input);
var output = await ProduceAndConsumeAsync<KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>, KafkaTriggerForProduceAndConsume<KafkaEventData<string>>>(x => x.Produce(default), input);

foreach (var inputEvent in input)
{
Expand All @@ -653,7 +655,7 @@ public async Task Produce_And_Consume_Without_Headers()
Value = x.ToString()
});

var output = await ProduceAndConsumeAsync(input.ToArray());
var output = await ProduceAndConsumeAsync<KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>, KafkaTriggerForProduceAndConsume<KafkaEventData<string>>>(x => x.Produce(default), input.ToArray());

foreach (var inputEvent in input)
{
Expand All @@ -669,16 +671,34 @@ public async Task Produce_And_Consume_Without_Headers()

}

private async Task<List<KafkaEventData<string>>> ProduceAndConsumeAsync(IEnumerable<KafkaEventData<string>> events)
[Fact]
public async Task Produce_With_MaxMessageBytes_Should_Throw_Error_When_Message_Is_Too_Large()
{
    // A 1000-char payload against a producer capped at MaxMessageBytes = 1000 is expected
    // to be rejected by the client (protocol overhead pushes the total over the limit).
    var oversizedEvent = new KafkaEventData<string>
    {
        Value = new string('a', 1000),
    };
    var input = new[] { oversizedEvent };

    var ex = await Assert.ThrowsAsync<FunctionInvocationException>(
        () => ProduceAndConsumeAsync<KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>, KafkaTriggerForProduceAndConsume<KafkaEventData<string>>>(
            x => x.ProduceWithMaxMessageBytes1000(default),
            input));

    // The Kafka client error surfaces as the inner exception of the function invocation failure.
    Assert.NotNull(ex.InnerException);
    Assert.Contains("too large", ex.InnerException.Message);
}

private async Task<List<KafkaEventData<string>>> ProduceAndConsumeAsync<TOutputFunction, TTriggerFunction>(
Expression<Func<TOutputFunction,Task>> outputFunction,
IEnumerable<KafkaEventData<string>> events)
{
var outputMethod = (outputFunction.Body as MethodCallExpression).Method;
var eventList = events.ToList();
foreach (var kafkaEvent in eventList)
{
kafkaEvent.Topic = Constants.StringTopicWithTenPartitionsName;
}
var eventCount = eventList.Count;
var output = new ConcurrentBag<KafkaEventData<string>>();
using (var host = await StartHostAsync(new[] { typeof(KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>), typeof(KafkaTriggerForProduceAndConsume<KafkaEventData<string>>) },
using (var host = await StartHostAsync(new[] { typeof(TOutputFunction), typeof(TTriggerFunction) },
serviceRegistrationCallback: s =>
{
s.AddSingleton(eventList);
Expand All @@ -687,9 +707,7 @@ private async Task<List<KafkaEventData<string>>> ProduceAndConsumeAsync(IEnumera
))
{
var jobHost = host.GetJobHost();
await jobHost.CallAsync(
typeof(KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>).GetMethod(nameof(KafkaOutputFunctionsForProduceAndConsume<KafkaEventData<string>>.Produce))
);
await jobHost.CallAsync(outputMethod);

await TestHelpers.Await(() =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,14 @@ public async Task Produce(
await output.AddAsync(message);
}
}

/// <summary>
/// Emits every event in <c>testData</c> through a producer whose
/// <c>MaxMessageBytes</c> is capped at 1000, so oversized payloads surface a produce error.
/// </summary>
public async Task ProduceWithMaxMessageBytes1000(
    [Kafka(BrokerList = "LocalBroker", MaxMessageBytes = 1000)] IAsyncCollector<T> output)
{
    // Emit sequentially, one event per AddAsync call (same pattern as Produce).
    foreach (var evt in testData)
    {
        await output.AddAsync(evt);
    }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Reflection;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
{
/// <summary>
/// Verifies that <see cref="KafkaAttribute"/> exposes non-nullable producer settings whose
/// defaults match the values documented on the attribute, and that explicitly-set values
/// round-trip through attribute reflection exactly as the binding infrastructure reads them.
/// </summary>
public class KafkaAttributeTests
{
    [Fact]
    public void When_Nothing_Is_Set_It_Returns_Default_Values()
    {
        // Read the attribute the same way the WebJobs binding does: via parameter reflection.
        var attribute = typeof(TestClass)
            .GetMethod(nameof(TestClass.FunctionWithDefaults))
            .GetParameters()[0]
            .GetCustomAttribute<KafkaAttribute>();

        Assert.NotNull(attribute);

        Assert.Equal(10_000, attribute.BatchSize);
        // xUnit idiom (xUnit2004): use Assert.False/True for booleans, not Assert.Equal.
        Assert.False(attribute.EnableIdempotence);
        Assert.Equal(1_000_000, attribute.MaxMessageBytes);
        Assert.Equal(int.MaxValue, attribute.MaxRetries);
        Assert.Equal(300_000, attribute.MessageTimeoutMs);
        Assert.Equal(5_000, attribute.RequestTimeoutMs);
    }

    [Fact]
    public void When_Everything_Is_Set_It_Returns_Set_Values()
    {
        var attribute = typeof(TestClass)
            .GetMethod(nameof(TestClass.FunctionWithEverythingSet))
            .GetParameters()[0]
            .GetCustomAttribute<KafkaAttribute>();

        Assert.NotNull(attribute);

        Assert.Equal(BrokerAuthenticationMode.ScramSha512, attribute.AuthenticationMode);
        Assert.Equal("AvroSchema", attribute.AvroSchema);
        Assert.Equal("BrokerList", attribute.BrokerList);
        Assert.Equal(1, attribute.BatchSize);
        Assert.True(attribute.EnableIdempotence);
        Assert.Equal(2, attribute.MaxMessageBytes);
        Assert.Equal(3, attribute.MaxRetries);
        Assert.Equal(4, attribute.MessageTimeoutMs);
        Assert.Equal("Password", attribute.Password);
        Assert.Equal(BrokerProtocol.SaslPlaintext, attribute.Protocol);
        Assert.Equal(5, attribute.RequestTimeoutMs);
        Assert.Equal("SslCaLocation", attribute.SslCaLocation);
        Assert.Equal("SslCertificateLocation", attribute.SslCertificateLocation);
        Assert.Equal("SslKeyLocation", attribute.SslKeyLocation);
        Assert.Equal("SslKeyPassword", attribute.SslKeyPassword);
        Assert.Equal("Username", attribute.Username);
    }

    /// <summary>
    /// Reflection target only — these methods are never invoked; their parameters just
    /// carry the <see cref="KafkaAttribute"/> instances under test.
    /// </summary>
    private class TestClass
    {
        public void FunctionWithDefaults(
            [Kafka("brokerList", "topic")]
            string parameterWithDefaults)
        {
        }

        public void FunctionWithEverythingSet(
            [Kafka(
                AuthenticationMode = BrokerAuthenticationMode.ScramSha512,
                AvroSchema = "AvroSchema",
                BrokerList = "BrokerList",
                BatchSize = 1,
                EnableIdempotence = true,
                MaxMessageBytes = 2,
                MaxRetries = 3,
                MessageTimeoutMs = 4,
                Password = "Password",
                Protocol = BrokerProtocol.SaslPlaintext,
                RequestTimeoutMs = 5,
                SslCaLocation = "SslCaLocation",
                SslCertificateLocation = "SslCertificateLocation",
                SslKeyLocation = "SslKeyLocation",
                SslKeyPassword = "SslKeyPassword",
                Username = "Username"
            )]
            string parameterWithEverythingSet) // renamed from copy-pasted "parameterWithDefaults"
        {
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;

using Avro.Generic;
using Confluent.Kafka;
Expand All @@ -14,7 +13,6 @@
using Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests.Helpers;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging.Abstractions;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests
Expand Down Expand Up @@ -169,7 +167,7 @@ public void When_Value_Type_Is_Protobuf_Should_Create_Protobuf_Listener()
}

[Fact]
public void GetProducerConfig_When_No_Auth_Defined_Should_Contain_Only_BrokerList()
public void GetProducerConfig_When_No_Auth_Defined_Should_Not_Contain_Auth_Settings()
{
var attribute = new KafkaAttribute("brokers:9092", "myTopic")
{
Expand All @@ -183,7 +181,10 @@ public void GetProducerConfig_When_No_Auth_Defined_Should_Contain_Only_BrokerLis

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Single(config);
Assert.Equal(0, config.Count(x=>x.Key.StartsWith("sasl.")));
Assert.Null(config.SaslMechanism);
Assert.Null(config.SaslPassword);
Assert.Null(config.SaslUsername);
Assert.Equal("brokers:9092", config.BootstrapServers);
}

Expand All @@ -206,7 +207,7 @@ public void GetProducerConfig_When_Auth_Defined_Should_Contain_Them()

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal(5, config.Count());
Assert.Equal(11, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.SaslSsl, config.SecurityProtocol);
Assert.Equal(SaslMechanism.Plain, config.SaslMechanism);
Expand Down Expand Up @@ -234,7 +235,7 @@ public void GetProducerConfig_When_Ssl_Auth_Defined_Should_Contain_Them()

var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance);
var config = factory.GetProducerConfig(entity);
Assert.Equal(6, config.Count());
Assert.Equal(12, config.Count());
Assert.Equal("brokers:9092", config.BootstrapServers);
Assert.Equal(SecurityProtocol.Ssl, config.SecurityProtocol);
Assert.Equal("path/to/key", config.SslKeyLocation);
Expand Down