Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ endef

.PHONY: docker-build-all
docker-build-all:
make -j $(nproc) docker-build-controller-manager docker-build-gateway-plugins docker-build-runtime docker-build-metadata-service ## Build all docker images
make -j $(nproc) docker-build-controller-manager docker-build-gateway-plugins docker-build-runtime docker-build-metadata-service docker-build-kvcache-watcher ## Build all docker images

.PHONY: docker-build-controller-manager
docker-build-controller-manager: ## Build docker image with the manager.
Expand All @@ -171,9 +171,13 @@ docker-build-runtime: ## Build docker image with the AI Runtime.
docker-build-metadata-service: ## Build docker image with the metadata-service.
$(call build_and_tag,metadata-service,Dockerfile.metadata)

.PHONY: docker-build-kvcache-watcher
docker-build-kvcache-watcher: ## Build docker image with the kvcache-watcher.
$(call build_and_tag,kvcache-watcher,Dockerfile.kvcache)

.PHONY: docker-push-all
docker-push-all:
make -j $(nproc) docker-push-controller-manager docker-push-gateway-plugins docker-push-runtime docker-push-metadata-service ## Push all docker images
make -j $(nproc) docker-push-controller-manager docker-push-gateway-plugins docker-push-runtime docker-push-metadata-service docker-push-kvcache-watcher ## Push all docker images

.PHONY: docker-push-controller-manager
docker-push-controller-manager: ## Push docker image with the manager.
Expand All @@ -191,6 +195,10 @@ docker-push-runtime: ## Push docker image with the AI Runtime.
docker-push-metadata-service: ## Push docker image with the metadata-service.
$(call push_image,metadata-service)

.PHONY: docker-push-kvcache-watcher
docker-push-kvcache-watcher: ## Push docker image with the kvcache-watcher.
$(call push_image,kvcache-watcher)

