Skip to content

Commit

Permalink
Fix support for apiextensions.k8s.io/v1beta1 (#19)
Browse files Browse the repository at this point in the history
* Reverted k8s client to support apiextensions.k8s.io/v1beta1
  • Loading branch information
winromulus authored Oct 31, 2019
1 parent 65aca81 commit c61cba8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 14 deletions.
38 changes: 27 additions & 11 deletions ES.Kubernetes.Reflector.CertManager/Monitor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using ES.Kubernetes.Reflector.CertManager.Constants;
Expand All @@ -17,6 +18,7 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Rest;

namespace ES.Kubernetes.Reflector.CertManager
{
Expand All @@ -27,22 +29,25 @@ public class Monitor : IHostedService, IHealthCheck
private readonly Dictionary<string, ManagedWatcher<Certificate, object>> _certificatesWatchers =
new Dictionary<string, ManagedWatcher<Certificate, object>>();

private readonly ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> _crdWatcher;
private readonly ManagedWatcher<V1beta1CustomResourceDefinition, V1beta1CustomResourceDefinitionList> _crdWatcher;
private readonly FeederQueue<WatcherEvent> _eventQueue;
private readonly ILogger<Monitor> _logger;
private readonly IMediator _mediator;
private readonly ManagedWatcher<V1Secret, V1SecretList> _secretsWatcher;
private readonly IKubernetes _apiClient;

public Monitor(ILogger<Monitor> logger,
ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> crdWatcher,
ManagedWatcher<V1beta1CustomResourceDefinition, V1beta1CustomResourceDefinitionList> crdWatcher,
Func<ManagedWatcher<Certificate, object>> certificatesWatcherFactory,
ManagedWatcher<V1Secret, V1SecretList> secretsWatcher,
IKubernetes apiClient,
IMediator mediator)
{
_logger = logger;
_crdWatcher = crdWatcher;
_certificatesWatcherFactory = certificatesWatcherFactory;
_secretsWatcher = secretsWatcher;
_apiClient = apiClient;
_mediator = mediator;

_eventQueue = new FeederQueue<WatcherEvent>(OnEvent, OnEventHandlingError);
Expand All @@ -52,7 +57,8 @@ public Monitor(ILogger<Monitor> logger,
_secretsWatcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalSecretWatcherEvent
{
Item = e.Item, Type = e.Type,
Item = e.Item,
Type = e.Type,
CertificateResourceDefinitionVersions = _certificatesWatchers.Keys.ToList()
});
_secretsWatcher.RequestFactory = async c =>
Expand All @@ -67,16 +73,16 @@ public Monitor(ILogger<Monitor> logger,
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
update.State);
await sender.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}",
typeof(V1CustomResourceDefinition).Name, update.State);
typeof(V1beta1CustomResourceDefinition).Name, update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
_logger.LogDebug("{type} watcher {state}", typeof(V1beta1CustomResourceDefinition).Name,
update.State);
break;
}
Expand All @@ -94,7 +100,17 @@ public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,

public async Task StartAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Start();
try
{
await _apiClient.ListCustomResourceDefinitionAsync(cancellationToken: cancellationToken);
await _crdWatcher.Start();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogError(
"Current kubernetes version does not support {type} apiVersion {version}.",
V1beta1CustomResourceDefinition.KubeKind, V1beta1CustomResourceDefinition.KubeApiVersion);
}
}

public async Task StopAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -153,7 +169,7 @@ private async Task OnEventHandlingError(WatcherEvent e, Exception ex)
}


private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
private async Task OnCrdEvent(WatcherEvent<V1beta1CustomResourceDefinition> request)
{
if (request.Type != WatchEventType.Added && request.Type != WatchEventType.Modified) return;
if (request.Item.Spec?.Names == null) return;
Expand All @@ -164,7 +180,7 @@ private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
if (versions.TrueForAll(s => _certificatesWatchers.ContainsKey(s))) return;

_logger.LogInformation("{crdType} {kind} in group {group} versions updated to {versions}",
typeof(V1CustomResourceDefinition).Name,
typeof(V1beta1CustomResourceDefinition).Name,
request.Item.Spec.Names.Kind,
request.Item.Spec.Group,
versions);
Expand All @@ -180,11 +196,11 @@ private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
watcher.Tag = version;
watcher.OnStateChanged = OnWatcherStateChanged;
watcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent {Item = e.Item, Type = e.Type});
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent { Item = e.Item, Type = e.Type });
watcher.RequestFactory = async client => await client.ListClusterCustomObjectWithHttpMessagesAsync(
request.Item.Spec.Group,
version, request.Item.Spec.Names.Plural, watch: true,
timeoutSeconds: (int) TimeSpan.FromHours(1).TotalSeconds);
timeoutSeconds: (int)TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Add(version, watcher);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<ItemGroup>
<PackageReference Include="Autofac" Version="4.9.4" />
<PackageReference Include="KubernetesClient" Version="1.6.3" />
<PackageReference Include="KubernetesClient" Version="1.5.28" />
<PackageReference Include="MediatR" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="2.2.5" />
Expand Down
2 changes: 1 addition & 1 deletion ES.Kubernetes.Reflector.Core/Monitoring/ManagedWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public async Task Start()
_semaphore.Wait();
var request = await _requestFactory(_client);

_watcher = request.Watch<TResource, TResourceList>((eventType, item) =>
_watcher = request.Watch<TResource>((eventType, item) =>
{
var notification = new TNotification {Item = item, Type = eventType};
OnBeforePublish?.Invoke(notification);
Expand Down
2 changes: 1 addition & 1 deletion ES.Kubernetes.Reflector/ES.Kubernetes.Reflector.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="KubernetesClient" Version="1.6.3" />
<PackageReference Include="KubernetesClient" Version="1.5.28" />
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="7.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Diagnostics.HealthChecks" Version="2.2.0" />
Expand Down

0 comments on commit c61cba8

Please sign in to comment.