Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/ateapi/internal/controlapi/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/agent-substrate/substrate/cmd/ateapi/internal/workercache"
"github.com/agent-substrate/substrate/internal/ateinterceptors"
"github.com/agent-substrate/substrate/internal/envtestbins"
"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/internal/proto/ateletpb"
atev1alpha1 "github.com/agent-substrate/substrate/pkg/api/v1alpha1"
"github.com/agent-substrate/substrate/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -270,7 +271,7 @@ func setupTest(t *testing.T, ns string) *testContext {

// 3. Initialize Informers
workerFactory, workerInformer := WorkerPodInformer(k8sClient)
ateletFactory, ateletInformer := AteletInformer(k8sClient)
ateletFactory, ateletInformer := AteletInformer(k8sClient, installdefaults.SystemNamespace)

substrateInformerFactory := externalversions.NewSharedInformerFactory(substrateClient, 0)
actorTemplateLister := substrateInformerFactory.Api().V1alpha1().ActorTemplates().Lister()
Expand Down
6 changes: 3 additions & 3 deletions cmd/ateapi/internal/controlapi/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (
)

const (
ateletNamespace = "ate-system"
byNamespaceAndName = "by-namespace-and-name"
byWorkerPool = "by-worker-pool"
byNode = "by-node"
workerPodLabel = "ate.dev/worker-pool"
)

