Skip to content

Commit

Permalink
Support v1beta1 version of CRDs in apiextensions.k8s.io (#30)
Browse files Browse the repository at this point in the history
Added timed updates for the older versions of CRDs
Should support kubernetes 1.14.8+
  • Loading branch information
winromulus authored Apr 7, 2020
1 parent ac44c6f commit 1da7e61
Show file tree
Hide file tree
Showing 45 changed files with 123 additions and 76 deletions.
21 changes: 12 additions & 9 deletions azure-pipelines.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: $(version).$(Rev:r)

variables:
version: 4.1
version: 4.2
buildConfiguration: "Release"
imageRepository: "emberstack/kubernetes-reflector"
DOCKER_CLI_EXPERIMENTAL: 'enabled'
Expand All @@ -10,6 +10,9 @@ trigger:
branches:
include:
- "*"
paths:
include:
- src/*

stages:

Expand All @@ -31,14 +34,14 @@ stages:
- task: HelmInstaller@1
inputs:
helmVersionToInstall: 'latest'
helmVersionToInstall: '3.1.1'

- script: |
helm package --destination $(Build.ArtifactStagingDirectory)/artifacts/helm --version $(Build.BuildNumber) --app-version $(Build.BuildNumber) helm/reflector
helm package --destination $(Build.ArtifactStagingDirectory)/artifacts/helm --version $(Build.BuildNumber) --app-version $(Build.BuildNumber) src/helm/reflector
displayName: 'helm package'
- script: 'helm template --namespace kube-system reflector reflector > $(Build.ArtifactStagingDirectory)/artifacts/kubectl/reflector-$(Build.BuildNumber).yaml'
workingDirectory: helm
workingDirectory: src/helm
displayName: 'helm template'

- publish: '$(Build.ArtifactStagingDirectory)/artifacts/helm'
Expand Down Expand Up @@ -81,9 +84,9 @@ stages:
inputs:
containerRegistry: 'Emberstack Docker Hub'
repository: $(imageRepository)
Dockerfile: ES.Kubernetes.Reflector.Host/Dockerfile
Dockerfile: src/ES.Kubernetes.Reflector.Host/Dockerfile
command: build
buildContext: .
buildContext: src
tags: 'build-$(Build.BuildNumber)-arm32'

- task: Docker@2
Expand Down Expand Up @@ -112,9 +115,9 @@ stages:
inputs:
containerRegistry: 'Emberstack Docker Hub'
repository: $(imageRepository)
Dockerfile: ES.Kubernetes.Reflector.Host/Dockerfile
Dockerfile: src/ES.Kubernetes.Reflector.Host/Dockerfile
command: build
buildContext: .
buildContext: src
tags: 'build-$(Build.BuildNumber)-amd64'

- task: Docker@2
Expand Down Expand Up @@ -188,7 +191,7 @@ stages:

- task: HelmInstaller@1
inputs:
helmVersionToInstall: 'latest'
helmVersionToInstall: '3.1.1'

- script: |
docker pull $(imageRepository):build-$(Build.BuildNumber)-amd64
Expand Down
12 changes: 3 additions & 9 deletions samples/helm/reflector-sample/templates/certificate.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apiVersion: certmanager.k8s.io/v1alpha1
apiVersion: cert-manager.io/v1alpha2
kind: Certificate
metadata:
metadata:
name: {{ template "reflector-sample.fullname" $ }}-cert
annotations:
reflector.v1.k8s.emberstack.com/secret-reflection-allowed: "true"
Expand All @@ -11,10 +11,4 @@ spec:
name: {{ template "reflector-sample.fullname" $ }}-someissuer
kind: ClusterIssuer
dnsNames:
- '*.dev.winromulus.com'
acme:
config:
- dns01:
provider: someprovider
domains:
- '*.dev.winromulus.com'
- '*.dev.winromulus.com'
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Rest;
using Timer = System.Timers.Timer;

namespace ES.Kubernetes.Reflector.CertManager
{
Expand All @@ -29,22 +30,26 @@ public class Monitor : IHostedService, IHealthCheck
private readonly Dictionary<string, ManagedWatcher<Certificate, object>> _certificatesWatchers =
new Dictionary<string, ManagedWatcher<Certificate, object>>();

private readonly ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> _crdWatcher;
private readonly ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> _crdV1Watcher;

private readonly FeederQueue<WatcherEvent> _eventQueue;
private readonly ILogger<Monitor> _logger;
private readonly IMediator _mediator;
private readonly ManagedWatcher<V1Secret, V1SecretList> _secretsWatcher;
private readonly IKubernetes _apiClient;

private readonly Timer _v1Beta1CrdMonitorTimer = new Timer();
private bool _v1Beta1CrdMonitorTimerFaulted;

public Monitor(ILogger<Monitor> logger,
ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> crdWatcher,
ManagedWatcher<V1CustomResourceDefinition, V1CustomResourceDefinitionList> crdV1Watcher,
Func<ManagedWatcher<Certificate, object>> certificatesWatcherFactory,
ManagedWatcher<V1Secret, V1SecretList> secretsWatcher,
IKubernetes apiClient,
IMediator mediator)
{
_logger = logger;
_crdWatcher = crdWatcher;
_crdV1Watcher = crdV1Watcher;
_certificatesWatcherFactory = certificatesWatcherFactory;
_secretsWatcher = secretsWatcher;
_apiClient = apiClient;
Expand All @@ -65,34 +70,22 @@ public Monitor(ILogger<Monitor> logger,
await c.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true);


_crdWatcher.EventHandlerFactory = OnCrdEvent;
_crdWatcher.RequestFactory = async c =>
_crdV1Watcher.EventHandlerFactory = OnCrdEventV1;
_crdV1Watcher.RequestFactory = async c =>
await c.ListCustomResourceDefinitionWithHttpMessagesAsync(watch: true);
_crdWatcher.OnStateChanged = async (sender, update) =>
{
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
await sender.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}",
typeof(V1CustomResourceDefinition).Name, update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(V1CustomResourceDefinition).Name,
update.State);
break;
}
};
_crdV1Watcher.OnStateChanged = OnCrdWatcherStateChanged;

_v1Beta1CrdMonitorTimer.Elapsed += (_,__)=> Onv1Beta1CrdRefresh();
_v1Beta1CrdMonitorTimer.Interval = 30_000;
}


public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult(_crdWatcher.IsFaulted || _secretsWatcher.IsFaulted ||
return Task.FromResult(_crdV1Watcher.IsFaulted ||
_v1Beta1CrdMonitorTimerFaulted ||
_secretsWatcher.IsFaulted ||
_certificatesWatchers.Values.Any(s => s.IsFaulted)
? HealthCheckResult.Unhealthy()
: HealthCheckResult.Healthy());
Expand All @@ -103,31 +96,94 @@ public async Task StartAsync(CancellationToken cancellationToken)
try
{
await _apiClient.ListCustomResourceDefinitionAsync(cancellationToken: cancellationToken);
await _crdWatcher.Start();
await _crdV1Watcher.Start();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogError(
_logger.LogWarning(
"Current kubernetes version does not support {type} apiVersion {version}.",
V1CustomResourceDefinition.KubeKind, V1CustomResourceDefinition.KubeApiVersion);
Onv1Beta1CrdRefresh();
_v1Beta1CrdMonitorTimer.Start();
}
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await _crdWatcher.Stop();
await _crdV1Watcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();
}

private async Task OnWatcherStateChanged<TS, TSL>(ManagedWatcher<TS, TSL, WatcherEvent<TS>> sender,
ManagedWatcherStateUpdate update) where TS : class, IKubernetesObject
private void Onv1Beta1CrdRefresh()
{
try
{
_logger.LogDebug(
"Updating {type} {kind} in group {group}",
typeof(V1beta1CustomResourceDefinition).Name,
CertManagerConstants.CertificateKind, CertManagerConstants.CrdGroup);

_v1Beta1CrdMonitorTimerFaulted = false;
var crd = _apiClient.ListCustomResourceDefinition1().Items
.FirstOrDefault(s =>
s.Spec?.Names != null && s.Spec.Group == CertManagerConstants.CrdGroup &&
s.Spec.Names.Kind == CertManagerConstants.CertificateKind);

if (crd == null) return;
OnCrdVersionUpdate(crd.GetType().Name, crd.Spec.Names.Kind,
crd.Spec.Group, crd.Spec.Names.Plural, crd.Spec.Versions.Select(s => s.Name).ToList()).Wait();


}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogWarning(
"Current kubernetes version does not support {type} apiVersion {version}.",
V1beta1CustomResourceDefinition.KubeKind, V1beta1CustomResourceDefinition.KubeApiVersion);

_v1Beta1CrdMonitorTimer.Stop();
}
catch (Exception exception)
{
_v1Beta1CrdMonitorTimerFaulted = true;
_v1Beta1CrdMonitorTimer.Stop();
_logger.LogError(exception,
"Error occured while getting {kind} version {version}",
V1beta1CustomResourceDefinition.KubeKind, V1beta1CustomResourceDefinition.KubeApiVersion);
}


}

private async Task OnCrdWatcherStateChanged<TResource, TResourceList>(
ManagedWatcher<TResource, TResourceList, WatcherEvent<TResource>> sender, ManagedWatcherStateUpdate update)
where TResource : class, IKubernetesObject
{
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {state}", typeof(TResource).Name, update.State);
await sender.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {state}",
typeof(TResource).Name, update.State);
break;
default:
_logger.LogDebug("{type} watcher {state}", typeof(TResource).Name, update.State);
break;
}
}

private async Task OnWatcherStateChanged<TResource, TResourceList>(ManagedWatcher<TResource, TResourceList, WatcherEvent<TResource>> sender,
ManagedWatcherStateUpdate update) where TResource : class, IKubernetesObject
{
var tag = sender.Tag ?? string.Empty;
switch (update.State)
{
case ManagedWatcherState.Closed:
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TResource).Name, tag, update.State);
await _secretsWatcher.Stop();
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();

Expand All @@ -137,11 +193,11 @@ private async Task OnWatcherStateChanged<TS, TSL>(ManagedWatcher<TS, TSL, Watche
foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Start();
break;
case ManagedWatcherState.Faulted:
_logger.LogError(update.Exception, "{type} watcher {tag} {state}", typeof(TS).Name, tag,
_logger.LogError(update.Exception, "{type} watcher {tag} {state}", typeof(TResource).Name, tag,
update.State);
break;
default:
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TS).Name, tag, update.State);
_logger.LogDebug("{type} watcher {tag} {state}", typeof(TResource).Name, tag, update.State);
break;
}
}
Expand Down Expand Up @@ -169,7 +225,7 @@ private async Task OnEventHandlingError(WatcherEvent e, Exception ex)
}


private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
private async Task OnCrdEventV1(WatcherEvent<V1CustomResourceDefinition> request)
{
if (request.Type != WatchEventType.Added && request.Type != WatchEventType.Modified) return;
if (request.Item.Spec?.Names == null) return;
Expand All @@ -179,11 +235,16 @@ private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
var versions = request.Item.Spec.Versions.Select(s => s.Name).ToList();
if (versions.TrueForAll(s => _certificatesWatchers.ContainsKey(s))) return;

await OnCrdVersionUpdate(request.Item.GetType().Name, request.Item.Spec.Names.Kind, request.Item.Spec.Group,
request.Item.Spec.Names.Plural, versions);
}

private async Task OnCrdVersionUpdate(string crdType, string crdKind, string crdGroup, string crdPlural, List<string> versions)
{
if (versions.TrueForAll(s => _certificatesWatchers.ContainsKey(s))) return;

_logger.LogInformation("{crdType} {kind} in group {group} versions updated to {versions}",
request.Item.GetType().Name,
request.Item.Spec.Names.Kind,
request.Item.Spec.Group,
versions);
crdType, crdKind, crdGroup, versions);

foreach (var certificatesWatcher in _certificatesWatchers.Values) await certificatesWatcher.Stop();
await _secretsWatcher.Stop();
Expand All @@ -198,8 +259,7 @@ private async Task OnCrdEvent(WatcherEvent<V1CustomResourceDefinition> request)
watcher.EventHandlerFactory = e =>
_eventQueue.FeedAsync(new InternalCertificateWatcherEvent { Item = e.Item, Type = e.Type });
watcher.RequestFactory = async client => await client.ListClusterCustomObjectWithHttpMessagesAsync(
request.Item.Spec.Group,
version, request.Item.Spec.Names.Plural, watch: true,
crdGroup, version, crdPlural, watch: true,
timeoutSeconds: (int)TimeSpan.FromHours(1).TotalSeconds);
_certificatesWatchers.Add(version, watcher);
}
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

<ItemGroup>
<PackageReference Include="Autofac" Version="5.1.2" />
<PackageReference Include="KubernetesClient" Version="1.6.21" />
<PackageReference Include="KubernetesClient" Version="1.6.33" />
<PackageReference Include="MediatR" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.3" />
</ItemGroup>

</Project>
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<ItemGroup>
<PackageReference Include="Autofac.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="KubernetesClient" Version="1.6.21" />
<PackageReference Include="KubernetesClient" Version="1.6.33" />
<PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.9.10" />
<PackageReference Include="Serilog" Version="2.9.0" />
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,7 @@ public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,

public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
await _apiClient.ListCustomResourceDefinitionAsync(cancellationToken: cancellationToken);
await _secretsWatcher.Start();
}
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
{
_logger.LogError(
"Current kubernetes version does not support {type} apiVersion {version}.",
V1CustomResourceDefinition.KubeKind, V1CustomResourceDefinition.KubeApiVersion);
}
await _secretsWatcher.Start();
}

public async Task StopAsync(CancellationToken cancellationToken)
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 1da7e61

Please sign in to comment.