Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add wait-timeout flag to start command and refactor util/kubernetes #5121

Merged
merged 7 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/minikube/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/minikube/pkg/minikube/bootstrapper/kubeadm"
"k8s.io/minikube/pkg/minikube/cluster"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
cfg "k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/cruntime"
Expand Down Expand Up @@ -102,6 +103,7 @@ const (
dnsProxy = "dns-proxy"
hostDNSResolver = "host-dns-resolver"
waitUntilHealthy = "wait"
waitTimeout = "wait-timeout"
)

var (
Expand All @@ -111,7 +113,7 @@ var (
insecureRegistry []string
apiServerNames []string
apiServerIPs []net.IP
extraOptions pkgutil.ExtraOptionSlice
extraOptions config.ExtraOptionSlice
)

func init() {
Expand Down Expand Up @@ -148,6 +150,7 @@ func initMinikubeFlags() {
startCmd.Flags().String(networkPlugin, "", "The name of the network plugin.")
startCmd.Flags().Bool(enableDefaultCNI, false, "Enable the default CNI plugin (/etc/cni/net.d/k8s.conf). Used in conjunction with \"--network-plugin=cni\".")
startCmd.Flags().Bool(waitUntilHealthy, true, "Wait until Kubernetes core services are healthy before exiting.")
startCmd.Flags().Duration(waitTimeout, 3*time.Minute, "max time to wait per Kubernetes core services to be healthy.")
}

// initKubernetesFlags inits the commandline flags for kubernetes related options
Expand Down Expand Up @@ -321,7 +324,7 @@ func runStart(cmd *cobra.Command, args []string) {
// special ops for none driver, like change minikube directory.
prepareNone(viper.GetString(vmDriver))
if viper.GetBool(waitUntilHealthy) {
if err := bs.WaitCluster(config.KubernetesConfig); err != nil {
if err := bs.WaitCluster(config.KubernetesConfig, viper.GetDuration(waitTimeout)); err != nil {
exit.WithError("Wait failed", err)
}
}
Expand Down Expand Up @@ -538,8 +541,8 @@ func validateConfig() {

// check that kubeadm extra args contain only whitelisted parameters
for param := range extraOptions.AsMap().Get(kubeadm.Kubeadm) {
if !pkgutil.ContainsString(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmCmdParam], param) &&
!pkgutil.ContainsString(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmConfigParam], param) {
if !cfg.ContainsParam(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmCmdParam], param) &&
!cfg.ContainsParam(kubeadm.KubeadmExtraArgsWhitelist[kubeadm.KubeadmConfigParam], param) {
exit.UsageT("Sorry, the kubeadm.{{.parameter_name}} parameter is currently not supported by --extra-config", out.V{"parameter_name": param})
}
}
Expand Down
8 changes: 5 additions & 3 deletions hack/jenkins/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ export MINIKUBE_HOME="${TEST_HOME}/.minikube"
export MINIKUBE_WANTREPORTERRORPROMPT=False
export KUBECONFIG="${TEST_HOME}/kubeconfig"

# Build the gvisor image. This will be copied into minikube and loaded by ctr.
# Used by TestContainerd for Gvisor Test.
docker build -t gcr.io/k8s-minikube/gvisor-addon:latest -f testdata/gvisor-addon-Dockerfile out
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added by Priya to fix the gvisor test ( to be added before the integration tests) but it was added after the minikube clean up ! so it was not being used by the test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this only moved the command up the script



# Display the default image URL
echo ""
echo ">> ISO URL"
Expand Down Expand Up @@ -274,9 +279,6 @@ ${SUDO_PREFIX} rm -f "${KUBECONFIG}" || true
rmdir "${TEST_HOME}"
echo ">> ${TEST_HOME} completed at $(date)"

# Build the gvisor image. This will be copied into minikube and loaded by ctr.
docker build -t gcr.io/k8s-minikube/gvisor-addon:latest -f testdata/gvisor-addon-Dockerfile out

if [[ "${MINIKUBE_LOCATION}" != "master" ]]; then
readonly target_url="https://storage.googleapis.com/minikube-builds/logs/${MINIKUBE_LOCATION}/${JOB_NAME}.txt"
curl -s "https://api.github.com/repos/kubernetes/minikube/statuses/${COMMIT}?access_token=$access_token" \
Expand Down
144 changes: 19 additions & 125 deletions pkg/util/kubernetes.go → pkg/kapi/kapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package util
package kapi

import (
"context"
Expand All @@ -29,15 +29,13 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
kconst "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/minikube/pkg/minikube/proxy"
)

Expand All @@ -48,30 +46,8 @@ var (
ReasonableStartTime = time.Minute * 5
)

// PodStore stores pods
type PodStore struct {
cache.Store
stopCh chan struct{}
Reflector *cache.Reflector
}

// List lists the pods
func (s *PodStore) List() []*core.Pod {
objects := s.Store.List()
pods := make([]*core.Pod, 0)
for _, o := range objects {
pods = append(pods, o.(*core.Pod))
}
return pods
}

// Stop stops the pods
func (s *PodStore) Stop() {
close(s.stopCh)
}

// GetClient gets the client from config
func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
// Client gets the kuberentes client from default kubeconfig
func Client(kubectlContext ...string) (kubernetes.Interface, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
if kubectlContext != nil {
Expand All @@ -92,50 +68,16 @@ func GetClient(kubectlContext ...string) (kubernetes.Interface, error) {
return client, nil
}

// NewPodStore creates a new PodStore
func NewPodStore(c kubernetes.Interface, namespace string, label fmt.Stringer, field fmt.Stringer) *PodStore {
lw := &cache.ListWatch{
ListFunc: func(options meta.ListOptions) (runtime.Object, error) {
options.LabelSelector = label.String()
options.FieldSelector = field.String()
obj, err := c.CoreV1().Pods(namespace).List(options)
return runtime.Object(obj), err
},
WatchFunc: func(options meta.ListOptions) (watch.Interface, error) {
options.LabelSelector = label.String()
options.FieldSelector = field.String()
return c.CoreV1().Pods(namespace).Watch(options)
},
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
reflector := cache.NewReflector(lw, &core.Pod{}, store, 0)
go reflector.Run(stopCh)
return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
}

// StartPods starts all pods
func StartPods(c kubernetes.Interface, namespace string, pod core.Pod, waitForRunning bool) error {
pod.ObjectMeta.Labels["name"] = pod.Name
if waitForRunning {
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": pod.Name}))
err := WaitForPodsWithLabelRunning(c, namespace, label)
if err != nil {
return fmt.Errorf("error waiting for pod %s to be running: %v", pod.Name, err)
}
}
return nil
}

// WaitForPodsWithLabelRunning waits for all matching pods to become Running and at least one matching pod exists.
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector) error {
func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels.Selector, timeOut ...time.Duration) error {
start := time.Now()
glog.Infof("Waiting for pod with label %q in ns %q ...", ns, label)
lastKnownPodNumber := -1
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableStartTime, func() (bool, error) {
f := func() (bool, error) {
listOpts := meta.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(listOpts)
if err != nil {
glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
glog.Infof("temporary error: getting Pods with label selector %q : [%v]\n", label.String(), err)
return false, nil
}

Expand All @@ -150,45 +92,23 @@ func WaitForPodsWithLabelRunning(c kubernetes.Interface, ns string, label labels

for _, pod := range pods.Items {
if pod.Status.Phase != core.PodRunning {
glog.Infof("waiting for pod %q, current state: %s: [%v]\n", label.String(), pod.Status.Phase, err)
return false, nil
}
}

return true, nil
})
}

// WaitForPodDelete waits for a pod to be deleted
func WaitForPodDelete(c kubernetes.Interface, ns string, label fmt.Stringer) error {
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
listOpts := meta.ListOptions{LabelSelector: label.String()}
pods, err := c.CoreV1().Pods(ns).List(listOpts)
if err != nil {
glog.Infof("error getting Pods with label selector %q [%v]\n", label.String(), err)
return false, nil
}
return len(pods.Items) == 0, nil
})
}

// WaitForEvent waits for the given event to appear
func WaitForEvent(c kubernetes.Interface, ns string, reason string) error {
return wait.PollImmediate(constants.APICallRetryInterval, ReasonableMutateTime, func() (bool, error) {
events, err := c.EventsV1beta1().Events("default").List(meta.ListOptions{})
if err != nil {
glog.Infof("error getting events: %v", err)
return false, nil
}
for _, e := range events.Items {
if e.Reason == reason {
return true, nil
}
}
return false, nil
})
}
t := ReasonableStartTime
if timeOut != nil {
t = timeOut[0]
}
err := wait.PollImmediate(kconst.APICallRetryInterval, t, f)
glog.Infof("duration metric: took %s to wait for %s ...", time.Since(start), label)
return err
}

// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status.
// WaitForRCToStabilize waits till the RC has a matching generation/replica count between spec and status. used by integration tests
func WaitForRCToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
options := meta.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
Expand Down Expand Up @@ -222,7 +142,7 @@ func WaitForRCToStabilize(c kubernetes.Interface, ns, name string, timeout time.
return err
}

// WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status.
// WaitForDeploymentToStabilize waits till the Deployment has a matching generation/replica count between spec and status. used by integrationt tests
func WaitForDeploymentToStabilize(c kubernetes.Interface, ns, name string, timeout time.Duration) error {
options := meta.ListOptions{FieldSelector: fields.Set{
"metadata.name": name,
Expand Down Expand Up @@ -281,32 +201,6 @@ func WaitForService(c kubernetes.Interface, namespace, name string, exist bool,
return nil
}

// WaitForServiceEndpointsNum waits until the amount of endpoints that implement service to expectNum.
func WaitForServiceEndpointsNum(c kubernetes.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
return wait.Poll(interval, timeout, func() (bool, error) {
glog.Infof("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
list, err := c.CoreV1().Endpoints(namespace).List(meta.ListOptions{})
if err != nil {
return false, err
}

for _, e := range list.Items {
if e.Name == serviceName && countEndpointsNum(&e) == expectNum {
return true, nil
}
}
return false, nil
})
}

func countEndpointsNum(e *core.Endpoints) int {
num := 0
for _, sub := range e.Subsets {
num += len(sub.Addresses)
}
return num
}

// IsRetryableAPIError returns if this error is retryable or not
func IsRetryableAPIError(err error) bool {
return apierr.IsTimeout(err) || apierr.IsServerTimeout(err) || apierr.IsTooManyRequests(err) || apierr.IsInternalError(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/minikube/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bootstrapper

import (
"net"
"time"

"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
Expand All @@ -39,7 +40,7 @@ type Bootstrapper interface {
UpdateCluster(config.KubernetesConfig) error
RestartCluster(config.KubernetesConfig) error
DeleteCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig) error
WaitCluster(config.KubernetesConfig, time.Duration) error
// LogCommands returns a map of log type to a command which will display that log.
LogCommands(LogOptions) map[string]string
SetupCerts(cfg config.KubernetesConfig) error
Expand Down
Loading