Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .github/workflows/e2e-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: E2E Integration Tests

on:
pull_request:
branches:
- develop
- main
workflow_dispatch:

jobs:
e2e:
runs-on: self-hosted
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

- name: Set up Go
uses: actions/setup-go@4a3601121dd01d1626a1e23e37211e3254c1c06c # v6.4.0
with:
go-version: stable
cache: true

- name: Download Go modules
run: go mod download

- name: Install gotestsum
run: go install gotest.tools/gotestsum@latest

- name: Download envtest binaries
run: |
ASSETS=$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use --bin-dir /tmp/envtest-bins -p path)
echo "KUBEBUILDER_ASSETS=$ASSETS" >> $GITHUB_ENV
echo "envtest binaries: $ASSETS"

- name: Run E2E integration tests
run: |
gotestsum \
--format="github-actions" \
--hide-summary="skipped" \
--format-hide-empty-pkg \
--rerun-fails="0" \
-- -count=1 -timeout=120s ./test/integration/...
4 changes: 4 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ test-unit: generate
test-integration: generate
go run gotest.tools/gotestsum@latest --format="testname" --hide-summary="skipped" --format-hide-empty-pkg --rerun-fails="0" -- -count=1 ./test/...

# Execute end-to-end integration tests (downloads envtest binaries on first run)
test-e2e: generate
KUBEBUILDER_ASSETS=$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use -p path) go run gotest.tools/gotestsum@latest --format="testname" --hide-summary="skipped" --format-hide-empty-pkg --rerun-fails="0" -- -count=1 -timeout=120s ./test/integration/...

# Execute golangci-lint
golangci-lint: generate
go run github.com/golangci/golangci-lint/cmd/golangci-lint@latest run '--fast=false' --sort-results '--max-same-issues=0' '--timeout=1h' ./src/...
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
)

