Skip to content

学习如何为 Kubernetes Operators 进行单元测试 Learning How to Write Unit Tests for Kubernetes Operators

Notifications You must be signed in to change notification settings

leileiwan/learning-unit-testing-for-k8s-operator

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

14 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

learning-unit-testing-for-k8s-operator

这一 Repo 旨在帮助 Kubernetes Operators 的开发者们学习如何为 Operators 实现单元测试。其中包括:

  • 为原生实现的 Operator 实现单元测试
  • 为 kubebuilder v1 生成的 Operator 实现单元测试
  • 为 kubebuilder v2 生成的 Operator 实现单元测试

因此这一文档的受众是 Operator 开发者们,文档中为不同的实现方式(kubebuilder v1, v2, 原生实现)设计了不同的实验,配合实验阅读味道更佳。

Table of Contents

Created by gh-md-toc

为原生实现的 Operator 实现单元测试

原生实现的 Operator 实现单元测试的讲解与动手实验,是利用 kubernetes/sample-controller a52d0d8 作为示例展开的,为了实现动手实验的目的,修改了其单元测试 controller_test.go 中的内容。

事先需要了解的知识

  • Kubernetes CRD 特性
  • Kubernetes Informer 机制
  • Golang 单元测试机制

准备工作

首先,将 native-demo-operator 复制到 $GOPATH/src/k8s.io/sample-controller

# 将 `native-demo-operator` 复制到 `$GOPATH/src/k8s.io/sample-controller`。
./scripts/install-native-operator.sh
# 到 `$GOPATH/src/github.com/caicloud/kbv2-operator` 目录下
cd $GOPATH/src/k8s.io/sample-controller

这一操作是为了确保 operator 在正确的路径下。此时已经准备好了 Operator 的环境。

Operator 实现分析

注:如果已经熟悉 kubernetes/sample-controller 的实现与自带的单元测试,可跳过这一部分。

原生实现的 Operator 实现了一个新的资源类型,Foo。Foo 的定义如下,它进一步抽象了 Deployment,只保留了 Deployment Name 和 Replicas 两个字段。在创建 Foo 时,Foo 会创建出以 Deployment Name 命名的 Deployment。而在 Foo 的状态中,只会显示目前 Foo 创建的 Deployment 目前可用的 Replicas 的数量。

type Foo struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   FooSpec   `json:"spec"`
	Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
	DeploymentName string `json:"deploymentName"`
	Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
	AvailableReplicas int32 `json:"availableReplicas"`
}

Operator 的初始化

如下代码是 Foo 的 Operator 初始化的过程。Foo 依赖两个 Client 和两个 Informer:kubeClient(用来操作 Deployment 资源),exampleClient(用来操作 Foo 资源),Deployment Informer(用来订阅 apiserver 上关于 Deployment 的事件),Foo Informer(用来订阅 Foo 资源的事件)。

	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}

Sync 过程

Foo Operator 如同 Kubernetes 内部的 controller 一样,维护了一个 workqueue,并且利用 syncHandler 比对现实状态与期望状态的不同,从现实状态努力同步到期望状态。

Sync 过程如下所示,首先会得到或者创建出对应的 Deployment,然后判断 Deployment 的 Replicas 是否与 Foo 的定义一致,如果不一致,则更新 Deployment。最后,更新 Foo 的状态。

点击此处查看 syncHandler 代码
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}

		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
	}
	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and ret
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf(msg)
	}

	// If this number of the replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
	}
	if err != nil {
		return err
	}
	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}
	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}

单元测试

Operator 的实现依赖 clienset 和 informer,informer 用来订阅 apiserver 的事件,触发 operator 的同步操作。clientset 用来与 apiserver 交互,进行增删改查等操作。

因此在进行单元测试时,需要把这两个依赖 fake 掉。为了在实现单元测试用例时,更方便地完成 Fake 的操作,Foo Operator 引入了一个专门用于测试的数据结构 fixture

首先,我们会介绍 fixture 的定义以及部分实现,接下来,会以一个测试用例作为示例,了解如何使用 fixture 简化测试用例的实现。

fixture 结构

fixture 的定义如下:

type fixture struct {
	t *testing.T

	client     *fake.Clientset
	kubeclient *k8sfake.Clientset
	// Objects to put in the store.
	fooLister        []*samplecontroller.Foo
	deploymentLister []*apps.Deployment
	// Actions expected to happen on the client.
	kubeactions []core.Action
	actions     []core.Action
	// Objects from here preloaded into NewSimpleFake.
	kubeobjects []runtime.Object
	objects     []runtime.Object
}

fixture 在测试中,代表的就是一个在运行的 Operator,其中 clientkubeclient 是 fake 的 client。

