Skip to content

Commit

Permalink
Add Aspire.Confluent.Kafka component
Browse files Browse the repository at this point in the history
apply pr suggestions

apply pr suggestions

apply pr suggestions
  • Loading branch information
g7ed6e committed Jan 3, 2024
1 parent 96fd8a1 commit 84fda9b
Show file tree
Hide file tree
Showing 57 changed files with 3,553 additions and 3 deletions.
45 changes: 45 additions & 0 deletions Aspire.sln
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,20 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.MongoDB.Driver.Tests
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConfigurationSchemaGenerator.Tests", "tests\ConfigurationSchemaGenerator.Tests\ConfigurationSchemaGenerator.Tests.csproj", "{00FEA181-84C9-42A7-AC81-29A9F176A1A0}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "KafkaBasic", "KafkaBasic", "{587D0C62-D596-4676-8081-3EFC72946D32}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Consumer", "samples\KafkaBasic\Consumer\Consumer.csproj", "{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.AppHost", "samples\KafkaBasic\KafkaBasic.AppHost\KafkaBasic.AppHost.csproj", "{51577092-DAC9-424E-A2E5-CE51BC58D827}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaBasic.ServiceDefaults", "samples\KafkaBasic\KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj", "{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Producer", "samples\KafkaBasic\Producer\Producer.csproj", "{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka", "src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj", "{174E0507-3BB0-4CDC-829E-9CA75DA66473}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Aspire.Confluent.Kafka.Tests", "tests\Aspire.Confluent.Kafka.Tests\Aspire.Confluent.Kafka.Tests.csproj", "{A8CB331A-1247-41D9-8118-538E5A2CC9DF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -458,6 +472,30 @@ Global
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00FEA181-84C9-42A7-AC81-29A9F176A1A0}.Release|Any CPU.Build.0 = Release|Any CPU
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4}.Release|Any CPU.Build.0 = Release|Any CPU
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Debug|Any CPU.Build.0 = Debug|Any CPU
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Release|Any CPU.ActiveCfg = Release|Any CPU
{51577092-DAC9-424E-A2E5-CE51BC58D827}.Release|Any CPU.Build.0 = Release|Any CPU
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5}.Release|Any CPU.Build.0 = Release|Any CPU
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B}.Release|Any CPU.Build.0 = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Debug|Any CPU.Build.0 = Debug|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.ActiveCfg = Release|Any CPU
{174E0507-3BB0-4CDC-829E-9CA75DA66473}.Release|Any CPU.Build.0 = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A8CB331A-1247-41D9-8118-538E5A2CC9DF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -537,6 +575,13 @@ Global
{20A5A907-A135-4735-B4BF-E13514F360E3} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{E592E447-BA3C-44FA-86C1-EBEDC864A644} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{00FEA181-84C9-42A7-AC81-29A9F176A1A0} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
{587D0C62-D596-4676-8081-3EFC72946D32} = {D173887B-AF42-4576-B9C1-96B9E9B3D9C0}
{FAD7EB8D-549A-4989-BA0A-EA66F454AAC4} = {587D0C62-D596-4676-8081-3EFC72946D32}
{51577092-DAC9-424E-A2E5-CE51BC58D827} = {587D0C62-D596-4676-8081-3EFC72946D32}
{FD6EDF50-57B1-41ED-B3A9-2353A64BD2E5} = {587D0C62-D596-4676-8081-3EFC72946D32}
{E72357DF-BB44-4E92-A70B-C354DB6D4F0B} = {587D0C62-D596-4676-8081-3EFC72946D32}
{174E0507-3BB0-4CDC-829E-9CA75DA66473} = {27381127-6C45-4B4C-8F18-41FF48DFE4B2}
{A8CB331A-1247-41D9-8118-538E5A2CC9DF} = {4981B3A5-4AFD-4191-BF7D-8692D9783D60}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {6DCEDFEC-988E-4CB3-B45B-191EB5086E0C}
Expand Down
4 changes: 4 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.Kafka" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MongoDb" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.MySql" Version="8.0.0" />
<PackageVersion Include="AspNetCore.HealthChecks.NpgSql" Version="8.0.0" />
Expand Down Expand Up @@ -64,6 +65,7 @@
<PackageVersion Include="Microsoft.Extensions.Primitives" Version="$(MicrosoftExtensionsPrimitivesPackageVersion)" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="$(MicrosoftExtensionsHttpResiliencePackageVersion)" />
<!-- external dependencies -->
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
<PackageVersion Include="Dapr.AspNetCore" Version="1.12.0" />
<PackageVersion Include="DnsClient" Version="1.7.0" />
<PackageVersion Include="Grpc.AspNetCore" Version="2.59.0" />
Expand Down Expand Up @@ -104,5 +106,7 @@
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Workloads" Version="8.0.0-beta.23564.4" />
<PackageVersion Include="Microsoft.Signed.Wix" Version="1.0.0-v3.14.0.5722" />
<PackageVersion Include="Microsoft.DotNet.Build.Tasks.Installers" Version="8.0.0-beta.23564.4" />
<!-- unit test dependencies -->
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.0.0" />
</ItemGroup>
</Project>
16 changes: 16 additions & 0 deletions samples/kafkaBasic/Consumer/Consumer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj" />
<ProjectReference Include="..\KafkaBasic.ServiceDefaults\KafkaBasic.ServiceDefaults.csproj" />
</ItemGroup>
</Project>
37 changes: 37 additions & 0 deletions samples/kafkaBasic/Consumer/ConsumerWorker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Confluent.Kafka;

