Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

Commit

Permalink
Support annotation pods that will no longer maintain status and will …
Browse files Browse the repository at this point in the history
…be left to others to modify it
  • Loading branch information
wzshiming committed Jul 19, 2022
1 parent d6a9c7b commit 1575f3b
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 90 deletions.
63 changes: 33 additions & 30 deletions cmd/fake-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,27 @@ import (
)

var (
cidr = getEnv("CIDR", "10.0.0.1/24")
nodeIP = net.ParseIP(getEnv("NODE_IP", "196.168.0.1"))
nodeName = getEnv("NODE_NAME", "fake")
takeOverAll = getEnvBool("TAKE_OVER_ALL", false)
takeOverLabelsSelector = getEnv("TAKE_OVER_LABELS_SELECTOR", "type=fake-kubelet")
generateNodeName = getEnv("GENERATE_NODE_NAME", "")
generateReplicas = getEnvUint("GENERATE_REPLICAS", 0)
generateSerialLength = getEnvUint("GENERATE_SERIAL_LENGTH", 1)
kubeconfig = getEnv("KUBECONFIG", "")
healthAddress = getEnv("HEALTH_ADDRESS", "") // deprecated: use serverAddress instead
serverAddress = getEnv("SERVER_ADDRESS", healthAddress)
podStatusTemplatePath = ""
podStatusTemplate = getEnv("POD_STATUS_TEMPLATE", templates.DefaultPodStatusTemplate)
nodeTemplatePath = ""
nodeTemplate = getEnv("NODE_TEMPLATE", templates.DefaultNodeTemplate)
nodeHeartbeatTemplateePath = ""
nodeHeartbeatTemplate = getEnv("NODE_HEARTBEAT_TEMPLATE", templates.DefaultNodeHeartbeatTemplate)
nodeInitializationTemplatePath = ""
nodeInitializationTemplate = getEnv("NODE_INITIALIZATION_TEMPLATE", templates.DefaultNodeInitializationTemplate)
master = ""
cidr = getEnv("CIDR", "10.0.0.1/24")
nodeIP = net.ParseIP(getEnv("NODE_IP", "196.168.0.1"))
nodeName = getEnv("NODE_NAME", "fake")
takeOverAll = getEnvBool("TAKE_OVER_ALL", false)
takeOverLabelsSelector = getEnv("TAKE_OVER_LABELS_SELECTOR", "type=fake-kubelet")
podCustomStatusAnnotationSelector = getEnv("POD_CUSTOM_STATUS_ANNOTATION_SELECTOR", "fake=custom")
generateNodeName = getEnv("GENERATE_NODE_NAME", "")
generateReplicas = getEnvUint("GENERATE_REPLICAS", 0)
generateSerialLength = getEnvUint("GENERATE_SERIAL_LENGTH", 1)
kubeconfig = getEnv("KUBECONFIG", "")
healthAddress = getEnv("HEALTH_ADDRESS", "") // deprecated: use serverAddress instead
serverAddress = getEnv("SERVER_ADDRESS", healthAddress)
podStatusTemplatePath = ""
podStatusTemplate = getEnv("POD_STATUS_TEMPLATE", templates.DefaultPodStatusTemplate)
nodeTemplatePath = ""
nodeTemplate = getEnv("NODE_TEMPLATE", templates.DefaultNodeTemplate)
nodeHeartbeatTemplateePath = ""
nodeHeartbeatTemplate = getEnv("NODE_HEARTBEAT_TEMPLATE", templates.DefaultNodeHeartbeatTemplate)
nodeInitializationTemplatePath = ""
nodeInitializationTemplate = getEnv("NODE_INITIALIZATION_TEMPLATE", templates.DefaultNodeInitializationTemplate)
master = ""

logger = log.New(os.Stderr, "[fake-kubelet] ", log.LstdFlags)
)
Expand All @@ -58,6 +59,7 @@ func init() {
pflag.StringVarP(&nodeName, "node-name", "n", nodeName, "Names of the node")
pflag.BoolVar(&takeOverAll, "take-over-all", takeOverAll, "Take over all nodes, there should be no nodes maintained by real Kubelet in the cluster")
pflag.StringVar(&takeOverLabelsSelector, "take-over-labels-selector", takeOverLabelsSelector, "Selector of nodes to take over")
pflag.StringVar(&podCustomStatusAnnotationSelector, "pod-custom-status-annotation-selector", podCustomStatusAnnotationSelector, "Selector of pods that with this annotation will no longer maintain status and will be left to others to modify it")
pflag.StringVar(&generateNodeName, "generate-node-name", generateNodeName, "Generate node name")
pflag.UintVar(&generateReplicas, "generate-replicas", generateReplicas, "Generate replicas")
pflag.UintVar(&generateSerialLength, "generate-serial-length", generateSerialLength, "Generate serial length")
Expand Down Expand Up @@ -167,16 +169,17 @@ func main() {
}

controller, err := fake_kubelet.NewController(fake_kubelet.Config{
ClientSet: clientset,
TakeOverAll: takeOverAll,
TakeOverLabelsSelector: takeOverLabelsSelector,
CIDR: cidr,
NodeIP: nodeIP.String(),
Logger: logger,
PodStatusTemplate: podStatusTemplate,
NodeTemplate: nodeTemplate,
NodeHeartbeatTemplate: nodeHeartbeatTemplate,
NodeInitializationTemplate: nodeInitializationTemplate,
ClientSet: clientset,
TakeOverAll: takeOverAll,
TakeOverLabelsSelector: takeOverLabelsSelector,
PodCustomStatusAnnotationSelector: podCustomStatusAnnotationSelector,
CIDR: cidr,
NodeIP: nodeIP.String(),
Logger: logger,
PodStatusTemplate: podStatusTemplate,
NodeTemplate: nodeTemplate,
NodeHeartbeatTemplate: nodeHeartbeatTemplate,
NodeInitializationTemplate: nodeInitializationTemplate,
})
if err != nil {
logger.Fatalln(err)
Expand Down
40 changes: 21 additions & 19 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ type Controller struct {
}

type Config struct {
ClientSet kubernetes.Interface
TakeOverAll bool
TakeOverLabelsSelector string
CIDR string
NodeIP string
Logger Logger
PodStatusTemplate string
NodeTemplate string
NodeInitializationTemplate string
NodeHeartbeatTemplate string
ClientSet kubernetes.Interface
TakeOverAll bool
TakeOverLabelsSelector string
PodCustomStatusAnnotationSelector string
CIDR string
NodeIP string
Logger Logger
PodStatusTemplate string
NodeTemplate string
NodeInitializationTemplate string
NodeHeartbeatTemplate string
}

type Logger interface {
Expand Down Expand Up @@ -105,15 +106,16 @@ func NewController(conf Config) (*Controller, error) {
}

pods, err := NewPodController(PodControllerConfig{
ClientSet: conf.ClientSet,
NodeIP: conf.NodeIP,
CIDR: conf.CIDR,
PodStatusTemplate: conf.PodStatusTemplate,
LockPodParallelism: 16,
DeletePodParallelism: 16,
NodeHasFunc: nodes.Has, // just handle pods that are on nodes we have
Logger: conf.Logger,
FuncMap: funcMap,
ClientSet: conf.ClientSet,
NodeIP: conf.NodeIP,
CIDR: conf.CIDR,
PodCustomStatusAnnotationSelector: conf.PodCustomStatusAnnotationSelector,
PodStatusTemplate: conf.PodStatusTemplate,
LockPodParallelism: 16,
DeletePodParallelism: 16,
NodeHasFunc: nodes.Has, // just handle pods that are on nodes we have
Logger: conf.Logger,
FuncMap: funcMap,
})
if err != nil {
return nil, fmt.Errorf("failed to create pods controller: %v", err)
Expand Down
79 changes: 47 additions & 32 deletions pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand All @@ -30,31 +31,33 @@ var (

// PodController is a fake pods implementation that can be used to test
type PodController struct {
clientSet kubernetes.Interface
nodeIP string
cidrIPNet *net.IPNet
nodeHasFunc func(nodeName string) bool
ipPool *ipPool
podStatusTemplate string
logger Logger
funcMap template.FuncMap
lockPodChan chan *corev1.Pod
lockPodParallelism int
deletePodChan chan *corev1.Pod
deletePodParallelism int
clientSet kubernetes.Interface
podCustomStatusAnnotationSelector labels.Selector
nodeIP string
cidrIPNet *net.IPNet
nodeHasFunc func(nodeName string) bool
ipPool *ipPool
podStatusTemplate string
logger Logger
funcMap template.FuncMap
lockPodChan chan *corev1.Pod
lockPodParallelism int
deletePodChan chan *corev1.Pod
deletePodParallelism int
}

// PodControllerConfig is the configuration for the PodController
type PodControllerConfig struct {
ClientSet kubernetes.Interface
NodeIP string
CIDR string
NodeHasFunc func(nodeName string) bool
PodStatusTemplate string
Logger Logger
LockPodParallelism int
DeletePodParallelism int
FuncMap template.FuncMap
ClientSet kubernetes.Interface
PodCustomStatusAnnotationSelector string
NodeIP string
CIDR string
NodeHasFunc func(nodeName string) bool
PodStatusTemplate string
Logger Logger
LockPodParallelism int
DeletePodParallelism int
FuncMap template.FuncMap
}

// NewPodController creates a new fake pods controller
Expand All @@ -63,18 +66,25 @@ func NewPodController(conf PodControllerConfig) (*PodController, error) {
if err != nil {
return nil, err
}

podCustomStatusAnnotationSelector, err := labels.Parse(conf.PodCustomStatusAnnotationSelector)
if err != nil {
return nil, err
}

n := &PodController{
clientSet: conf.ClientSet,
nodeIP: conf.NodeIP,
cidrIPNet: cidrIPNet,
ipPool: newIPPool(cidrIPNet),
nodeHasFunc: conf.NodeHasFunc,
logger: conf.Logger,
podStatusTemplate: conf.PodStatusTemplate,
lockPodChan: make(chan *corev1.Pod),
lockPodParallelism: conf.LockPodParallelism,
deletePodChan: make(chan *corev1.Pod),
deletePodParallelism: conf.DeletePodParallelism,
clientSet: conf.ClientSet,
podCustomStatusAnnotationSelector: podCustomStatusAnnotationSelector,
nodeIP: conf.NodeIP,
cidrIPNet: cidrIPNet,
ipPool: newIPPool(cidrIPNet),
nodeHasFunc: conf.NodeHasFunc,
logger: conf.Logger,
podStatusTemplate: conf.PodStatusTemplate,
lockPodChan: make(chan *corev1.Pod),
lockPodParallelism: conf.LockPodParallelism,
deletePodChan: make(chan *corev1.Pod),
deletePodParallelism: conf.DeletePodParallelism,
}
n.funcMap = template.FuncMap{
"NodeIP": func() string {
Expand Down Expand Up @@ -159,6 +169,11 @@ func (c *PodController) DeletePods(ctx context.Context, pods <-chan *corev1.Pod)

// LockPod locks a given pod
func (c *PodController) LockPod(ctx context.Context, pod *corev1.Pod) error {
if c.podCustomStatusAnnotationSelector != nil &&
len(pod.Annotations) != 0 &&
c.podCustomStatusAnnotationSelector.Matches(labels.Set(pod.Annotations)) {
return nil
}
patch, err := c.configurePod(pod)
if err != nil {
return err
Expand Down
39 changes: 30 additions & 9 deletions pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/wzshiming/fake-kubelet/templates"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
)

Expand Down Expand Up @@ -52,16 +53,18 @@ func TestPodController(t *testing.T) {
nodeHasFunc := func(nodeName string) bool {
return strings.HasPrefix(nodeName, "node")
}
annotationSelector, _ := labels.Parse("fake=custom")
pods, err := NewPodController(PodControllerConfig{
ClientSet: clientset,
NodeIP: "10.0.0.1",
CIDR: "10.0.0.1/24",
PodStatusTemplate: templates.DefaultPodStatusTemplate,
NodeHasFunc: nodeHasFunc,
FuncMap: funcMap,
LockPodParallelism: 2,
DeletePodParallelism: 2,
Logger: testingLogger{t},
ClientSet: clientset,
NodeIP: "10.0.0.1",
CIDR: "10.0.0.1/24",
PodCustomStatusAnnotationSelector: annotationSelector.String(),
PodStatusTemplate: templates.DefaultPodStatusTemplate,
NodeHasFunc: nodeHasFunc,
FuncMap: funcMap,
LockPodParallelism: 2,
DeletePodParallelism: 2,
Logger: testingLogger{t},
})
if err != nil {
t.Fatal(fmt.Errorf("new pods controller error: %v", err))
Expand Down Expand Up @@ -96,6 +99,24 @@ func TestPodController(t *testing.T) {
},
}, metav1.CreateOptions{})

pod1, err := clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
if err != nil {
t.Fatal(fmt.Errorf("get pod1 error: %v", err))
}
pod1.Annotations = map[string]string{
"fake": "custom",
}
pod1.Status.Reason = "custom"
clientset.CoreV1().Pods("default").Update(ctx, pod1, metav1.UpdateOptions{})

pod1, err = clientset.CoreV1().Pods("default").Get(ctx, "pod1", metav1.GetOptions{})
if err != nil {
t.Fatal(fmt.Errorf("get pod1 error: %v", err))
}
if pod1.Status.Reason != "custom" {
t.Fatal(fmt.Errorf("pod1 status reason not custom"))
}

time.Sleep(2 * time.Second)

list, err := clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
Expand Down

0 comments on commit 1575f3b

Please sign in to comment.