deploymentListerfooLister 会定义一系列 Deployment 和 Foo 实例,这些实例会被加入到 Informer 的 Indexer 中,以便发起 Sync 请求。

kubeobjectsobjects 是用来构建期望的测试数据的。它们中的对象,会被添加到 kubeclientclient 中:

	f := newFixture(t)
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)

NewSimpleClientset 的定义如下所示,它利用了一个非常简单的 object 跟踪机制,绕过了正常实现的 clienset 中的各种 validation 和 defaults。它会记录对跟着的 object 的增删改查操作。

// NewSimpleClientset returns a clientset that will respond with the provided objects.
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
func NewSimpleClientset(objects ...runtime.Object) *Clientset {
	o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
	for _, obj := range objects {
		if err := o.Add(obj); err != nil {
			panic(err)
		}
	}

	cs := &Clientset{tracker: o}
	cs.discovery = &fakediscovery.FakeDiscovery{Fake: &cs.Fake}
	cs.AddReactor("*", "*", testing.ObjectReaction(o))
	cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
		gvr := action.GetResource()
		ns := action.GetNamespace()
		watch, err := o.Watch(gvr, ns)
		if err != nil {
			return false, nil, err
		}
		return true, watch, nil
	})

	return cs
}

kubeactionsactions 是用来记录期望观测到的,作用在 clientkubeclient 上的调用。Action 的定义如下所示:

type Action interface {
	GetNamespace() string
	GetVerb() string
	GetResource() schema.GroupVersionResource
	GetSubresource() string
	Matches(verb, resource string) bool

	// DeepCopy is used to copy an action to avoid any risk of accidental mutation.  Most people never need to call this
	// because the invocation logic deep copies before calls to storage and reactors.
	DeepCopy() Action
}

type GetAction interface {
	Action
	GetName() string
}

type CreateAction interface {
	Action
	GetObject() runtime.Object
}

type UpdateAction interface {
	Action
	GetObject() runtime.Object
}
// ...

一个 Action 实例描述的是发生在 clientset 上的一次调用,其中包括 GET 请求操作(GetAction),创建操作(CreateAction),更新操作(UpdateAction)等。通过定义期望的 Action,在单元测试中可以检查 clientset 是否发起了与期望一致的请求。

利用 fixture 实现测试用例

接下来,以一个 Foo Operator 的测试用例为例,介绍一下如何使用 fixture 实现单元测试用例:

func TestCreatesDeployment(t *testing.T) {
	f := newFixture(t)
	foo := newFoo("test", int32Ptr(1))

	f.fooLister = append(f.fooLister, foo)
	f.objects = append(f.objects, foo)

	expDeployment := newDeployment(foo)
	f.expectCreateDeploymentAction(expDeployment)
	f.expectUpdateFooStatusAction(foo)

	f.run(getKey(foo, t))
}

这一测试用例用于测试创建 Deployment 的逻辑是否符合期望。首先创建出一 fixture 对象,其次构造一个用于测试的 Foo 实例。然后将 Foo 添加到 fooListerobjects 中。最后,构造期望的 Deployment,利用辅助函数 expectCreateDeploymentActionexpectUpdateFooStatusAction 将对应的期望 Action 加入到 kubeactionsactions 中。最后,运行 Controller 以完成整个测试。

接下来,看一下 f.run(getKey(foo, t)) 具体的过程。

点击此处查看 run 代码
func (f *fixture) run(fooName string) {
	f.runController(fooName, true, false)
}

func (f *fixture) runController(fooName string, startInformers bool, expectError bool) {
	c, i, k8sI := f.newController()
	if startInformers {
		stopCh := make(chan struct{})
		defer close(stopCh)
		i.Start(stopCh)
		k8sI.Start(stopCh)
	}

	err := c.syncHandler(fooName)
	if !expectError && err != nil {
		f.t.Errorf("error syncing foo: %v", err)
	} else if expectError && err == nil {
		f.t.Error("expected error syncing foo, got nil")
	}

	actions := filterInformerActions(f.client.Actions())
	for i, action := range actions {
		if len(f.actions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
			break
		}

		expectedAction := f.actions[i]
		checkAction(expectedAction, action, f.t)
	}

	if len(f.actions) > len(actions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):])
	}

	k8sActions := filterInformerActions(f.kubeclient.Actions())
	for i, action := range k8sActions {
		if len(f.kubeactions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeactions), k8sActions[i:])
			break
		}

		expectedAction := f.kubeactions[i]
		checkAction(expectedAction, action, f.t)
	}

	if len(f.kubeactions) > len(k8sActions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.kubeactions)-len(k8sActions), f.kubeactions[len(k8sActions):])
	}
}