namespace Consumer;

internal sealed class ConsumerWorker(IConsumer<Ignore, string> consumer, ILogger<ConsumerWorker> logger) : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
long i = 0;
return Task.Factory.StartNew(async () =>
{
consumer.Subscribe("topic");
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<Ignore, string>? result = default;
try
{
result = consumer.Consume(TimeSpan.FromSeconds(1));
}
catch (ConsumeException ex) when (ex.Error.Code == ErrorCode.UnknownTopicOrPart)
{
await Task.Delay(100);
continue;
}
i++;
if (i % 1000 == 0)
{
logger.LogInformation($"Received {i} messages. current offset is '{result!.Offset}'");
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
}
15 changes: 15 additions & 0 deletions samples/kafkaBasic/Consumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Confluent.Kafka;
using Consumer;

var builder = Host.CreateApplicationBuilder(args);

builder.AddServiceDefaults();

builder.AddKafkaConsumer<Ignore, string>("kafka");

builder.Services.AddHostedService<ConsumerWorker>();

builder.Build().Run();
8 changes: 8 additions & 0 deletions samples/kafkaBasic/Consumer/appsettings.Development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}
21 changes: 21 additions & 0 deletions samples/kafkaBasic/Consumer/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information",
"Azure": "Warning"
}
},
"Aspire": {
"Confluent": {
"Kafka": {
"Consumer": {
"Config": {
"AutoOffsetReset": "Earliest",
"GroupId": "aspire"
}
}
}
}
}
}
8 changes: 8 additions & 0 deletions samples/kafkaBasic/KafkaBasic.AppHost/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.props" />

</Project>
8 changes: 8 additions & 0 deletions samples/kafkaBasic/KafkaBasic.AppHost/Directory.Build.targets
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<Project>

<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.targets', '$(MSBuildThisFileDirectory)../'))" />

<!-- NOTE: This line is only required because we are using P2P references, not NuGet. It will not exist in real apps. -->
<Import Project="../../../src/Aspire.Hosting/build/Aspire.Hosting.targets" />

</Project>
17 changes: 17 additions & 0 deletions samples/kafkaBasic/KafkaBasic.AppHost/KafkaBasic.AppHost.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsAspireHost>true</IsAspireHost>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Aspire.Hosting\Aspire.Hosting.csproj" />
<ProjectReference Include="..\Consumer\Consumer.csproj" />
<ProjectReference Include="..\Producer\Producer.csproj" />
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions samples/kafkaBasic/KafkaBasic.AppHost/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

