Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New methods - they don't do anything all that functionally different than the main health check registration methods on the AkkaConfigurationBuilder. This is a pure DX choice to support Akka.Persistence plugin authors.

Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ namespace Akka.Persistence.Hosting
where TAdapter : Akka.Persistence.Journal.IReadEventAdapter { }
public Akka.Persistence.Hosting.AkkaPersistenceJournalBuilder AddWriteEventAdapter<TAdapter>(string eventAdapterName, System.Collections.Generic.IEnumerable<System.Type> boundTypes)
where TAdapter : Akka.Persistence.Journal.IWriteEventAdapter { }
public Akka.Persistence.Hosting.AkkaPersistenceJournalBuilder WithCustomHealthCheck(Akka.Hosting.AkkaHealthCheckRegistration registration) { }
public Akka.Persistence.Hosting.AkkaPersistenceJournalBuilder WithHealthCheck(Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus unHealthyStatus = 1, string? name = null, System.Collections.Generic.IEnumerable<string>? tags = null) { }
}
public sealed class AkkaPersistenceSnapshotBuilder
{
public AkkaPersistenceSnapshotBuilder(string snapshotStoreId, Akka.Hosting.AkkaConfigurationBuilder builder) { }
public Akka.Persistence.Hosting.AkkaPersistenceSnapshotBuilder WithCustomHealthCheck(Akka.Hosting.AkkaHealthCheckRegistration registration) { }
public Akka.Persistence.Hosting.AkkaPersistenceSnapshotBuilder WithHealthCheck(Microsoft.Extensions.Diagnostics.HealthChecks.HealthStatus unHealthyStatus = 1, string? name = null, System.Collections.Generic.IEnumerable<string>? tags = null) { }
}
public static class Extensions
Expand Down
133 changes: 0 additions & 133 deletions src/Akka.Persistence.Hosting/AkkaPersistenceHostingExtensions.cs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the builders to their own files because I was tired of having to go looking for them

Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Persistence.Journal;
using Akka.Util;
using Akka.Actor;
using Microsoft.Extensions.Diagnostics.HealthChecks;

#nullable enable
namespace Akka.Persistence.Hosting
Expand All @@ -30,133 +24,6 @@ public enum PersistenceMode
SnapshotStore,
}

/// <summary>
/// Used to help build journal configurations
/// </summary>
public sealed class AkkaPersistenceJournalBuilder
{
internal readonly string JournalId;
internal readonly AkkaConfigurationBuilder Builder;
internal readonly Dictionary<Type, HashSet<string>> Bindings = new Dictionary<Type, HashSet<string>>();
internal readonly Dictionary<string, Type> Adapters = new Dictionary<string, Type>();
internal AkkaHealthCheckRegistration? HealthCheckRegistration = null;

public AkkaPersistenceJournalBuilder(string journalId, AkkaConfigurationBuilder builder)
{
JournalId = journalId;
Builder = builder;
}

/// <summary>
/// Uses the built-in journal health check on the Akka.Persistence.Journal.
/// </summary>
/// <param name="unHealthyStatus">Default status to return when the plugin reports <see cref="PersistenceHealthStatus.Unhealthy"/>
/// or <see cref="PersistenceHealthStatus.Degraded"/>. Defaults to degraded.</param>
/// <param name="name">Optional name to add to the health check.</param>
/// <param name="tags">Custom tags for the health check. If null, defaults to ["akka", "persistence", "journal"].</param>
/// <returns>The current builder instance for method chaining.</returns>
public AkkaPersistenceJournalBuilder WithHealthCheck(HealthStatus unHealthyStatus = HealthStatus.Degraded,
string? name = null,
IEnumerable<string>? tags = null)
{
var registration = AddHealthCheck(name, unHealthyStatus, tags);
HealthCheckRegistration = registration;
return this;
}

public AkkaPersistenceJournalBuilder AddEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddReadEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IReadEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddWriteEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IWriteEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

private void AddAdapter<TAdapter>(string eventAdapterName, IEnumerable<Type> boundTypes)
{
Adapters[eventAdapterName] = typeof(TAdapter);
foreach (var t in boundTypes)
{
if (!Bindings.ContainsKey(t))
Bindings[t] = new HashSet<string>();
Bindings[t].Add(eventAdapterName);
}
}

private AkkaHealthCheckRegistration AddHealthCheck(string? name, HealthStatus unHealthyStatus, IEnumerable<string>? tags = null)
{
var pluginId = $"akka.persistence.journal.{JournalId}";
var healthCheckTags = tags?.ToList() ?? new List<string> { "akka", "persistence", "journal" };
var registration = new AkkaHealthCheckRegistration(
name ?? $"Akka.Persistence.Journal.{JournalId}",
new JournalHealthCheck(pluginId),
unHealthyStatus,
healthCheckTags);
return registration;
}

/// <summary>
/// INTERNAL API - Builds the HOCON and then injects it.
/// </summary>
internal void Build()
{
// add the health checks if specified - do this FIRST before any early returns
if(HealthCheckRegistration != null)
Builder.WithHealthCheck(HealthCheckRegistration);

// useless configuration - don't bother.
if (Adapters.Count == 0 || Bindings.Count == 0)
return;

var adapters = new StringBuilder()
.Append($"akka.persistence.journal.{JournalId}").Append("{");

AppendAdapters(adapters);

adapters.AppendLine("}");

var finalHocon = ConfigurationFactory.ParseString(adapters.ToString());
Builder.AddHocon(finalHocon, HoconAddMode.Prepend);
}

internal void AppendAdapters(StringBuilder sb)
{
// useless configuration - don't bother.
if (Adapters.Count == 0 || Bindings.Count == 0)
return;

sb.AppendLine("event-adapters {");
foreach (var kv in Adapters)
{
sb.AppendLine($"{kv.Key} = \"{kv.Value.TypeQualifiedName()}\"");
}

sb.AppendLine("}").AppendLine("event-adapter-bindings {");
foreach (var kv in Bindings)
{
sb.AppendLine($"\"{kv.Key.TypeQualifiedName()}\" = [{string.Join(",", kv.Value)}]");
}

sb.AppendLine("}");
}
}