# PLATFORMS defines the target platforms for the manager image be built to provide support to multiple
# architectures. (i.e. make docker-buildx IMG=myregistry/mypoperator:0.0.1). To use this option you need to:
# - be able to use docker buildx. More info: https://docs.docker.com/build/buildx/
Expand Down
6 changes: 3 additions & 3 deletions api/orchestration/v1alpha1/kvcache_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type RedisConfig struct {
Image string `json:"image"`
Replicas int32 `json:"replicas"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Storage MetadataStorage `json:"storage"`
Storage *MetadataStorage `json:"storage,omitempty"`
}

// EtcdConfig provides the configuration fields for deploying etcd.
Expand All @@ -59,7 +59,7 @@ type EtcdConfig struct {
// +kubebuilder:default:=1
Replicas int32 `json:"replicas"`
Resources corev1.ResourceRequirements `json:"resources,omitempty"`
Storage MetadataStorage `json:"storage"`
Storage *MetadataStorage `json:"storage,omitempty"`
}

// MetadataStorage configures the persistent storage used by the metadata service.
Expand All @@ -83,7 +83,7 @@ type CacheSpec struct {
// +kubebuilder:default:="IfNotPresent"
ImagePullPolicy string `json:"imagePullPolicy,omitempty"`

// shared memory size for kvcach
// shared memory size for kvcache
// +kubebuilder:validation:Optional
// +kubebuilder:default:=""
SharedMemorySize string `json:"sharedMemorySize,omitempty"`
Expand Down
12 changes: 10 additions & 2 deletions api/orchestration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions build/container/Dockerfile.kvcache
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Build the manager binary
FROM golang:1.22 AS builder
ARG TARGETOS
ARG TARGETARCH

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum
# cache deps before building and copying source so that we don't need to re-download as much
# and so that source changes don't invalidate our downloaded layer
RUN go mod download

# Copy the go source
COPY cmd/ cmd/
COPY api/ api/
COPY pkg/ pkg/

# Build
# the GOARCH has not a default value to allow the binary be built according to the host where the command
# was called. For example, if we call make docker-build in a local env which has the Apple Silicon M1 SO
# the docker BUILDPLATFORM arg will be linux/arm64 when for Apple x86 it will be linux/amd64. Therefore,
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
RUN CGO_ENABLED=0 GOOS=${TARGETOS:-linux} GOARCH=${TARGETARCH} go build -a -o kvcache-watcher cmd/kvcache-watcher/main.go

# Use distroless as minimal base image to package the manager binary
# Refer to https://github.com/GoogleContainerTools/distroless for more details
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/kvcache-watcher .
USER 65532:65532

ENTRYPOINT ["/kvcache-watcher"]
216 changes: 216 additions & 0 deletions cmd/kvcache-watcher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
Copyright 2025 The Aibrix Team.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"strconv"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/redis/go-redis/v9"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

const DefaultKVCacheServerPort = 9600
const KVCacheLabelKeyIdentifier = "kvcache.orchestration.aibrix.ai/name"
const KVCacheLabelKeyRole = "kvcache.orchestration.aibrix.ai/role"
const KVCacheLabelValueRoleCache = "cache"
const KVCacheLabelValueRoleMetadata = "metadata"
const KVCacheLabelValueRoleKVWatcher = "kvwatcher"

type NodeInfo struct {
Name string `json:"name"`
Addr string `json:"addr"`
Port int `json:"port"`
}

type ClusterNodes struct {
Nodes []NodeInfo `json:"nodes"`
Version int64 `json:"version"`
}

func main() {
ctx := context.Background()

// read environment variables from env
namespace := os.Getenv("WATCH_NAMESPACE")
if namespace == "" {
namespace = "default"
}
kvClusterId := os.Getenv("WATCH_KVCACHE_CLUSTER") // e.g., "kvcache.aibrix.ai=llama4"

redisAddr := os.Getenv("REDIS_ADDR")
redisPass := os.Getenv("REDIS_PASSWORD")
// Database to be selected after connecting to the server.
redisDatabaseStr := os.Getenv("REDIS_DATABASE")
redisDatabase, err := strconv.Atoi(redisDatabaseStr)
if err != nil {
klog.Warningf("Conversion error: %v", err)
// Use default database
redisDatabase = 0
}

// create Kubernetes client
var config *rest.Config
var kubeConfig string
kFlag := flag.Lookup("kubeconfig")
if kFlag != nil {
kubeConfig = kFlag.Value.String()
} else {
klog.Warning("kubeconfig flag not defined")
}

if kubeConfig == "" {
klog.Info("using in-cluster configuration")
config, err = rest.InClusterConfig()
} else {
klog.Infof("using configuration from '%s'", kubeConfig)
config, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
}
if err != nil {
klog.Fatalf("Failed to read kube configs: %v", err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create clientset: %v", err)
}

rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: redisPass,
DB: redisDatabase,
})

// Create informer factory
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 15*time.Second,
informers.WithNamespace(namespace),
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
if kvClusterId != "" {
kvClusterLabel := fmt.Sprintf("%s=%s", KVCacheLabelKeyIdentifier, kvClusterId)
kvClusterRoleLabel := fmt.Sprintf("%s=%s", KVCacheLabelKeyRole, KVCacheLabelValueRoleCache)
opts.LabelSelector = fmt.Sprintf("%s,%s", kvClusterLabel, kvClusterRoleLabel)
klog.Infof(opts.LabelSelector)
}
}),
)

podInformer := factory.Core().V1().Pods().Informer()

_, err = podInformer.AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { syncPods(ctx, rdb, podInformer, kvClusterId) },
UpdateFunc: func(oldObj, newObj interface{}) { syncPods(ctx, rdb, podInformer, kvClusterId) },
DeleteFunc: func(obj interface{}) { syncPods(ctx, rdb, podInformer, kvClusterId) },
})
if err != nil {
return
}

stopCh := make(chan struct{})
defer close(stopCh)

klog.Info("Starting pod registration watcher...")
factory.Start(stopCh)
factory.WaitForCacheSync(stopCh)

<-stopCh
}

func syncPods(ctx context.Context, rdb *redis.Client, informer cache.SharedIndexInformer, selector string) {
pods := informer.GetStore().List()

var nodeInfos []NodeInfo
for _, obj := range pods {
pod := obj.(*corev1.Pod)
if _, ok := pod.Labels[KVCacheLabelKeyIdentifier]; !ok {
continue
}

if value, ok := pod.Labels[KVCacheLabelKeyRole]; !ok || value != KVCacheLabelValueRoleCache {
continue
}

if pod.Status.Phase != corev1.PodRunning || pod.DeletionTimestamp != nil {
continue
}

ip := pod.Status.PodIP
if ip == "" {
continue
}
nodeInfos = append(nodeInfos, NodeInfo{
Name: pod.Name,
Addr: ip,
Port: DefaultKVCacheServerPort,
})
}

// get existing nodes
val, err := rdb.Get(ctx, "kvcache_nodes").Result()
// TODO: debug, whether it's correct or not.
klog.Infof("redis get result: %v", val)
existing := ClusterNodes{}
if err == nil {
_ = json.Unmarshal([]byte(val), &existing)
}

// only change if there's node update
if isNodeListEqual(existing.Nodes, nodeInfos) {
klog.Infof("No changes to node list, skipping update")
return
}

newData := ClusterNodes{
Nodes: nodeInfos,
Version: existing.Version + 1,
}
encoded, _ := json.Marshal(newData)
klog.Infof("nodes: %v, encoded %v", nodeInfos, encoded)
err = rdb.Set(ctx, "kvcache_nodes", encoded, 0).Err()
if err != nil {
klog.Errorf("Failed to write Redis: %v", err)
}
klog.Infof("Updated Redis node info with %d nodes", len(nodeInfos))
}

func isNodeListEqual(a, b []NodeInfo) bool {
if len(a) != len(b) {
return false
}
m := map[string]NodeInfo{}
for _, n := range a {
m[n.Name] = n
}
for _, n := range b {
if m[n.Name] != n {
return false
}
}
return true
}
12 changes: 8 additions & 4 deletions config/crd/orchestration/orchestration.aibrix.ai_kvcaches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
controller-gen.kubebuilder.io/version: v0.16.1
name: kvcaches.orchestration.aibrix.ai
spec:
group: orchestration.aibrix.ai
Expand Down Expand Up @@ -108,7 +108,7 @@ spec:
default: IfNotPresent
type: string
memory:
default: "2"
default: 2Gi
type: string
replicas:
default: 3
Expand Down Expand Up @@ -144,6 +144,8 @@ spec:
properties:
name:
type: string
request:
type: string
required:
- name
type: object
Expand Down Expand Up @@ -177,7 +179,6 @@ spec:
type: object
required:
- image
- storage
type: object
redis:
properties:
Expand All @@ -193,6 +194,8 @@ spec:
properties:
name:
type: string
request:
type: string
required:
- name
type: object
Expand Down Expand Up @@ -227,7 +230,6 @@ spec:
required:
- image
- replicas
- storage
type: object
type: object
replicas:
Expand All @@ -250,6 +252,8 @@ spec:
default: ClusterIP
type: string
type: object
required:
- replicas
type: object
status:
properties:
Expand Down
Loading
Loading