-
Notifications
You must be signed in to change notification settings - Fork 458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Kafka Component #951
Add Kafka Component #951
Conversation
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://localhost:29092,CONTROLLER://localhost:29093,PLAINTEXT_HOST://0.0.0.0:9092") | ||
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:{port}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do these ports come from? Is it worth leaving a comment about them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added comments and links. These ports are the defaults of the confluentinc/confluent-local
image which are set when the container starts if the env var is not supplied. See https://github.com/confluentinc/kafka-images/blob/master/local/include/etc/confluent/docker/configureDefaults
return Task.Factory.StartNew(() => | ||
{ | ||
consumer.Subscribe("topic"); | ||
while (!stoppingToken.IsCancellationRequested) | ||
{ | ||
var result = consumer.Consume(stoppingToken); | ||
logger.LogInformation($"Received message '{result.Message.Value}'."); | ||
} | ||
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API is very strange.. no async? Is it blocking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confluentic has bad API here. It prefetches messages (if available), but it will block if nothing is there. That would be a perfect fit for ValueTask
😉.
src/Components/Aspire.Kafka.Consumer/AspireKafkaConsumerExtensions.cs
Outdated
Show resolved
Hide resolved
AddKafkaConsumer(builder, $"{DefaultConfigSectionName}:{name}", configureConsumerBuilder, configureConsumerConfig, connectionName: name, serviceKey: name); | ||
} | ||
|
||
private static void AddKafkaConsumer<TKey, TValue>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any health checks, telemetry etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not yet. I'm thinking to pull logs and metrics from librdkafka using the below delegates that can be registered within ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
SetLogHandler, SetErrorHandler, SetStatisticsHandler. The STATISTICS.json contains metrics.
3b05a47
to
f6a2a45
Compare
Overall, the app model side of this is looking pretty good. I think we need some end to end integration tests for this. |
d2d222a
to
9cbc7c2
Compare
Once health checks are implemented, please wire-up the Kafa component inside our smoke test (new):
|
14cc874
to
d79f45e
Compare
@mitchdenny Can you add |
I think we want a single component for kafka, not a package for the consumer and produce. Also, the component should be Aspire.Confluent.Kafka since that's the name of the underlying client library we're using. PS: People are out of vacation this week so it's slow 😄 |
Side comment: As additional components get developed/added, I wonder if we'll need an ingredients label so developers know which components (and which versions) do and do not have health checks, telemetry, etc. |
We probably shouldn't be merging it if it doesn't have telemetry, health-checks etc. In terms of dependency graph, I think we get that for free from NuGet so I don't think we need to do anything extra there. |
They should be in dotnet-public shortly. |
/azp run |
Commenter does not have sufficient privileges for PR 951 in repo dotnet/aspire |
9770d32
to
a75459e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also add a README and ConfigurationSchema.json files
src/Components/Aspire.Confluent.Kafka/Aspire.Confluent.Kafka.csproj
Outdated
Show resolved
Hide resolved
src/Components/Aspire.Confluent.Kafka/AspireKafkaConsumerExtensions.cs
Outdated
Show resolved
Hide resolved
src/Components/Aspire.Confluent.Kafka/ConsumerConnectionFactory.cs
Outdated
Show resolved
Hide resolved
|
||
public static class Counters | ||
{ | ||
public const string ReplyQueue = "replyq"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these names follow the same naming patterns as https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/kafka.md?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not satisfied either by these names (basically same names as the ones from librdkafka statistics.json), but the naming pattern you linked seems to concern spans not metrics.
As these constants are used to expose specific metrics related to a given producer or consumer instance, I think we should instead follow a metric naming pattern.
edit: @eerhardt would prefixing these constants with messaging.kafka
be sufficient ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lmolkova you are needed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the time being I prefixed metrics with messaging.kafka
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kafka metrics are in a weird place.
OTel is in the process of adding general messaging metrics into the spec - open-telemetry/semantic-conventions#163 (will be merged soon).
However Kafka as a project decided (AFAIK) to report a set of client metrics (KIP-714) that would be sent by default back to the broker and users would consume them from it. They decided not to follow otel semconv for the time being because of some back-compat reasons and limitations of existing metric impl on the broker side. Since they don't follow otel conventions, they also don't prefix with messaging.
namespace.
I have a couple of suggestion
-
let's get an opinion from Kafka folks to understand the bigger picture. @forlack @AndrewJSchofield what are your thoughts on Kafka integration component for .NET Aspire emitting it's own set of OTel-compatible metrics different from what's defined in KIP-714? Also, could you confirm if I remember the decision (above) correctly or maybe something has changed? I envision the same conversation in OTel instrumentation world in Java, Python and other languages.
-
in the short term, WRT metrics added here, I'd recommend aligning them with otel messaging semantic conventions when applicable and following otel general naming conventions otherwise. I'll go through the changes and add my suggestions.
/cc @pyohannes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should align with the semantic conventions as well, even if they are not final. Maybe we can help push towards that goal.
we can't because we just don't get the right data from the kafka client library.
We're going to discuss naming/approach with otel semconv community - https://github.com/open-telemetry/semantic-conventions/issues/578, I should have an update soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@g7ed6e would you be able to take a quick look at https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/bbfe950ad0ace8123a5e6817fb3767e27a1a2cee/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/README.md and check if any of those metrics apply here and if we can unify the naming? (just discovered they exist).
In any case, metric naming and OTEL community discussion should not block this PR and can be changed in the follow-ups.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- in the short term, WRT metrics added here, I'd recommend aligning them with otel messaging semantic conventions when applicable and following otel general naming conventions otherwise. I'll go through the changes and add my suggestions.
💯 Agreed.
@lmolkova Ideally this could be modeled on top of generic messaging metrics that were recently defined, but it seems to me the overlap is quite small:
messaging.kafka.message.transmitted
could becomemessaging.publish.messages
messaging.kafka.message.received
could becomemessaging.receive.messages
Besides that we could validate on what attributes we can converge, I'll try to have a closer look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pyohannes : I updated and renamed messaging.kafka.message.tx
to messaging.publish.messages and
messaging.kafka.message.rx
to messaging.receive.messages.
@lmolkova : I had a look to the linked implementation. Currently (in this Aspire PR) we do not intercept messages between user code and kafka client library. Instead we use a librdkafka feature which perdiodically fetch statistics (please notice we only parse the top level properties from the statistics json for now, there should be other metrics we may probably use and map to metrics in the future).
Regarding the interceptor / decorator approach we may implement it, but before I would like to ensure that we want to do this in Aspire component as having a look to the other components they do not hold too much code/logic and only exposes existing clients.
@eerhardt / @davidfowl : What are your thoughts about implementing otel distributed tracing with an internal wrapper around IProducer<TKey, TValue>
/ IConsumer<TKey, TValue>
living in Aspire.Confluent.Kafka.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eerhardt / @davidfowl : What are your thoughts about implementing otel distributed tracing with an internal wrapper around IProducer<TKey, TValue> / IConsumer<TKey, TValue> living in Aspire.Confluent.Kafka.
Yes, this sounds reasonable as a way to drive the conversation forward for the default .NET Kafka library.
@eerhardt I know we don't do this today with rabbit MQ (I think it's harder there because there are more APIs to wrap).
src/Components/Aspire.Confluent.Kafka/AspireKafkaConsumerExtensions.cs
Outdated
Show resolved
Hide resolved
samples/kafkaBasic/KafkaBasic.sln
Outdated
@@ -0,0 +1,42 @@ | |||
Microsoft Visual Studio Solution File, Format Version 12.00 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this sample be moved to https://github.com/dotnet/aspire-samples?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm planning a follow up PR to add more metrics and (maybe) implement distributed tracing support by intercepting calls between user code and Confluent.Kafka. Having the sample in the solution highly ease debugging things. I propose to move the sample to the samples repo after that.
950b59d
to
a68eb0d
Compare
@g7ed6e importing now. |
a68eb0d
to
0d720b0
Compare
Thanks @mitchdenny CI now completes successfully |
84fda9b
to
ba28026
Compare
ba28026
to
3a2c4e4
Compare
I pushed 2 commits to update the ConfigurationSchema.json file to be more complete (sorted and using the generator from #1383). I put them in 2 commits for easy reviewing - first just sorts the existing properties. The second can be diff'd by itself to see what the generator changed. Unfortunately I hit a bug with the ConfigurationBinder source generator - dotnet/runtime#96652. We aren't able to automate updating the ConfigurationSchema.json file automatically until we have a fix for that issue. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking pretty good. Thanks for the contribution. I have a few comments/questions. Once they are addressed I believe we can get this merged.
samples/kafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj
Outdated
Show resolved
Hide resolved
src/Components/Aspire.Confluent.Kafka/Aspire.Confluent.Kafka.csproj
Outdated
Show resolved
Hide resolved
|
||
ConsumerConfig config = new(); | ||
configSection.GetSection(nameof(KafkaConsumerSettings.Config)).Bind(config); | ||
settings.Config = config; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The typical pattern we have is that the "Settings" object and the underlying library's "Options" object (in this case ConsumerConfig
) are separate. There isn't a reference between them. Can we remove this property on KafkaConsumerSettings
(and the producer as well)?
That way we can use the IOptions
pattern for ConsumerConfig
, which is what the Redis component does for its ConfigurationOptions
object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking more at Kafka, there is a bit of a special pattern here in that the ConsumerConfig
is separate from the ConsumerBuilder
. We don't really have that pattern in the other components - typically there is just 1 object - ConfigurationOptions in Redis, IConnectionFactory in RabbitMQ.
Maybe what we have here is fine for now. It would be awkward to have 3 "configure" callbacks - configureSettings, configureConfig, configureBuilder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eerhardt The reference to the underlying library configuration object in the Aspire's "Settings" object lets customer configure a dedicated serializer / deserialize per pair of generic parameters provided to the underlying confluent XXXBuilder. Using IOptions pattern here would defeat it.
I "reproduced" the pattern I observed in the RabbitMQ implementation.
@@ -331,6 +331,29 @@ public void EnsureAllRabitMQManifestTypesHaveVersion0Suffix() | |||
Assert.Equal("container.v0", server.GetProperty("type").GetString()); | |||
} | |||
|
|||
[Fact] | |||
public void EnsureAllKafkaManifestTypesHaveVersion0Suffix() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a test to makes sure the connectionString
field is being emitted.
I think this is really close. A question or two from my perspective, but from an app model point of view this looks good. |
} | ||
} | ||
|
||
[LoggerMessage(LogLevel.Warning, EventId = 42, Message = "Invalid statistics json payload received: `{json}`")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 42
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which value would you suggest ?
edit: Is there an already existing source file containing event id ? I did not find one yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at other usages around dotnet/aspnetcore, it looks like the convention is to start at 1
.
tests/Aspire.Confluent.Kafka.Tests/ProducerConfigurationTests.cs
Outdated
Show resolved
Hide resolved
|
||
ConsumerConfig config = new(); | ||
configSection.GetSection(nameof(KafkaConsumerSettings.Config)).Bind(config); | ||
settings.Config = config; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking more at Kafka, there is a bit of a special pattern here in that the ConsumerConfig
is separate from the ConsumerBuilder
. We don't really have that pattern in the other components - typically there is just 1 object - ConfigurationOptions in Redis, IConnectionFactory in RabbitMQ.
Maybe what we have here is fine for now. It would be awkward to have 3 "configure" callbacks - configureSettings, configureConfig, configureBuilder.
33133d7
to
30fba83
Compare
apply pr suggestions apply pr suggestions apply pr suggestions Sort ConfigurationSchema.json properties Update ConfigurationSchema.json using ConfigSchemaGenerator apply pr suggestions apply pr suggestions apply pr suggesstions apply pr suggesstions apply pr suggestions apply pr suggestions drop kafka sample from this repo apply pr suggestions
30fba83
to
c540de4
Compare
@eerhardt I just squashed end rebased until the Orleans support adding. |
- Make Kafka Settings Config object non-nullable and not settable from user code. - Use EventId 1 in Logging - Log response body in Functional tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! Thanks, @g7ed6e, for this contribution and all your great work.
I pushed some review feedback changes. Let me know if you have any issues with the changes.
Unless anyone else has any objections, I will merge this once CI passes.
…he type can be created publicly.
Thanks so much @g7ed6e! |
Relates to #884