require (
github.com/alicebob/miniredis/v2 v2.38.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.2 // indirect
Expand All @@ -52,6 +53,7 @@ require (
github.com/standard-webhooks/standard-webhooks/libraries v0.0.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.yaml.in/yaml/v4 v4.0.0-rc.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/streaming v0.36.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ github.com/alecthomas/kong v1.15.0 h1:BVJstKbpO73zKpmIu+m/aLRrNmWwxXPIGTNin9VmLV
github.com/alecthomas/kong v1.15.0/go.mod h1:wrlbXem1CWqUV5Vbmss5ISYhsVPkBb1Yo7YKJghju2I=
github.com/alecthomas/repr v0.5.2 h1:SU73FTI9D1P5UNtvseffFSGmdNci/O6RsqzeXJtP0Qs=
github.com/alecthomas/repr v0.5.2/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/alicebob/miniredis/v2 v2.38.0 h1:nZAzCR+Lj+Vxk4ZXzm2NuKq2O33RXj1XxJ2e2uP9jiw=
github.com/alicebob/miniredis/v2 v2.38.0/go.mod h1:TcL7YfarKPGDAthEtl5NBeHZfeUQj6OXMm/+iu5cLMM=
github.com/anthropics/anthropic-sdk-go v1.50.1 h1:XTd1RkdeHCPusPpzcBY5RIWj/WW6ZktjftxrHvQBJfU=
github.com/anthropics/anthropic-sdk-go v1.50.1/go.mod h1:3EfIfmFqxH6rbiLcIP4tPFyXL/IHakx2wDG4OU+TIEI=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -389,6 +391,8 @@ github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/bridges/prometheus v0.67.0 h1:dkBzNEAIKADEaFnuESzcXvpd09vxvDZsOjx11gjUqLk=
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/baseSystems.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func initializeBaseSystems(
assert.Assert(cmdLogger != nil)

clientProvider := k8sclient.NewK8sClientProvider(logManagerModule.CreateLogger("client-provider"), configModule)
if !clientProvider.RunsInCluster() {
if !clientProvider.RunsInCluster() && configModule.Get("MO_SKIP_IMPERSONATION") != "true" {
impersonated, err := clientProvider.WithImpersonate(v1.Subject{
Kind: "ServiceAccount",
Name: "mogenius-operator-service-account-app",
Expand Down
131 changes: 73 additions & 58 deletions src/cmd/cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"log/slog"
"mogenius-operator/src/ai"
Expand Down Expand Up @@ -217,86 +218,100 @@ func printReady(version string, addr string, startTime time.Time) {
fmt.Fprintf(os.Stderr, "%s\n\n", separator)
}

func RunCluster(logManagerModule logging.SlogManager, configModule *config.Config, cmdLogger *slog.Logger, valkeyLogChannel chan logging.LogLine) {
go func() {
defer shutdown.SendShutdownSignal(true)
startTime := time.Now()
// startClusterSystems initializes and starts all cluster services. It logs each
// startup step, connects the WebSocket servers, and prints the ready banner.
// Returns an error if any startup step fails.
func startClusterSystems(logManagerModule logging.SlogManager, configModule *config.Config, cmdLogger *slog.Logger, valkeyLogChannel chan logging.LogLine, startTime time.Time) error {
configModule.Validate()

configModule.Validate()
base := initializeBaseSystems(logManagerModule, configModule, cmdLogger)
logStep("Base systems initialized (kubernetes client, valkey, store)")

base := initializeBaseSystems(logManagerModule, configModule, cmdLogger)
logStep("Base systems initialized (kubernetes client, valkey, store)")
systems := initializeClusterSystems(base, logManagerModule, configModule, valkeyLogChannel)
logStep("Cluster systems initialized (websocket, monitors, helm, ai)")

systems := initializeClusterSystems(base, logManagerModule, configModule, valkeyLogChannel)
logStep("Cluster systems initialized (websocket, monitors, helm, ai)")
systems.versionModule.PrintVersionInfo()

systems.versionModule.PrintVersionInfo()
if err := systems.mocore.Initialize(); err != nil {
return fmt.Errorf("core initialize: %w", err)
}
logStep("Core initialized (valkey, cluster secret, CRDs)")

err := systems.mocore.Initialize()
if err != nil {
cmdLogger.Error("failed to initialize kubernetes resources", "error", err)
return
}
logStep("Core initialized (valkey, cluster secret, CRDs)")
systems.httpApi.Run()
logStep("HTTP API server started on " + configModule.Get("MO_HTTP_ADDR"))

systems.httpApi.Run()
logStep("HTTP API server started on " + configModule.Get("MO_HTTP_ADDR"))
systems.socketApi.Run()
logStep("Socket API started")

systems.socketApi.Run()
logStep("Socket API started")
systems.podStatsCollector.Run()
logStep("Pod stats collector started")

systems.podStatsCollector.Run()
logStep("Pod stats collector started")
systems.nodeMetricsCollector.Orchestrate()
logStep("Node metrics collector started")

systems.nodeMetricsCollector.Orchestrate()
logStep("Node metrics collector started")
systems.valkeyLoggerService.Run()
logStep("Valkey logger started")

systems.valkeyLoggerService.Run()
logStep("Valkey logger started")
systems.dbstatsService.Run()
logStep("DB stats service started")

systems.dbstatsService.Run()
logStep("DB stats service started")
systems.leaderElector.OnLeading(func() {
systems.reconciler.Start()
logStep("Reconciler started")
})
systems.leaderElector.OnLeadingEnded(func() {
systems.reconciler.Stop()
logStep("Reconciler stopped")
})
systems.leaderElector.Run()
logStep("Leader elector started")

systems.leaderElector.OnLeading(func() {
systems.reconciler.Start()
logStep("Reconciler started")
})
systems.aiManager.Run()
logStep("AI manager started")

systems.leaderElector.OnLeadingEnded(func() {
// services have to be started before this otherwise watcher events will get missing
if err := mokubernetes.WatchStoreResources(systems.watcherModule, systems.aiManager, systems.eventConnectionClient); err != nil {
return fmt.Errorf("start watcher: %w", err)
}
logStep("Kubernetes resource watcher started")

systems.reconciler.Stop()
logStep("Reconciler stopped")
})
// Connect WebSocket clients last so all handlers are registered first.
systems.mocore.InitializeWebsocketEventServer()
logStep("WebSocket event server connected")

systems.leaderElector.Run()
logStep("Leader elector started")
systems.mocore.InitializeWebsocketApiServers()
logStep("WebSocket API server(s) connected")

systems.aiManager.Run()
logStep("AI manager started")
printReady(systems.versionModule.Version, configModule.Get("MO_HTTP_ADDR"), startTime)

// services have to be started before this otherwise watcher events will get missing
err = mokubernetes.WatchStoreResources(systems.watcherModule, systems.aiManager, systems.eventConnectionClient)
if err != nil {
cmdLogger.Error("failed to start watcher", "error", err)
return nil
}

func RunCluster(logManagerModule logging.SlogManager, configModule *config.Config, cmdLogger *slog.Logger, valkeyLogChannel chan logging.LogLine) {
go func() {
defer shutdown.SendShutdownSignal(true)
if err := startClusterSystems(logManagerModule, configModule, cmdLogger, valkeyLogChannel, time.Now()); err != nil {
cmdLogger.Error("failed to start cluster", "error", err)
return
}
logStep("Kubernetes resource watcher started")
select {}
}()

// connect socket after everything is ready
systems.mocore.InitializeWebsocketEventServer()
logStep("WebSocket event server connected")
shutdown.Listen()
}

systems.mocore.InitializeWebsocketApiServers()
logStep("WebSocket API server(s) connected")
// RunClusterWithContext starts the full operator and blocks until ctx is cancelled or
// a fatal startup error occurs. Unlike RunCluster it does not call shutdown.Listen(),
// making it suitable for test harnesses and programmatic embedding.
func RunClusterWithContext(ctx context.Context, logManagerModule logging.SlogManager, configModule *config.Config, cmdLogger *slog.Logger, valkeyLogChannel chan logging.LogLine) error {
if err := startClusterSystems(logManagerModule, configModule, cmdLogger, valkeyLogChannel, time.Now()); err != nil {
return err
}

printReady(
systems.versionModule.Version,
configModule.Get("MO_HTTP_ADDR"),
startTime,
)
<-ctx.Done()

select {}
}()
// Run registered shutdown hooks to clean up goroutines (WebSocket clients, watchers, etc.).
<-shutdown.ExecuteShutdownHandlers()

shutdown.Listen()
return nil
}
12 changes: 12 additions & 0 deletions src/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,4 +432,16 @@ func LoadConfigDeclarations(configModule *config.Config) {
return nil
},
})
configModule.Declare(config.ConfigDeclaration{
Key: "MO_SKIP_IMPERSONATION",
DefaultValue: new("false"),
Description: new("skip service-account impersonation; set to true in local/test environments that lack the operator SA"),
Validate: func(value string) error {
_, err := strconv.ParseBool(value)
if err != nil {
return fmt.Errorf("'MO_SKIP_IMPERSONATION' needs to be a boolean: %s", err.Error())
}
return nil
},
})
}
35 changes: 3 additions & 32 deletions src/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,18 +277,16 @@ func SetResourceWithIndex(
// the discovered shard matches the primary key shape exactly.
client := valkey.GetValkeyClient()
cmds := []vgo.Completed{
client.B().Multi().Build(),
client.B().Set().Key(primaryKey).Value(string(payload)).ExSeconds(ttlSeconds).Build(),
client.B().Zadd().Key(byCreationKey).ScoreMember().ScoreMember(creationScore, name).Build(),
client.B().Expire().Key(byCreationKey).Seconds(ttlSeconds).Build(),
client.B().Zadd().Key(byNameKey).ScoreMember().ScoreMember(0, name).Build(),
client.B().Expire().Key(byNameKey).Seconds(ttlSeconds).Build(),
client.B().Sadd().Key(nsRegistryKey).Member(namespace).Build(),
client.B().Expire().Key(nsRegistryKey).Seconds(ttlSeconds).Build(),
client.B().Exec().Build(),
}

if err := checkMultiExec(client.DoMulti(valkey.GetContext(), cmds...)); err != nil {
if err := checkPipeline(client.DoMulti(valkey.GetContext(), cmds...)); err != nil {
return fmt.Errorf("set resource with index pipeline: %w", err)
}
return nil
Expand All @@ -311,50 +309,23 @@ func DeleteResourceWithIndex(
// to an empty/expired ZSET shard (ZCARD 0) that the reader skips.
client := valkey.GetValkeyClient()
cmds := []vgo.Completed{
client.B().Multi().Build(),
client.B().Del().Key(primaryKey).Build(),
client.B().Zrem().Key(byCreationKey).Member(name).Build(),
client.B().Zrem().Key(byNameKey).Member(name).Build(),
client.B().Exec().Build(),
}

if err := checkMultiExec(client.DoMulti(valkey.GetContext(), cmds...)); err != nil {
if err := checkPipeline(client.DoMulti(valkey.GetContext(), cmds...)); err != nil {
return fmt.Errorf("delete resource with index pipeline: %w", err)
}
return nil
}

// checkMultiExec validates the responses of a MULTI ... EXEC pipeline sent via
// DoMulti. The per-response Error() check catches connection failures,
// queue-time command errors and EXECABORT (where the EXEC reply itself is an
// error). It additionally walks the EXEC reply array so per-command runtime
// errors (e.g. a WRONGTYPE that only surfaces during EXEC) are not silently
// swallowed - those are nested inside the array element, not on the array
// result itself.
func checkMultiExec(resps []vgo.ValkeyResult) error {
func checkPipeline(resps []vgo.ValkeyResult) error {
for _, resp := range resps {
if err := resp.Error(); err != nil {
return err
}
}
if len(resps) == 0 {
return nil
}
// The last response is the EXEC reply: an array with one entry per queued
// command. A nil reply means the transaction was discarded (only happens
// with WATCH, which we don't use, so treat it as a failure).
execResults, err := resps[len(resps)-1].ToArray()
if err != nil {
if errors.Is(err, vgo.Nil) {
return fmt.Errorf("transaction discarded")
}
return err
}
for _, r := range execResults {
if err := r.Error(); err != nil {
return err
}
}
return nil
}

Expand Down
12 changes: 10 additions & 2 deletions src/valkeyclient/valkeyclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (self *valkeyClient) Connect() error {
valkeyAddr := valkeyHost + ":" + valkeyPort
valkeyPwd := self.config.Get("MO_VALKEY_PASSWORD")

client, err := valkeyclient.NewClient(valkeyclient.ClientOption{
baseOpts := valkeyclient.ClientOption{
InitAddress: []string{valkeyAddr},
Password: valkeyPwd,
SelectDB: 0,
Expand All @@ -122,7 +122,15 @@ func (self *valkeyClient) Connect() error {
WriteBufferEachConn: 512 * (1 << 10), // 512 KiB
ConnWriteTimeout: 10 * time.Second,
MaxFlushDelay: 100 * time.Microsecond, // Reduce latency for pipelined commands
})
}
client, err := valkeyclient.NewClient(baseOpts)
if err != nil && strings.Contains(err.Error(), "DisableCache must be true") {
// Server does not support RESP3 client-side caching (e.g. older Redis, miniredis).
self.logger.Info("server does not support client-side caching, retrying without it", "addr", valkeyAddr)
noCache := baseOpts
noCache.DisableCache = true
client, err = valkeyclient.NewClient(noCache)
}
if err != nil {
self.logger.Info("connection to Valkey failed", "valkeyAddr", valkeyAddr, "error", err)
return fmt.Errorf("could not connect to Valkey: %s", err)
Expand Down
Loading