/// <summary>
/// The set of options for generic Akka.Persistence.
/// </summary>
Expand Down
149 changes: 149 additions & 0 deletions src/Akka.Persistence.Hosting/AkkaPersistenceJournalBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Akka.Configuration;
using Akka.Hosting;
using Akka.Persistence.Journal;
using Akka.Util;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Akka.Persistence.Hosting;

/// <summary>
/// Used to help build journal configurations
/// </summary>
public sealed class AkkaPersistenceJournalBuilder
{
internal readonly string JournalId;
internal readonly AkkaConfigurationBuilder Builder;
internal readonly Dictionary<Type, HashSet<string>> Bindings = new Dictionary<Type, HashSet<string>>();
internal readonly Dictionary<string, Type> Adapters = new Dictionary<string, Type>();
internal readonly HashSet<AkkaHealthCheckRegistration> HealthCheckRegistrations = [];

public AkkaPersistenceJournalBuilder(string journalId, AkkaConfigurationBuilder builder)
{
JournalId = journalId;
Builder = builder;
}

/// <summary>
/// Uses the built-in journal health check on the Akka.Persistence.Journal.
/// </summary>
/// <param name="unHealthyStatus">Default status to return when the plugin reports <see cref="PersistenceHealthStatus.Unhealthy"/>
/// or <see cref="PersistenceHealthStatus.Degraded"/>. Defaults to degraded.</param>
/// <param name="name">Optional name to add to the health check.</param>
/// <param name="tags">Custom tags for the health check. If null, defaults to ["akka", "persistence", "journal"].</param>
/// <returns>The current builder instance for method chaining.</returns>
public AkkaPersistenceJournalBuilder WithHealthCheck(HealthStatus unHealthyStatus = HealthStatus.Degraded,
string? name = null,
IEnumerable<string>? tags = null)
{
var registration = AddDefaultHealthCheck(name, unHealthyStatus, tags);
HealthCheckRegistrations.Add(registration);
return this;
}

/// <summary>
/// For Akka.Persistence plugins that have custom health checks (see https://github.com/akkadotnet/Akka.Hosting/issues/678)
/// </summary>
/// <param name="registration">The custom health check registration.</param>
/// <returns>The current builder instance for method chaining.</returns>
public AkkaPersistenceJournalBuilder WithCustomHealthCheck(AkkaHealthCheckRegistration registration)
{
HealthCheckRegistrations.Add(registration);
return this;
}

public AkkaPersistenceJournalBuilder AddEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddReadEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IReadEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

public AkkaPersistenceJournalBuilder AddWriteEventAdapter<TAdapter>(string eventAdapterName,
IEnumerable<Type> boundTypes) where TAdapter : IWriteEventAdapter
{
AddAdapter<TAdapter>(eventAdapterName, boundTypes);

return this;
}

private void AddAdapter<TAdapter>(string eventAdapterName, IEnumerable<Type> boundTypes)
{
Adapters[eventAdapterName] = typeof(TAdapter);
foreach (var t in boundTypes)
{
if (!Bindings.ContainsKey(t))
Bindings[t] = new HashSet<string>();
Bindings[t].Add(eventAdapterName);
}
}

private AkkaHealthCheckRegistration AddDefaultHealthCheck(string? name, HealthStatus unHealthyStatus, IEnumerable<string>? tags)
{
var pluginId = $"akka.persistence.journal.{JournalId}";
var healthCheckTags = tags?.ToList() ?? ["akka", "persistence", "journal"];
var registration = new AkkaHealthCheckRegistration(
name ?? pluginId,
new JournalHealthCheck(pluginId),
unHealthyStatus,
healthCheckTags);
return registration;
}

/// <summary>
/// INTERNAL API - Builds the HOCON and then injects it.
/// </summary>
internal void Build()
{
// add the health checks if specified - do this FIRST before any early returns
foreach(var hc in HealthCheckRegistrations)
Builder.WithHealthCheck(hc);

// useless configuration - don't bother.
if (Adapters.Count == 0 || Bindings.Count == 0)
return;

var adapters = new StringBuilder()
.Append($"akka.persistence.journal.{JournalId}").Append("{");

AppendAdapters(adapters);

adapters.AppendLine("}");

var finalHocon = ConfigurationFactory.ParseString(adapters.ToString());
Builder.AddHocon(finalHocon, HoconAddMode.Prepend);
}

internal void AppendAdapters(StringBuilder sb)
{
// useless configuration - don't bother.
if (Adapters.Count == 0 || Bindings.Count == 0)
return;

sb.AppendLine("event-adapters {");
foreach (var kv in Adapters)
{
sb.AppendLine($"{kv.Key} = \"{kv.Value.TypeQualifiedName()}\"");
}

sb.AppendLine("}").AppendLine("event-adapter-bindings {");
foreach (var kv in Bindings)
{
sb.AppendLine($"\"{kv.Key.TypeQualifiedName()}\" = [{string.Join(",", kv.Value)}]");
}

sb.AppendLine("}");
}
}
80 changes: 80 additions & 0 deletions src/Akka.Persistence.Hosting/AkkaPersistenceSnapshotBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System.Collections.Generic;
using System.Linq;
using Akka.Hosting;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Akka.Persistence.Hosting;

