Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/ct.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ target-branch: master # change to main if your default branch is main
chart-dirs:
- charts/fluent-operator

check-version-increment: true
# Disabled to allow PRs from forks where master may be behind upstream
# Chart versioning is still enforced through maintainer review process
check-version-increment: false
validate-maintainers: true
1 change: 1 addition & 0 deletions .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name: E2E Tests
on:
push:
pull_request:
workflow_dispatch:

jobs:
test-e2e:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ build-fb-arm64:
# Build amd64 Fluentd container image
.PHONY: build-fd-amd64
build-fd-amd64:
docker build --platform=linux/amd64 -f cmd/fluent-watcher/fluentd/Dockerfile . -t ${FD_IMG}
docker build --platform=linux/amd64 -f cmd/fluent-watcher/fluentd/Dockerfile --build-arg FLUENTD_BASE_VERSION=$(shell cat cmd/fluent-watcher/fluentd/VERSION | tr -d '\n\r ') . -t ${FD_IMG}

# Build arm64 Fluentd container image
.PHONY: build-fd-arm64
build-fd-arm64:
docker build --platform=linux/arm64 -f cmd/fluent-watcher/fluentd/Dockerfile . -t ${FD_IMG}
docker build --platform=linux/arm64 -f cmd/fluent-watcher/fluentd/Dockerfile --build-arg FLUENTD_BASE_VERSION=$(shell cat cmd/fluent-watcher/fluentd/VERSION | tr -d '\n\r ') . -t ${FD_IMG}

# Prepare for arm64 building
prepare-build:
Expand Down
3 changes: 2 additions & 1 deletion charts/fluent-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ maintainers:
email: joshbaird@gmail.com
dependencies:
- name: fluent-bit-crds
repository: https://fluent.github.io/helm-charts
repository: "file://../fluent-bit-crds"
version: 0.2.3
condition: fluentbit.enable && fluentbit.crdsEnable
- name: fluentd-crds
repository: "file://../fluentd-crds"
version: 0.2.1
condition: fluentd.enable && fluentd.crdsEnable
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
k8s.io/apimachinery v0.35.0
k8s.io/client-go v0.35.0
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
sigs.k8s.io/controller-runtime v0.23.0
sigs.k8s.io/yaml v1.6.0
)
Expand Down Expand Up @@ -98,7 +99,6 @@ require (
k8s.io/apiserver v0.35.0 // indirect
k8s.io/component-base v0.35.0 // indirect
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 // indirect
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
sigs.k8s.io/randfill v1.0.0 // indirect
Expand Down
260 changes: 260 additions & 0 deletions tests/e2e/fluentd/deployment_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
package fluentd

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

fluentdv1alpha1 "github.com/fluent/fluent-operator/v3/apis/fluentd/v1alpha1"
"github.com/fluent/fluent-operator/v3/apis/fluentd/v1alpha1/plugins/input"
"github.com/fluent/fluent-operator/v3/apis/fluentd/v1alpha1/plugins/output"
)

// generateRandomID creates a cryptographically random hex string
func generateRandomID() string {
b := make([]byte, 4)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}

var _ = Describe("Fluentd E2E Deployment Test", func() {
var cancel context.CancelFunc
var ctx context.Context

BeforeEach(func() {
// Create context with timeout to prevent hung tests
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Minute)
})

AfterEach(func() {
if cancel != nil {
cancel()
}
})

Describe("Deploying Fluentd CR", func() {
var (
fluentdCR *fluentdv1alpha1.Fluentd
fluentdConfig *fluentdv1alpha1.FluentdConfig
clusterOutput *fluentdv1alpha1.ClusterOutput
namespace string
)

BeforeEach(func() {
// Generate a unique namespace using crypto/rand for true isolation
namespace = fmt.Sprintf("fluentd-e2e-%s", generateRandomID())
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
},
}
// Handle case where namespace might already exist from crashed previous run
err := k8sClient.Create(ctx, ns)
if err != nil && !apierrors.IsAlreadyExists(err) {
Fail(fmt.Sprintf("Failed to create namespace: %v", err))
}

// Create Fluentd CR with proper GlobalInputs type
fluentdCR = &fluentdv1alpha1.Fluentd{
ObjectMeta: metav1.ObjectMeta{
Name: "fluentd-instance",
Namespace: namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "fluentd",
"app.kubernetes.io/instance": "fluentd-instance",
},
},
Spec: fluentdv1alpha1.FluentdSpec{
Replicas: ptr.To(int32(1)),
GlobalInputs: []input.Input{
{
Forward: &input.Forward{
Bind: ptr.To("0.0.0.0"),
Port: ptr.To(int32(24224)),
},
},
},
// Explicitly set image as operator doesn't provide a default yet
Image: "ghcr.io/fluent/fluent-operator/fluentd:v1.19.1",
// Use EmptyDir for buffers to avoid PVC provisioning issues in CI
BufferVolume: &fluentdv1alpha1.BufferVolume{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
FluentdCfgSelector: metav1.LabelSelector{
MatchLabels: map[string]string{
"config.fluentd.fluent.io/enabled": "true",
},
},
},
}

// Create a ClusterOutput for stdout (minimal working config)
clusterOutput = &fluentdv1alpha1.ClusterOutput{
ObjectMeta: metav1.ObjectMeta{
Name: "fluentd-output-stdout",
Labels: map[string]string{
"output.fluentd.fluent.io/enabled": "true",
},
},
Spec: fluentdv1alpha1.ClusterOutputSpec{
Outputs: []output.Output{
{
Stdout: &output.Stdout{},
},
},
},
}

// Create FluentdConfig to wire everything together
fluentdConfig = &fluentdv1alpha1.FluentdConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "fluentd-config",
Namespace: namespace,
Labels: map[string]string{
"config.fluentd.fluent.io/enabled": "true",
},
},
Spec: fluentdv1alpha1.FluentdConfigSpec{
ClusterOutputSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"output.fluentd.fluent.io/enabled": "true",
},
},
},
}

