Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: controller leader election #173

Merged
merged 3 commits into from
Jan 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
test: add e2e cases
  • Loading branch information
tokers committed Jan 13, 2021
commit 86972eea51fbb4161de73c776ed41fd7d1130ec9
9 changes: 9 additions & 0 deletions charts/ingress-apisix/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ spec:
volumeMounts:
- mountPath: /ingress-apisix/conf
name: configuration
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
{{- with .Values.ingressController.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
Expand Down
36 changes: 20 additions & 16 deletions pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,21 @@ import (
"sync"
"time"

"go.uber.org/zap"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

v1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/cache"

"github.com/api7/ingress-controller/pkg/apisix"

clientSet "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
crdclientset "github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

"github.com/api7/ingress-controller/pkg/api"
"github.com/api7/ingress-controller/pkg/apisix"
"github.com/api7/ingress-controller/pkg/config"
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
Expand Down Expand Up @@ -172,11 +167,17 @@ func (c *Controller) Run(stop chan struct{}) error {
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
log.Infof("controller now is running as a candidate")
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
}
},
OnStoppedLeading: func() {
log.Info("controller now is running as a candidate")
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.metricsCollector.ResetLeader(false)
},
},
Expand All @@ -201,7 +202,10 @@ election:
}

func (c *Controller) run(ctx context.Context) {
log.Info("controller now is running as leader")
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
c.metricsCollector.ResetLeader(true)

ac := &Api6Controller{
Expand Down
9 changes: 9 additions & 0 deletions samples/deploy/deployment/ingress-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ spec:
- mountPath: /ingress-apisix/conf/config.yaml
name: apisix-ingress-configmap
subPath: config.yaml
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
volumes:
- configMap:
name: apisix-ingress-cm
Expand Down
56 changes: 56 additions & 0 deletions test/e2e/ingress/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ingress
import (
"encoding/json"
"net/http"
"time"

"github.com/api7/ingress-controller/test/e2e/scaffold"
"github.com/onsi/ginkgo"
Expand Down Expand Up @@ -103,3 +104,58 @@ var _ = ginkgo.Describe("double-routes", func() {
// We don't care the json data, only make sure it's a normal json string.
})
})

var _ = ginkgo.Describe("leader election", func() {
s := scaffold.NewScaffold(&scaffold.Options{
Name: "leaderelection",
Kubeconfig: scaffold.GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
IngressAPISIXReplicas: 2,
})
ginkgo.It("lease check", func() {
pods, err := s.GetIngressPodDetails()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), pods, 2)
lease, err := s.GetLeaderLease()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Equal(ginkgo.GinkgoT(), *lease.Spec.LeaseDurationSeconds, int32(15))
if *lease.Spec.HolderIdentity != pods[0].Name && *lease.Spec.HolderIdentity != pods[1].Name {
assert.Fail(ginkgo.GinkgoT(), "bad leader lease holder identity")
}
})

ginkgo.It("leader failover", func() {
// Wait the leader election to complete.
time.Sleep(2 * time.Second)
pods, err := s.GetIngressPodDetails()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), pods, 2)

lease, err := s.GetLeaderLease()
assert.Nil(ginkgo.GinkgoT(), err)

leaderIdx := 0
if *lease.Spec.HolderIdentity == pods[1].Name {
leaderIdx = 1
}
ginkgo.GinkgoT().Logf("lease is %s", *lease.Spec.HolderIdentity)
assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[leaderIdx].Name))
time.Sleep(25 * time.Second)

newLease, err := s.GetLeaderLease()
assert.Nil(ginkgo.GinkgoT(), err)

newPods, err := s.GetIngressPodDetails()
assert.Nil(ginkgo.GinkgoT(), err)
assert.Len(ginkgo.GinkgoT(), pods, 2)

assert.NotEqual(ginkgo.GinkgoT(), *newLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity)
assert.Greater(ginkgo.GinkgoT(), *newLease.Spec.LeaseTransitions, *lease.Spec.LeaseTransitions)

if *newLease.Spec.HolderIdentity != newPods[0].Name && *newLease.Spec.HolderIdentity != newPods[1].Name {
assert.Failf(ginkgo.GinkgoT(), "bad leader lease holder identity: %s, should be %s or %s",
*newLease.Spec.HolderIdentity, newPods[0].Name, newPods[1].Name)
}
})
})
37 changes: 35 additions & 2 deletions test/e2e/scaffold/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package scaffold

import (
"context"
"fmt"

"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -45,7 +48,7 @@ kind: Deployment
metadata:
name: ingress-apisix-controller-deployment-e2e-test
spec:
replicas: 1
replicas: %d
selector:
matchLabels:
app: ingress-apisix-controller-deployment-e2e-test
Expand Down Expand Up @@ -77,6 +80,15 @@ spec:
tcpSocket:
port: 8080
timeoutSeconds: 2
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
image: "apache/apisix-ingress-controller:dev"
imagePullPolicy: Never
name: ingress-apisix-controller-deployment-e2e-test
Expand All @@ -102,7 +114,7 @@ spec:
)

func (s *Scaffold) newIngressAPISIXController() error {
ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace)
ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas, s.namespace)
if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions, _serviceAccount); err != nil {
return err
}
Expand Down Expand Up @@ -144,3 +156,24 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error {
}
return waitExponentialBackoff(condFunc)
}

// GetLeaderLease returns the Lease resource.
func (s *Scaffold) GetLeaderLease() (*coordinationv1.Lease, error) {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {
return nil, err
}
lease, err := cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(), "ingress-apisix-leader", metav1.GetOptions{})
if err != nil {
return nil, err
}
return lease, nil
}

// GetIngressPodDetails returns a batch of pod description
// about apisix-ingress-controller.
func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
LabelSelector: "app=ingress-apisix-controller-deployment-e2e-test",
})
}
14 changes: 14 additions & 0 deletions test/e2e/scaffold/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package scaffold

import (
"context"
"fmt"
"io/ioutil"
"net/http"
Expand All @@ -26,6 +27,8 @@ import (
"text/template"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/gavv/httpexpect/v2"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/gruntwork-io/terratest/modules/testing"
Expand All @@ -41,6 +44,7 @@ type Options struct {
Kubeconfig string
APISIXConfigPath string
APISIXDefaultConfigPath string
IngressAPISIXReplicas int
}

type Scaffold struct {
Expand Down Expand Up @@ -101,10 +105,20 @@ func NewDefaultScaffold() *Scaffold {
Kubeconfig: GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
IngressAPISIXReplicas: 1,
}
return NewScaffold(opts)
}

// KillPod kill the pod which name is podName.
func (s *Scaffold) KillPod(podName string) error {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {
return err
}
return cli.CoreV1().Pods(s.namespace).Delete(context.TODO(), podName, metav1.DeleteOptions{})
}

// DefaultHTTPBackend returns the service name and service ports
// of the default http backend.
func (s *Scaffold) DefaultHTTPBackend() (string, []int32) {
Expand Down