/// <summary>
/// Used to help build snapshot store configurations
/// </summary>
public sealed class AkkaPersistenceSnapshotBuilder
{
internal readonly string SnapshotStoreId;
internal readonly AkkaConfigurationBuilder Builder;
internal readonly HashSet<AkkaHealthCheckRegistration> HealthCheckRegistrations = [];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we take a hashset of health check registrations, rather than a singular registration.


public AkkaPersistenceSnapshotBuilder(string snapshotStoreId, AkkaConfigurationBuilder builder)
{
SnapshotStoreId = snapshotStoreId;
Builder = builder;
}

/// <summary>
/// Uses the built-in snapshot store health check on the Akka.Persistence.SnapshotStore.
/// </summary>
/// <param name="unHealthyStatus">Default status to return when the plugin reports <see cref="PersistenceHealthStatus.Unhealthy"/>
/// or <see cref="PersistenceHealthStatus.Degraded"/>. Defaults to degraded.</param>
/// <param name="name">Optional name to add to the health check.</param>
/// <param name="tags">Custom tags for the health check. If null, defaults to ["akka", "persistence", "snapshot-store"].</param>
/// <returns>The current builder instance for method chaining.</returns>
public AkkaPersistenceSnapshotBuilder WithHealthCheck(HealthStatus unHealthyStatus = HealthStatus.Degraded,
string? name = null,
IEnumerable<string>? tags = null)
{
var registration = AddDefaultHealthCheck(name, unHealthyStatus, tags);
HealthCheckRegistrations.Add(registration);
return this;
}

/// <summary>
/// For Akka.Persistence plugins that have custom health checks (see https://github.com/akkadotnet/Akka.Hosting/issues/678)
/// </summary>
/// <param name="registration">The custom health check registration.</param>
/// <returns>The current builder instance for method chaining.</returns>
public AkkaPersistenceSnapshotBuilder WithCustomHealthCheck(AkkaHealthCheckRegistration registration)
{
HealthCheckRegistrations.Add(registration);
return this;
}

/// <summary>
/// Backward-compatible overload for external plugins that use the 2-parameter version.
/// </summary>
internal AkkaHealthCheckRegistration AddHealthCheck(string? name, HealthStatus unHealthyStatus)
{
return AddDefaultHealthCheck(name, unHealthyStatus, tags: null);
}

internal AkkaHealthCheckRegistration AddDefaultHealthCheck(string? name, HealthStatus unHealthyStatus, IEnumerable<string>? tags)
{
var pluginId = $"akka.persistence.snapshot-store.{SnapshotStoreId}";
var healthCheckTags = tags?.ToList() ?? ["akka", "persistence", "snapshot-store"];
var registration = new AkkaHealthCheckRegistration(
name ?? pluginId,
new SnapshotStoreHealthCheck(pluginId),
unHealthyStatus,
healthCheckTags);
return registration;
}

/// <summary>
/// INTERNAL API - Registers health checks if configured.
/// </summary>
internal void Build()
{
// add the health checks if specified
foreach(var hc in HealthCheckRegistrations)
Builder.WithHealthCheck(hc);
}
}
Loading