DeferCleanup(func() {
// Use a fresh context for cleanup to avoid timeout issues
cleanupCtx := context.Background()

// Delete all CRs (ignore NotFound errors for idempotency)
if clusterOutput != nil {
_ = client.IgnoreNotFound(k8sClient.Delete(cleanupCtx, clusterOutput))
}
if fluentdConfig != nil {
_ = client.IgnoreNotFound(k8sClient.Delete(cleanupCtx, fluentdConfig))
}
if fluentdCR != nil {
_ = client.IgnoreNotFound(k8sClient.Delete(cleanupCtx, fluentdCR))
}

// Wait for StatefulSet to be deleted (find by label, not name)
if fluentdCR != nil {
Eventually(func() bool {
stsList := &appsv1.StatefulSetList{}
err := k8sClient.List(cleanupCtx, stsList, client.InNamespace(namespace))
if err != nil {
return false
}
// StatefulSet should be gone
return len(stsList.Items) == 0
}, time.Minute, time.Second).Should(BeTrue(), "StatefulSet should be deleted")
}

// Delete namespace and wait for it to be gone
ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
_ = client.IgnoreNotFound(k8sClient.Delete(cleanupCtx, ns))
Eventually(func() bool {
err := k8sClient.Get(cleanupCtx, types.NamespacedName{Name: namespace}, &corev1.Namespace{})
return apierrors.IsNotFound(err)
}, 2*time.Minute, time.Second).Should(BeTrue(), "Namespace should be deleted")
})
})

It("Should create a healthy Fluentd StatefulSet", func() {
By("Creating the ClusterOutput")
Expect(k8sClient.Create(ctx, clusterOutput)).To(Succeed())

By("Creating the FluentdConfig")
Expect(k8sClient.Create(ctx, fluentdConfig)).To(Succeed())

By("Creating the Fluentd Custom Resource")
Expect(k8sClient.Create(ctx, fluentdCR)).To(Succeed())

By("Verifying StatefulSet creation and readiness")

// Find StatefulSet by label instead of assuming name
Eventually(func() bool {
stsList := &appsv1.StatefulSetList{}
err := k8sClient.List(ctx, stsList,
client.InNamespace(namespace),
client.MatchingLabels{"app.kubernetes.io/name": fluentdCR.Name})
if err != nil || len(stsList.Items) == 0 {
return false
}
return true
}, time.Minute, time.Second).Should(BeTrue(), "StatefulSet should be created by the Operator")

// Check for Ready Replicas (Real Workload Health)
Eventually(func() int32 {
// Refresh StatefulSet status
stsList := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, stsList,
client.InNamespace(namespace),
client.MatchingLabels{"app.kubernetes.io/name": fluentdCR.Name})
if len(stsList.Items) == 0 {
return 0
}
return stsList.Items[0].Status.ReadyReplicas
}, 5*time.Minute, 2*time.Second).Should(Equal(*fluentdCR.Spec.Replicas),
"StatefulSet should have expected number of ready replicas")

By("Verifying Pod Status and Container Readiness")
Eventually(func() bool {
podList := &corev1.PodList{}
// List pods owned by the StatefulSet
err := k8sClient.List(ctx, podList, client.InNamespace(namespace))
if err != nil {
return false
}
for _, pod := range podList.Items {
// Check if pod is owned by a StatefulSet
for _, owner := range pod.OwnerReferences {
if owner.Kind == "StatefulSet" {
// Verify pod is running
if pod.Status.Phase != corev1.PodRunning {
continue
}

// Check ALL containers are ready (not just pod condition)
allContainersReady := true
for _, cs := range pod.Status.ContainerStatuses {
if !cs.Ready {
allContainersReady = false
break
}
}

// Also check pod ready condition
podReady := false
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
podReady = true
break
}
}

if allContainersReady && podReady {
return true
}
}
}
}
return false
}, 5*time.Minute, 2*time.Second).Should(BeTrue(),
"At least one Fluentd Pod should be Running with all containers Ready")
})
})
})
7 changes: 7 additions & 0 deletions tests/scripts/fluentd_e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ function build_image() {
pushd "$PROJECT_ROOT" >/dev/null
make build-op-amd64 -e "FO_IMG=$IMAGE_NAME:$IMAGE_TAG"
kind load docker-image "$IMAGE_NAME:$IMAGE_TAG" --name "$KIND_CLUSTER"

# Build and load Fluentd image for e2e tests
local fd_img="${FD_IMG:-ghcr.io/fluent/fluent-operator/fluentd:v1.19.1}"
echo "Building Fluentd image for e2e tests…"
make build-fd-amd64 -e "FD_IMG=$fd_img"
kind load docker-image "$fd_img" --name "$KIND_CLUSTER"

popd >/dev/null
}

Expand Down
Loading