// AteletInformer creates a SharedInformerFactory and SharedIndexInformer for Atelet pods.
func AteletInformer(kc kubernetes.Interface) (informers.SharedInformerFactory, cache.SharedIndexInformer) {
// AteletInformer creates a SharedInformerFactory and SharedIndexInformer for
// Atelet pods in the given namespace.
func AteletInformer(kc kubernetes.Interface, ateletNamespace string) (informers.SharedInformerFactory, cache.SharedIndexInformer) {
factory := informers.NewSharedInformerFactoryWithOptions(kc, 0,
informers.WithNamespace(ateletNamespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
Expand Down
8 changes: 7 additions & 1 deletion cmd/ateapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/agent-substrate/substrate/cmd/ateapi/internal/store/ateredis"
"github.com/agent-substrate/substrate/cmd/ateapi/internal/workercache"
"github.com/agent-substrate/substrate/internal/ateinterceptors"
"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/internal/serverboot"
"github.com/agent-substrate/substrate/internal/version"
"github.com/agent-substrate/substrate/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -122,8 +123,13 @@ func main() {
workerPoolLister := ateFactory.Api().V1alpha1().WorkerPools().Lister()
sandboxConfigLister := ateFactory.Api().V1alpha1().SandboxConfigs().Lister()

// atelet shares ateapi's namespace in every supported deployment topology,
// so we read it from Kubernetes' downward API rather than expose a flag.
ateletNamespace := installdefaults.NamespaceFromPodEnv()
slog.InfoContext(ctx, "Resolved atelet namespace", slog.String("atelet-namespace", ateletNamespace))

workerPodInformerFactory, workerPodInformer := controlapi.WorkerPodInformer(clientset)
ateletPodInformerFactory, ateletPodInformer := controlapi.AteletInformer(clientset)
ateletPodInformerFactory, ateletPodInformer := controlapi.AteletInformer(clientset, ateletNamespace)

syncer := controlapi.NewWorkerPoolSyncer(redisPersistence, workerPodInformer)
syncer.Start(ctx)
Expand Down
22 changes: 18 additions & 4 deletions cmd/atenet/internal/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/agent-substrate/substrate/cmd/atenet/internal/dns"
"github.com/agent-substrate/substrate/internal/installdefaults"
)

type DnsConfig struct {
LogLevel string
Kubeconfig string
ReconcileInterval time.Duration
CorefilePath string
RouterServiceName string
DNSServiceName string
}

func NewDnsCmd() *cobra.Command {
Expand Down Expand Up @@ -86,11 +89,20 @@ func NewDnsCmd() *cobra.Command {
return fmt.Errorf("failed to initialize cluster client: %w", err)
}

// atenet shares its namespace with atenet-router and substrate's
// CoreDNS in every supported deployment topology, so we read it
// from Kubernetes' downward API rather than expose a flag.
systemNamespace := installdefaults.NamespaceFromPodEnv()
slog.InfoContext(ctx, "Resolved system namespace", slog.String("system-namespace", systemNamespace))

dnsController := &dns.Controller{
Client: k8sClient,
Interval: cfg.ReconcileInterval,
CorefilePath: cfg.CorefilePath,
Reloader: dns.NewConfigReloader(),
Client: k8sClient,
Interval: cfg.ReconcileInterval,
CorefilePath: cfg.CorefilePath,
Reloader: dns.NewConfigReloader(),
SystemNamespace: systemNamespace,
RouterServiceName: cfg.RouterServiceName,
DNSServiceName: cfg.DNSServiceName,
}

slog.InfoContext(ctx, "Starting DNS Controller subsystem")
Expand All @@ -102,6 +114,8 @@ func NewDnsCmd() *cobra.Command {
cmd.Flags().StringVar(&cfg.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig configuration file")
cmd.Flags().DurationVar(&cfg.ReconcileInterval, "interval", 10*time.Second, "Interval for reconciling DNS configurations")
cmd.Flags().StringVar(&cfg.CorefilePath, "corefile-path", "/etc/coredns/Corefile", "Path to the local Corefile configuration on shared volume")
cmd.Flags().StringVar(&cfg.RouterServiceName, "router-service-name", installdefaults.RouterServiceName, "Service name of the atenet-router. Override when the deployment renames the Service.")
cmd.Flags().StringVar(&cfg.DNSServiceName, "dns-service-name", installdefaults.DNSServiceName, "Service name of substrate's CoreDNS. Override when the deployment renames the Service.")

return cmd
}
35 changes: 21 additions & 14 deletions cmd/atenet/internal/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,23 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// serviceName is the name of the CoreDNS service.
serviceName = "dns"
systemNamespace = "ate-system"
)

// Controller manages the DNS configuration for the ATE.
type Controller struct {
Client client.Client
Interval time.Duration
CorefilePath string
Reloader ConfigReloader

// SystemNamespace is the namespace where atenet-router and the substrate
// CoreDNS Service live. Defaults to installdefaults.SystemNamespace.
SystemNamespace string
// RouterServiceName is the Service name of the atenet-router that the
// CoreDNS Corefile forwards actor traffic to. Defaults to
// installdefaults.RouterServiceName.
RouterServiceName string
// DNSServiceName is the Service name of substrate's CoreDNS. Defaults to
// installdefaults.DNSServiceName.
DNSServiceName string
}

// Run the DNS orchestration loop until ctx is canceled.
Expand All @@ -71,14 +76,15 @@ func (c *Controller) Run(ctx context.Context) error {
func (c *Controller) reconcile(ctx context.Context) error {
slog.DebugContext(ctx, "Reconciling DNS orchestration configuration...")

// 1. Get the ClusterIP of atenet-router in ate-system namespace
// 1. Get the ClusterIP of the atenet-router Service in the substrate namespace.
routerSvc := &corev1.Service{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: "atenet-router", Namespace: systemNamespace}, routerSvc); err != nil {
if err := c.Client.Get(ctx, types.NamespacedName{Name: c.RouterServiceName, Namespace: c.SystemNamespace}, routerSvc); err != nil {
if errors.IsNotFound(err) {
slog.WarnContext(ctx, "atenet-router service not found, skipping until it is available")
slog.WarnContext(ctx, "atenet-router service not found, skipping until it is available",
slog.String("name", c.RouterServiceName), slog.String("namespace", c.SystemNamespace))
return nil
}
return fmt.Errorf("failed to get atenet-router service: %w", err)
return fmt.Errorf("failed to get atenet-router service %s/%s: %w", c.SystemNamespace, c.RouterServiceName, err)
}

routerIP := routerSvc.Spec.ClusterIP
Expand All @@ -87,14 +93,15 @@ func (c *Controller) reconcile(ctx context.Context) error {
return nil
}

// 2. Get the ClusterIP of dns service in ate-system namespace
// 2. Get the ClusterIP of substrate's CoreDNS Service in the same namespace.
dnsSvc := &corev1.Service{}
if err := c.Client.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: systemNamespace}, dnsSvc); err != nil {
if err := c.Client.Get(ctx, types.NamespacedName{Name: c.DNSServiceName, Namespace: c.SystemNamespace}, dnsSvc); err != nil {
if errors.IsNotFound(err) {
slog.WarnContext(ctx, "dns service not found, skipping until it is available")
slog.WarnContext(ctx, "dns service not found, skipping until it is available",
slog.String("name", c.DNSServiceName), slog.String("namespace", c.SystemNamespace))
return nil
}
return fmt.Errorf("failed to get dns service: %w", err)
return fmt.Errorf("failed to get dns service %s/%s: %w", c.SystemNamespace, c.DNSServiceName, err)
}

dnsIP := dnsSvc.Spec.ClusterIP
Expand Down
24 changes: 16 additions & 8 deletions cmd/atenet/internal/dns/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/agent-substrate/substrate/internal/installdefaults"
)

type mockConfigReloader struct {
Expand Down Expand Up @@ -94,10 +96,13 @@ func TestReconcile(t *testing.T) {

reloader := &mockConfigReloader{}
controller := &Controller{
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: reloader,
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: reloader,
SystemNamespace: installdefaults.SystemNamespace,
RouterServiceName: installdefaults.RouterServiceName,
DNSServiceName: installdefaults.DNSServiceName,
}

// Run one reconciliation loop
Expand Down Expand Up @@ -185,10 +190,13 @@ func TestReconcileKubeDNSNotFound(t *testing.T) {
Build()

controller := &Controller{
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: &mockConfigReloader{},
Client: client,
Interval: 1 * time.Second,
CorefilePath: corefilePath,
Reloader: &mockConfigReloader{},
SystemNamespace: installdefaults.SystemNamespace,
RouterServiceName: installdefaults.RouterServiceName,
DNSServiceName: installdefaults.DNSServiceName,
}

ctx := context.Background()
Expand Down
2 changes: 2 additions & 0 deletions cmd/atenet/internal/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/spf13/cobra"

"github.com/agent-substrate/substrate/cmd/atenet/internal/router"
"github.com/agent-substrate/substrate/internal/installdefaults"
)

func NewRouterCmd() *cobra.Command {
Expand All @@ -44,6 +45,7 @@ func NewRouterCmd() *cobra.Command {
cmd.Flags().StringVar(&cfg.MetricsAddr, "metrics-listen-addr", ":9090", "Address and port the prometheus metrics server should listen on.")
cmd.Flags().BoolVar(&cfg.Standalone, "standalone", false, "Run in standalone mode, bypassing creation of managed deployment and services in Kubernetes cluster")
cmd.Flags().StringVar(&cfg.Namespace, "namespace", "default", "Target operations namespace")
cmd.Flags().StringVar(&cfg.RouterServiceName, "router-service-name", installdefaults.RouterServiceName, "Service name of this atenet-router in the operations namespace. Override when the deployment renames the Service.")
cmd.Flags().StringVar(&cfg.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig configuration file")
cmd.Flags().StringVar(&cfg.AteapiAddr, "ateapi-address", "api.ate-system.svc:443", "gRPC host address of the cluster ateapi Control instance")
cmd.Flags().IntVar(&cfg.HttpPort, "port-http", 8080, "TCP port for workload traffic entering through the Envoy Router")
Expand Down
12 changes: 8 additions & 4 deletions cmd/atenet/internal/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,14 @@ func init() {

// RouterConfig holds deployment setup and endpoint options for the router node instance.
type RouterConfig struct {
Standalone bool
Namespace string
Kubeconfig string
AteapiAddr string
Standalone bool
Namespace string
// RouterServiceName is the Service name of this atenet-router in the
// release namespace, used by /statusz to look up its own ClusterIP.
// Defaults to installdefaults.RouterServiceName.
RouterServiceName string
Kubeconfig string
AteapiAddr string
HttpPort int
XdsPort int
ExtprocPort int
Expand Down
2 changes: 1 addition & 1 deletion cmd/atenet/internal/router/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *RouterServer) getRouterIP(ctx context.Context) string {
return "Standalone Mode (No Cluster IP)"
}

svc, err := s.clientset.CoreV1().Services(s.cfg.Namespace).Get(ctx, "atenet-router", metav1.GetOptions{})
svc, err := s.clientset.CoreV1().Services(s.cfg.Namespace).Get(ctx, s.cfg.RouterServiceName, metav1.GetOptions{})
if err != nil {
return fmt.Sprintf("Lookup Failed: %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions internal/ateclient/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"sync"

"github.com/agent-substrate/substrate/internal/installdefaults"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -130,22 +131,22 @@ func dialPortForward(ctx context.Context, kubeconfigPath, k8sContext string, tra
return nil, fmt.Errorf("failed to create k8s client: %w", err)
}

// Look up the 'api' Service to dynamically get its pod selector
svc, err := clientset.CoreV1().Services("ate-system").Get(ctx, "api", metav1.GetOptions{})
// Look up the ateapi Service to dynamically get its pod selector.
svc, err := clientset.CoreV1().Services(installdefaults.SystemNamespace).Get(ctx, installdefaults.APIServiceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get api service: %w", err)
return nil, fmt.Errorf("failed to get ateapi service %s/%s: %w", installdefaults.SystemNamespace, installdefaults.APIServiceName, err)
}
selector := labels.SelectorFromSet(svc.Spec.Selector).String()

// Find the pods backing the service
pods, err := clientset.CoreV1().Pods("ate-system").List(ctx, metav1.ListOptions{
pods, err := clientset.CoreV1().Pods(installdefaults.SystemNamespace).List(ctx, metav1.ListOptions{
LabelSelector: selector,
})
if err != nil {
return nil, fmt.Errorf("failed to list ateapi pods: %w", err)
}
if len(pods.Items) == 0 {
return nil, fmt.Errorf("no ate-api-server pods found in ate-system namespace")
return nil, fmt.Errorf("no ate-api-server pods found in %q namespace", installdefaults.SystemNamespace)
}
targetPod := pods.Items[0]

Expand Down
47 changes: 47 additions & 0 deletions internal/installdefaults/installdefaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package installdefaults holds the default namespace and Service names
// that match the canonical install layout in manifests/ate-install/.
// Binaries use these as flag defaults; deployments that diverge from
// the canonical layout pass actual values via the corresponding flags.
package installdefaults

import "os"

const (
// SystemNamespace is the namespace where substrate's control-plane
// components and the atelet DaemonSet run.
SystemNamespace = "ate-system"
// APIServiceName is the Service name of ate-api-server.
APIServiceName = "api"
// RouterServiceName is the Service name of atenet-router.
RouterServiceName = "atenet-router"
// DNSServiceName is the Service name of substrate's CoreDNS.
DNSServiceName = "dns"

// PodNamespaceEnv is the conventional env var name for the namespace
// a pod is running in, exposed via Kubernetes' downward API.
PodNamespaceEnv = "POD_NAMESPACE"
)

// NamespaceFromPodEnv returns the namespace from the PodNamespaceEnv env
// var when set (typically populated via Kubernetes' downward API), and
// falls back to SystemNamespace for non-k8s invocations (tests, local dev).
func NamespaceFromPodEnv() string {
if ns := os.Getenv(PodNamespaceEnv); ns != "" {
return ns
}
return SystemNamespace
}
5 changes: 5 additions & 0 deletions manifests/ate-install/atenet-dns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ spec:
- "--log-level=debug"
- "--interval=10s"
- "--corefile-path=/etc/coredns/Corefile"
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: dns-config-volume
mountPath: /etc/coredns
Expand Down