Merge pull request #10758 from prezha/multinode-wait-join
multinode cluster: fix waits and joins
medyagh authored Mar 10, 2021
2 parents b3b4807 + 9ba6c7c commit e1c872a
Showing 6 changed files with 218 additions and 149 deletions.
18 changes: 9 additions & 9 deletions pkg/minikube/bootstrapper/bsutil/kverify/kverify.go
@@ -37,7 +37,7 @@ const (
NodeReadyKey = "node_ready"
// KubeletKey is the name used in the flags for waiting for the kubelet status to be ready
KubeletKey = "kubelet"
// ExtraKey is the name used for extra waiting for pods in CorePodsList to be Ready
// ExtraKey is the name used for extra waiting for pods in CorePodsLabels to be Ready
ExtraKey = "extra"
)

@@ -62,14 +62,14 @@ var (
"kube-proxy",
"kube-scheduler",
}
// CorePodsList is a list of essential pods for running kurnetes to extra wait for them to be Ready
CorePodsList = []string{
"kube-dns", // coredns
"etcd",
"kube-apiserver",
"kube-controller-manager",
"kube-proxy",
"kube-scheduler",
// CorePodsLabels is a list of labels for essential kubernetes pods that, along with any other system-critical pods, are extra-waited on to become Ready
CorePodsLabels = []string{
"k8s-app=kube-dns", // coredns
"component=etcd",
"component=kube-apiserver",
"component=kube-controller-manager",
"k8s-app=kube-proxy",
"component=kube-scheduler",
}
)

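
Each CorePodsLabels entry is an ordinary Kubernetes label selector. As illustration only (not part of this change; the helper name is made up, and the context-free List signature matches the client-go version vendored here), a sketch of resolving one of these selectors to concrete pods might look like:

```go
import (
	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
)

// listCoreDNSPods is a hedged sketch, not minikube code: it resolves the
// first CorePodsLabels selector ("k8s-app=kube-dns") to the pods it matches.
func listCoreDNSPods(cs *kubernetes.Clientset) {
	pods, err := cs.CoreV1().Pods(meta.NamespaceSystem).List(meta.ListOptions{
		LabelSelector: "k8s-app=kube-dns",
	})
	if err != nil {
		klog.Warningf("listing kube-dns pods: %v", err)
		return
	}
	for _, p := range pods.Items {
		klog.Infof("selected core pod: %s", p.Name)
	}
}
```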
66 changes: 43 additions & 23 deletions pkg/minikube/bootstrapper/bsutil/kverify/node_ready.go
@@ -21,44 +21,64 @@ import (
"fmt"
"time"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
)

// WaitForNodeReady waits till kube client reports node status as "ready"
func WaitForNodeReady(cs *kubernetes.Clientset, timeout time.Duration) error {
klog.Infof("waiting %s for node status to be ready ...", timeout)
// WaitNodeCondition waits for the specified condition of the named node.
func WaitNodeCondition(cs *kubernetes.Clientset, name string, condition core.NodeConditionType, timeout time.Duration) error {
klog.Infof("waiting up to %v for node %q to be %q ...", timeout, name, condition)
start := time.Now()
defer func() {
klog.Infof("duration metric: took %s to wait for WaitForNodeReady...", time.Since(start))
klog.Infof("duration metric: took %v waiting for node %q to be %q ...", time.Since(start), name, condition)
}()
checkReady := func() (bool, error) {

lap := time.Now()
checkCondition := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("wait for node to be ready timed out")
}
ns, err := cs.CoreV1().Nodes().List(meta.ListOptions{})
if err != nil {
klog.Infof("error listing nodes will retry: %v", err)
return false, nil
return false, fmt.Errorf("timed out waiting %v for node %q to be %q (will not retry!)", timeout, name, condition)
}

for _, n := range ns.Items {
for _, c := range n.Status.Conditions {
if c.Type == v1.NodeReady && c.Status != v1.ConditionTrue {
klog.Infof("node %q has unwanted condition %q : Reason %q Message: %q. will try. ", n.Name, c.Type, c.Reason, c.Message)
return false, nil
}
}
status, reason := nodeConditionStatus(cs, name, condition)
if status == core.ConditionTrue {
klog.Info(reason)
return true, nil
}
if status == core.ConditionUnknown {
klog.Info(reason)
return false, fmt.Errorf(reason)
}
return true, nil
// reduce log spam
if time.Since(lap) > (2 * time.Second) {
klog.Info(reason)
lap = time.Now()
}
return false, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, checkReady); err != nil {
return errors.Wrapf(err, "wait node ready")
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, checkCondition); err != nil {
return fmt.Errorf("waitNodeCondition: %w", err)
}

return nil
}

// nodeConditionStatus returns whether the node is in the specified condition, along with a verbose reason.
func nodeConditionStatus(cs *kubernetes.Clientset, name string, condition core.NodeConditionType) (status core.ConditionStatus, reason string) {
node, err := cs.CoreV1().Nodes().Get(name, meta.GetOptions{})
if err != nil {
return core.ConditionUnknown, fmt.Sprintf("error getting node %q: %v", name, err)
}

for _, c := range node.Status.Conditions {
if c.Type == condition {
return c.Status, fmt.Sprintf("node %q has status %q:%q", node.Name, condition, c.Status)
}
}

// assume transient condition
return core.ConditionFalse, fmt.Sprintf("node %q doesn't have %q status: %+v", node.Name, condition, node.Status)
}
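
A hypothetical caller sketch (not part of this diff; the helper name, the node variable, and the six-minute timeout are assumptions for illustration) that waits for a newly joined node to report Ready could look like:

```go
import (
	"fmt"
	"time"

	core "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"

	"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil/kverify"
)

// waitJoinedNodeReady is a sketch only: block until the named node is Ready.
func waitJoinedNodeReady(cs *kubernetes.Clientset, nodeName string) error {
	// the timeout here is an assumed value, not taken from this change
	if err := kverify.WaitNodeCondition(cs, nodeName, core.NodeReady, 6*time.Minute); err != nil {
		return fmt.Errorf("node %q never became %q: %w", nodeName, core.NodeReady, err)
	}
	return nil
}
```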
141 changes: 81 additions & 60 deletions pkg/minikube/bootstrapper/bsutil/kverify/pod_ready.go
@@ -19,10 +19,8 @@ package kverify

import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@@ -31,91 +29,114 @@ import (
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
)

// WaitExtra calls WaitForPodReadyByLabel for each pod in labels list and returns any errors occurred.
// WaitExtra calls waitPodCondition for all system-critical pods including those with specified labels.
func WaitExtra(cs *kubernetes.Clientset, labels []string, timeout time.Duration) error {
klog.Infof("extra waiting for kube-system core pods %s to be Ready ...", labels)
klog.Infof("extra waiting up to %v for all system-critical pods including labels %v to be %q ...", timeout, labels, core.PodReady)
start := time.Now()
defer func() {
klog.Infof("duration metric: took %s for extra waiting for kube-system core pods to be Ready ...", time.Since(start))
klog.Infof("duration metric: took %s for extra waiting for all system-critical and pods with labels %v to be %q ...", time.Since(start), labels, core.PodReady)
}()

var errs []string
for _, label := range labels {
if err := waitForPodReadyByLabel(cs, label, "kube-system", timeout); err != nil {
errs = append(errs, fmt.Sprintf("%q: %q", label, err.Error()))
}
pods, err := cs.CoreV1().Pods(meta.NamespaceSystem).List(meta.ListOptions{})
if err != nil {
return fmt.Errorf("error listing pods in %q namespace: %w", meta.NamespaceSystem, err)
}
if errs != nil {
return fmt.Errorf(strings.Join(errs, ", "))

for _, pod := range pods.Items {
if time.Since(start) > timeout {
return fmt.Errorf("timed out waiting %v for all system-critical and pods with labels %v to be %q", timeout, labels, core.NodeReady)
}

for k, v := range pod.Labels {
label := fmt.Sprintf("%s=%s", k, v)
match := false
for _, l := range labels {
if l == label {
match = true
break
}
}
// ignore system-critical pods' non-essential labels
if !match && pod.Namespace != meta.NamespaceSystem && k != "k8s-app" && k != "component" {
continue
}
if match || pod.Spec.PriorityClassName == "system-cluster-critical" || pod.Spec.PriorityClassName == "system-node-critical" {
if err := waitPodCondition(cs, pod.Name, pod.Namespace, core.PodReady, timeout); err != nil {
klog.Errorf("WaitExtra: %v", err)
}
break
}
}
}

return nil
}
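
As a usage note (a sketch, not code from this PR; it assumes cs is an established *kubernetes.Clientset and that fmt, kverify, and kconst are imported as in the files above), the exported entry point would typically be fed the default label set; pods carrying a system-cluster-critical or system-node-critical priority class are waited on even when their labels are not in the list:

```go
// Sketch of a call site for the extra wait after the node itself is Ready.
if err := kverify.WaitExtra(cs, kverify.CorePodsLabels, kconst.DefaultControlPlaneTimeout); err != nil {
	return fmt.Errorf("extra waiting for system-critical pods: %w", err)
}
```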

// waitForPodReadyByLabel waits for pod with label ([key:]val) in a namespace to be in Ready condition.
// If namespace is not provided, it defaults to "kube-system".
// If label key is not provided, it will try with "component" and "k8s-app".
func waitForPodReadyByLabel(cs *kubernetes.Clientset, label, namespace string, timeout time.Duration) error {
klog.Infof("waiting %v for pod with %q label in %q namespace to be Ready ...", timeout, label, namespace)
// waitPodCondition waits for the specified condition of the named pod in the given namespace.
func waitPodCondition(cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType, timeout time.Duration) error {
klog.Infof("waiting up to %v for pod %q in %q namespace to be %q ...", timeout, name, namespace, condition)
start := time.Now()
defer func() {
klog.Infof("duration metric: took %v to run WaitForPodReadyByLabel for pod with %q label in %q namespace ...", time.Since(start), label, namespace)
klog.Infof("duration metric: took %v waiting for pod %q in %q namespace to be %q ...", time.Since(start), name, namespace, condition)
}()

if namespace == "" {
namespace = "kube-system"
}

lkey := ""
lval := ""
l := strings.Split(label, ":")
switch len(l) {
case 1: // treat as no label key provided, just val
lval = strings.TrimSpace(l[0])
case 2:
lkey = strings.TrimSpace(l[0])
lval = strings.TrimSpace(l[1])
default:
return fmt.Errorf("pod label %q is malformed", label)
}

lap := time.Now()
checkReady := func() (bool, error) {
checkCondition := func() (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("wait for pod with %q label in %q namespace to be Ready timed out", label, namespace)
return false, fmt.Errorf("timed out waiting %v for pod %q in %q namespace to be %q (will not retry!)", timeout, name, namespace, condition)
}

status, reason := podConditionStatus(cs, name, namespace, condition)
if status == core.ConditionTrue {
klog.Info(reason)
return true, nil
}
pods, err := cs.CoreV1().Pods(namespace).List(meta.ListOptions{})
if err != nil {
klog.Infof("error listing pods in %q namespace, will retry: %v", namespace, err)
return false, nil
if status == core.ConditionUnknown {
klog.Info(reason)
return false, fmt.Errorf(reason)
}
for _, pod := range pods.Items {
for k, v := range pod.ObjectMeta.Labels {
if ((lkey == "" && (k == "component" || k == "k8s-app")) || lkey == k) && v == lval {
ready, reason := IsPodReady(&pod)
if ready {
klog.Info(reason)
return true, nil
}
// reduce log spam
if time.Since(lap) > (1 * time.Second) {
klog.Info(reason)
lap = time.Now()
}
return false, nil
}
}
// reduce log spam
if time.Since(lap) > (2 * time.Second) {
klog.Info(reason)
lap = time.Now()
}
klog.Infof("pod with %q label in %q namespace was not found, will retry", label, namespace)
return false, nil
}
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, checkReady); err != nil {
return errors.Wrapf(err, "wait pod Ready")
if err := wait.PollImmediate(kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, checkCondition); err != nil {
return fmt.Errorf("waitPodCondition: %w", err)
}

return nil
}

// podConditionStatus returns whether the pod is in the specified condition, along with a verbose reason.
func podConditionStatus(cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType) (status core.ConditionStatus, reason string) {
pod, err := cs.CoreV1().Pods(namespace).Get(name, meta.GetOptions{})
if err != nil {
return core.ConditionUnknown, fmt.Sprintf("error getting pod %q in %q namespace: %v", name, namespace, err)
}

// check if the underlying node is Ready - in case we got stale data about the pod
if pod.Spec.NodeName != "" {
if status, reason := nodeConditionStatus(cs, pod.Spec.NodeName, core.NodeReady); status != core.ConditionTrue {
return core.ConditionUnknown, fmt.Sprintf("node %q hosting pod %q in %q namespace is currently not %q: %v", pod.Spec.NodeName, name, namespace, core.NodeReady, reason)
}
}

if pod.Status.Phase != core.PodRunning && pod.Status.Phase != core.PodPending {
return core.ConditionUnknown, fmt.Sprintf("pod %q in %q namespace has status phase %q (skipping!): %+v", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status)
}

for _, c := range pod.Status.Conditions {
if c.Type == condition {
return c.Status, fmt.Sprintf("pod %q in %q namespace has status %q:%q", pod.Name, pod.Namespace, condition, c.Status)
}
}

// assume transient condition
return core.ConditionFalse, fmt.Sprintf("pod %q in %q namespace doesn't have %q status: %+v", pod.Name, pod.Namespace, condition, pod.Status)
}

// IsPodReady returns if pod is Ready and verbose reason.
func IsPodReady(pod *core.Pod) (ready bool, reason string) {
if pod.Status.Phase != core.PodRunning {
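
The body of IsPodReady is truncated in this view. As a hedged stand-in (an assumption about its general shape, not the actual minikube implementation), a readiness check of this kind typically inspects the PodReady condition:

```go
import (
	"fmt"

	core "k8s.io/api/core/v1"
)

// isPodReadySketch illustrates the usual shape of such a check; it is an
// assumed example, not the truncated function above.
func isPodReadySketch(pod *core.Pod) (bool, string) {
	if pod.Status.Phase != core.PodRunning {
		return false, fmt.Sprintf("pod %q is not Running: phase %q", pod.Name, pod.Status.Phase)
	}
	for _, c := range pod.Status.Conditions {
		if c.Type == core.PodReady {
			return c.Status == core.ConditionTrue, fmt.Sprintf("pod %q condition %q is %q", pod.Name, c.Type, c.Status)
		}
	}
	return false, fmt.Sprintf("pod %q has no %q condition yet", pod.Name, core.PodReady)
}
```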