func (f *fixture) newController() (*Controller, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
	f.client = fake.NewSimpleClientset(f.objects...)
	f.kubeclient = k8sfake.NewSimpleClientset(f.kubeobjects...)

	i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc())
	k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriodFunc())

	c := NewController(f.kubeclient, f.client,
		k8sI.Apps().V1().Deployments(), i.Samplecontroller().V1alpha1().Foos())

	c.foosSynced = alwaysReady
	c.deploymentsSynced = alwaysReady
	c.recorder = &record.FakeRecorder{}

	for _, f := range f.fooLister {
		i.Samplecontroller().V1alpha1().Foos().Informer().GetIndexer().Add(f)
	}

	for _, d := range f.deploymentLister {
		k8sI.Apps().V1().Deployments().Informer().GetIndexer().Add(d)
	}

	return c, i, k8sI
}

run 是对另一函数 runController(fooName string, startInformers bool, expectError bool) 的直接调用。其中 fooName 就是 Foo 的 namespace/name,这一参数会被用来作为 syncHandler 的输入。第二个参数 startInformers 确定是否需要利用 goroutine 运行 informer 的逻辑。第三个参数 expectError 代表是否期望在运行中收到 error。

runController 的最开始,通过调用 newController,创建了 fake 的 client 和 informer,并且将数据在 client 和 informer 中准备好。接下来,是测试用例中的主要逻辑,它会把 informer 运行起来,同时去调用一次 syncHandler,做一次状态的比对和同步,最后检查在 client 中,是否有期望的 Action 发生。

在这一例子中,我们期望的 Action 是:

    f.expectCreateDeploymentAction(expDeployment)
	f.expectUpdateFooStatusAction(foo)

也就是期望观测到创建 expDeployment 的 Action,以及更新 Foo 的状态的 Action。如果在测试用例运行时没有在 runController 时遇到这两个 Action,测试用例就会报错。

Lab 1 实现单元测试

问题

目前在代码中,已经有了四个测试用例,分别是 TestCreatesDeploymentTestDoNothingTestUpdateDeploymentTestNotControlledByUs。Lab 需要完成一个新的测试用例:TestAnonymousDeployment

TestAnonymousDeployment 中,用户需要测试 Foo.Spec.DeploymentName 为空的情况。在实现时,建议利用 Fixture 简化实现,具体细节可参考已有的三个测试用例。

请前往 $GOPATH/src/k8s.io/sample-controller/controller_test.go 实现用例 TestAnonymousDeployment

参考实现

在完成后,可以查看参考实现。实现方式有很多种,此处只提供其中的一种实现方式。

点击此处查看参考实现
func TestAnonymousDeployment(t *testing.T) {
	f := newFixture(t)
	foo := newFoo("test", int32Ptr(1))
	foo.Spec.DeploymentName = ""

	f.fooLister = append(f.fooLister, foo)
	f.objects = append(f.objects, foo)

	f.run(getKey(foo, t))
}

首先,利用 newFixture 创建了测试环境,然后创建了 DeploymentName 是空值的测试用例 Foo,然后将其加入到了 fooListerobjects 中,在 run 的调用中,fooListerobjects 中的对象会被加入到 operator 对应的 clientinformer 中。最后,由于在 DeploymentName 是空值的情况下,会直接返回,不做任何处理:

    if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

所以,应该没有任何 Action 产生。

Lab 2 扩展内容:Table Driven Test

背景知识

在之前的实验中,所有的测试用例都是独立的,我们为了不同的情况都实现了一个 TestXXX 函数,这样的实现,当我们要覆盖更多 case 时,会非常冗长。这时我们可以采用 Table-Driven 的方式,把多个测试用例合并在一个用例中。举一个斐波那契数列的例子介绍这样的方式:

func TestFib(t *testing.T) {
    var fibTests = []struct {
        in       int // input
        expected int // expected result
    }{
        {1, 1},
        {2, 1},
        {3, 2},
        {4, 3},
        {5, 5},
        {6, 8},
        {7, 13},
    }

    for _, tt := range fibTests {
        actual := Fib(tt.in)
        if actual != tt.expected {
            t.Errorf("Fib(%d) = %d; expected %d", tt.in, actual, tt.expected)
        }
    }
}

通过定义了一个测试用例的数组,在一个循环中依次进行多次测试。这样的实现可以用更少的代码覆盖更多的用例,更多介绍可以参考 golang/go/wiki/TableDrivenTests

问题

在这一实验中,我们需要把之前的五个测试用例,利用 Table Driven 的方法,合并成一个测试用例。

请前往 $GOPATH/src/k8s.io/sample-controller/controller_test.go 实现用例 TestController

参考实现

在完成后,可以查看参考实现。实现方式有很多种,此处只提供其中的一种实现方式。

点击此处查看参考实现