var builder = DistributedApplication.CreateBuilder(args);

var containerResource = builder.AddKafkaContainer("kafka");

builder.AddProject<Projects.Producer>("producer")
.WithReference(containerResource);

builder.AddProject<Projects.Consumer>("consumer")
.WithReference(containerResource);

builder.Build().Run();
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"applicationUrl": "http://localhost:15024",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development",
"DOTNET_ENVIRONMENT": "Development",
"DOTNET_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:16132"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
9 changes: 9 additions & 0 deletions samples/kafkaBasic/KafkaBasic.AppHost/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning",
"Aspire.Hosting.Dcp": "Warning"
}
}
}
119 changes: 119 additions & 0 deletions samples/kafkaBasic/KafkaBasic.ServiceDefaults/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;

namespace Microsoft.Extensions.Hosting;

public static class Extensions
{
public static IHostApplicationBuilder AddServiceDefaults(this IHostApplicationBuilder builder)
{
builder.ConfigureOpenTelemetry();

builder.AddDefaultHealthChecks();

builder.Services.AddServiceDiscovery();

builder.Services.ConfigureHttpClientDefaults(http =>
{
// Turn on resilience by default
http.AddStandardResilienceHandler();
// Turn on service discovery by default
http.UseServiceDiscovery();
});

return builder;
}

public static IHostApplicationBuilder ConfigureOpenTelemetry(this IHostApplicationBuilder builder)
{
builder.Logging.AddOpenTelemetry(logging =>
{
logging.IncludeFormattedMessage = true;
logging.IncludeScopes = true;
});

builder.Services.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics.AddRuntimeInstrumentation()
.AddBuiltInMeters();
})
.WithTracing(tracing =>
{
if (builder.Environment.IsDevelopment())
{
// We want to view all traces in development
tracing.SetSampler(new AlwaysOnSampler());
}
tracing.AddAspNetCoreInstrumentation()
.AddGrpcClientInstrumentation()
.AddHttpClientInstrumentation();
});

builder.AddOpenTelemetryExporters();

return builder;
}

private static IHostApplicationBuilder AddOpenTelemetryExporters(this IHostApplicationBuilder builder)
{
var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]);

if (useOtlpExporter)
{
builder.Services.Configure<OpenTelemetryLoggerOptions>(logging => logging.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryMeterProvider(metrics => metrics.AddOtlpExporter());
builder.Services.ConfigureOpenTelemetryTracerProvider(tracing => tracing.AddOtlpExporter());
}

// Uncomment the following lines to enable the Prometheus exporter (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
// builder.Services.AddOpenTelemetry()
// .WithMetrics(metrics => metrics.AddPrometheusExporter());

// Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.Exporter package)
// builder.Services.AddOpenTelemetry()
// .UseAzureMonitor();

return builder;
}

public static IHostApplicationBuilder AddDefaultHealthChecks(this IHostApplicationBuilder builder)
{
builder.Services.AddHealthChecks()
// Add a default liveness check to ensure app is responsive
.AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]);

return builder;
}

public static WebApplication MapDefaultEndpoints(this WebApplication app)
{
// Uncomment the following line to enable the Prometheus endpoint (requires the OpenTelemetry.Exporter.Prometheus.AspNetCore package)
// app.MapPrometheusScrapingEndpoint();

// All health checks must pass for app to be considered ready to accept traffic after starting
app.MapHealthChecks("/health");

// Only health checks tagged with the "live" tag must pass for app to be considered alive
app.MapHealthChecks("/alive", new HealthCheckOptions
{
Predicate = r => r.Tags.Contains("live")
});

return app;
}

private static MeterProviderBuilder AddBuiltInMeters(this MeterProviderBuilder meterProviderBuilder) =>
meterProviderBuilder.AddMeter(
"Microsoft.AspNetCore.Hosting",
"Microsoft.AspNetCore.Server.Kestrel",
"System.Net.Http");
}
Loading

0 comments on commit 84fda9b

Please sign in to comment.