首先,在测试函数中定义了一个结构 TestCase,其中包含了测试用例的名字,测试中会用到的数据 FooDeployment,控制是否将数据加入到 Controller 中的变量 AddFooIntoControllerAddDeploymentIntoController。接下来是控制是否期望观测到对应 Action 的一系列变量 ExpectCreateDeploymentExpectUpdateDeploymentExpectUpdateFooStatus。最后是关于期望观测到的 Deployment 和是否期望遇到 Error 的变量 ExpectDeploymentExpectError

func TestController(t *testing.T) {
	type TestCase struct {
		Case       string
		Foo        *samplecontroller.Foo
		Deployment *appsv1.Deployment

		AddFooIntoController        bool
		AddDeploymentIntoController bool

		ExpectCreateDeployment bool
		ExpectUpdateDeployment bool
		ExpectUpdateFooStatus  bool

		ExpectDeployment *appsv1.Deployment
		ExpectError      bool
	}
	testCases := []TestCase{
		{
			Case:       "TestCreatesDeployment",
			Foo:        newFoo("test", int32Ptr(1)),
			Deployment: newDeployment(newFoo("test", int32Ptr(1))),

			AddFooIntoController:        true,
			AddDeploymentIntoController: false,

			ExpectCreateDeployment: true,
			ExpectUpdateDeployment: false,
			ExpectUpdateFooStatus:  true,

			ExpectError: false,
		},
		{
			Case:       "TestDoNothing",
			Foo:        newFoo("test", int32Ptr(1)),
			Deployment: newDeployment(newFoo("test", int32Ptr(1))),

			AddFooIntoController:        true,
			AddDeploymentIntoController: true,

			ExpectCreateDeployment: false,
			ExpectUpdateDeployment: false,
			ExpectUpdateFooStatus:  true,

			ExpectError: false,
		},
		{
			Case:       "TestUpdateDeployment",
			Foo:        newFoo("test", int32Ptr(1)),
			Deployment: newDeployment(newFoo("test", int32Ptr(2))),

			AddFooIntoController:        true,
			AddDeploymentIntoController: true,

			ExpectCreateDeployment: false,
			ExpectUpdateDeployment: true,
			ExpectUpdateFooStatus:  true,

			ExpectDeployment: newDeployment(newFoo("test", int32Ptr(1))),
			ExpectError:      false,
		},
		{
			Case: "TestNotControlledByUs",
			Foo:  newFoo("test", int32Ptr(1)),
			Deployment: func() *appsv1.Deployment {
				d := newDeployment(newFoo("test", int32Ptr(2)))
				d.ObjectMeta.OwnerReferences = []metav1.OwnerReference{}
				return d
			}(),

			AddFooIntoController:        true,
			AddDeploymentIntoController: true,

			ExpectCreateDeployment: false,
			ExpectUpdateDeployment: false,
			ExpectUpdateFooStatus:  false,

			ExpectError: true,
		},
		{
			Case: "TestAnonymousDeployment",
			Foo: func() *samplecontroller.Foo {
				f := newFoo("test", int32Ptr(1))
				f.Spec.DeploymentName = ""
				return f
			}(),

			AddFooIntoController:        true,
			AddDeploymentIntoController: false,

			ExpectCreateDeployment: false,
			ExpectUpdateDeployment: false,
			ExpectUpdateFooStatus:  false,

			ExpectError: false,
		},
	}

	for _, testCase := range testCases {
		t.Logf("Running Test Case: %s", testCase.Case)
		f := newFixture(t)
		if testCase.AddFooIntoController {
			f.fooLister = append(f.fooLister, testCase.Foo)
			f.objects = append(f.objects, testCase.Foo)
		}
		if testCase.AddDeploymentIntoController {
			f.deploymentLister = append(f.deploymentLister, testCase.Deployment)
			f.kubeobjects = append(f.kubeobjects, testCase.Deployment)
		}
		if testCase.ExpectCreateDeployment {
			f.expectCreateDeploymentAction(testCase.Deployment)
		}
		if testCase.ExpectUpdateDeployment {
			if testCase.ExpectDeployment != nil {
				f.expectUpdateDeploymentAction(testCase.ExpectDeployment)
			} else {
				f.expectUpdateDeploymentAction(testCase.Deployment)
			}
		}
		if testCase.ExpectUpdateFooStatus {
			f.expectUpdateFooStatusAction(testCase.Foo)
		}
		f.runController(getKey(testCase.Foo, t), true, testCase.ExpectError)
	}
}

接下来,就顺理成章了。添加测试用例只需要在 testCases 中添加新的 TestCase 实例即可。

为 kubebuilder v1 生成的 Operator 实现单元测试(TODO)

为 kubebuilder v2 生成的 Operator 实现单元测试(TODO)

About

学习如何为 Kubernetes Operators 进行单元测试 Learning How to Write Unit Tests for Kubernetes Operators

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

  • Go 96.3%
  • Shell 3.7%