[core] add support for context #2717

Merged: 1 commit, Feb 2, 2025
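This PR threads a caller-supplied context.Context through the CLI entry points, the autohealing CloudProvider interface, and the OpenStack provider, replacing the context.TODO()/context.Background() calls that were previously created inside the helpers. A minimal, self-contained sketch of that pattern (illustrative code only, not taken from this PR):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// fetchToken stands in for a helper such as keystone.GetToken: after this
// change the caller supplies ctx instead of the helper creating
// context.TODO() itself, so cancellation and deadlines propagate.
// The name and body are illustrative only.
func fetchToken(ctx context.Context, user string) (string, error) {
	select {
	case <-ctx.Done():
		return "", ctx.Err() // caller cancelled or deadline exceeded
	case <-time.After(10 * time.Millisecond): // simulate a remote call
		return "token-for-" + user, nil
	}
}

func main() {
	// Entry points own the root context, as main() does in this PR.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	token, err := fetchToken(ctx, "demo")
	fmt.Println(token, err)
}
```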
7 changes: 4 additions & 3 deletions cmd/client-keystone-auth/main.go
@@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"io"
"net/http"
@@ -156,7 +157,7 @@ func main() {
Use: "client-keystone-auth",
Short: "Keystone client credential plugin for Kubernetes",
Run: func(cmd *cobra.Command, args []string) {
handle()
handle(context.Background())
},
Version: version.Version,
}
@@ -177,7 +178,7 @@ func main() {
os.Exit(code)
}

func handle() {
func handle(ctx context.Context) {
// Generate Gophercloud Auth Options based on input data from stdin
// if IsTerminal returns "true", or from env variables otherwise.
if !term.IsTerminal(int(os.Stdin.Fd())) {
@@ -214,7 +215,7 @@ func handle() {
options.ClientKeyPath = clientKeyPath
options.ClientCAPath = clientCAPath

token, err := keystone.GetToken(options)
token, err := keystone.GetToken(ctx, options)
if err != nil {
if gophercloud.ResponseCodeIs(err, http.StatusUnauthorized) {
fmt.Println(errRespTemplate)
3 changes: 2 additions & 1 deletion cmd/k8s-keystone-auth/main.go
@@ -15,6 +15,7 @@ limitations under the License.
package main

import (
"context"
"os"

"github.com/spf13/cobra"
@@ -38,7 +39,7 @@ func main() {
os.Exit(1)
}

keystoneAuth, err := keystone.NewKeystoneAuth(config)
keystoneAuth, err := keystone.NewKeystoneAuth(context.Background(), config)
if err != nil {
klog.Errorf("%v", err)
os.Exit(1)
8 changes: 5 additions & 3 deletions pkg/autohealing/cloudprovider/cloudprovider.go
@@ -17,6 +17,8 @@ limitations under the License.
package cloudprovider

import (
"context"

"k8s.io/client-go/kubernetes"
log "k8s.io/klog/v2"

@@ -34,17 +36,17 @@ type CloudProvider interface {
GetName() string

// Update cluster health status.
UpdateHealthStatus([]healthcheck.NodeInfo, []healthcheck.NodeInfo) error
UpdateHealthStatus(context.Context, []healthcheck.NodeInfo, []healthcheck.NodeInfo) error

// Repair triggers the node repair process in the cloud.
Repair([]healthcheck.NodeInfo) error
Repair(context.Context, []healthcheck.NodeInfo) error

// Enabled decides if the repair should be triggered.
// It's recommended that the cloud provider's `Enabled()` function does not allow re-triggering while a repair
// is in progress, i.e. before the repair process is finished, `Enabled()` should return false so that we won't
// re-trigger the repair process in the subsequent checks.
// This function also gives the cluster admin the ability to disable cluster auto healing on the fly.
Enabled() bool
Enabled(context.Context) bool
}

type RegisterFunc func(config config.Config, client kubernetes.Interface) (CloudProvider, error)
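With the interface change above, every caller must now supply a context. A hedged sketch of a healing pass that threads one context through the updated methods; the function name, the surrounding package, and the k8s.io/cloud-provider-openstack import path are assumptions, not code from this PR:

```go
package healing

import (
	"context"

	"k8s.io/cloud-provider-openstack/pkg/autohealing/cloudprovider"
	"k8s.io/cloud-provider-openstack/pkg/autohealing/healthcheck"
)

// repairOnce is illustrative only: it shows one context flowing through all
// three CloudProvider methods so a single cancellation stops the whole pass.
func repairOnce(ctx context.Context, p cloudprovider.CloudProvider, masters, workers []healthcheck.NodeInfo) error {
	// Skip the pass if auto healing is disabled or the cluster is not stable.
	if !p.Enabled(ctx) {
		return nil
	}
	// Report health first, then repair the unhealthy worker nodes.
	if err := p.UpdateHealthStatus(ctx, masters, workers); err != nil {
		return err
	}
	return p.Repair(ctx, workers)
}
```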
58 changes: 29 additions & 29 deletions pkg/autohealing/cloudprovider/openstack/provider.go
@@ -95,8 +95,8 @@ func (provider CloudProvider) GetName() string {
}

// getStackName finds the name of a stack matching a given ID.
func (provider *CloudProvider) getStackName(stackID string) (string, error) {
stack, err := stacks.Find(context.TODO(), provider.Heat, stackID).Extract()
func (provider *CloudProvider) getStackName(ctx context.Context, stackID string) (string, error) {
stack, err := stacks.Find(ctx, provider.Heat, stackID).Extract()
if err != nil {
return "", err
}
@@ -108,14 +108,14 @@ func (provider *CloudProvider) getStackName(stackID string) (string, error) {
// masters and minions(workers). The key in the map is the server/instance ID
// in Nova and the value is the resource ID and name of the server, and the
// parent stack ID and name.
func (provider *CloudProvider) getAllStackResourceMapping(stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
func (provider *CloudProvider) getAllStackResourceMapping(ctx context.Context, stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
if provider.ResourceStackMapping != nil {
return provider.ResourceStackMapping, nil
}

mapping := make(map[string]ResourceStackRelationship)

serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(context.TODO())
serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(ctx)
if err != nil {
return m, err
}
@@ -266,7 +266,7 @@ func (provider CloudProvider) waitForServerDetachVolumes(serverID string, timeou
// will be kept as False, which means the node needs to be rebuilt to fix it; otherwise it means the node has been processed.
//
// The bool return value indicates whether the node has been processed from a first-time repair PoV
func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
func (provider CloudProvider) firstTimeRepair(ctx context.Context, n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
var firstTimeUnhealthy = true
for id := range unHealthyNodes {
log.V(5).Infof("comparing server ID %s with known broken ID %s", serverID, id)
@@ -281,7 +281,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
if firstTimeUnhealthy {
log.Infof("rebooting node %s to repair it", serverID)

if res := servers.Reboot(context.TODO(), provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
if res := servers.Reboot(ctx, provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
// Usually it means the node is being rebooted
log.Warningf("failed to reboot node %s, error: %v", serverID, res.Err)
if strings.Contains(res.Err.Error(), "reboot_started") {
@@ -351,7 +351,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
// - Heat stack ID and resource ID.
//
// For worker nodes: Call Magnum resize API directly.
func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
func (provider CloudProvider) Repair(ctx context.Context, nodes []healthcheck.NodeInfo) error {
if len(nodes) == 0 {
return nil
}
@@ -370,12 +370,12 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {

firstTimeRebootNodes := make(map[string]healthcheck.NodeInfo)

err := provider.UpdateHealthStatus(masters, workers)
err := provider.UpdateHealthStatus(ctx, masters, workers)
if err != nil {
return fmt.Errorf("failed to update the health status of cluster %s, error: %v", clusterName, err)
}

cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
if err != nil {
return fmt.Errorf("failed to get the cluster %s, error: %v", clusterName, err)
}
@@ -389,7 +389,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
continue
}

if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
} else if processed {
log.Infof("Node %s has been processed", serverID)
@@ -405,7 +405,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
}

nodesToReplace.Insert(serverID)
ng, err := provider.getNodeGroup(clusterName, n)
ng, err := provider.getNodeGroup(ctx, clusterName, n)
ngName := "default-worker"
ngNodeCount := &cluster.NodeCount
if err == nil {
@@ -419,7 +419,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
NodesToRemove: nodesToReplace.List(),
}

clusters.Resize(context.TODO(), provider.Magnum, clusterName, opts)
clusters.Resize(ctx, provider.Magnum, clusterName, opts)
// Wait 10 seconds to make sure Magnum has already got the request
// to avoid sending all of the resize API calls at the same time.
time.Sleep(10 * time.Second)
@@ -432,14 +432,14 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Cluster %s resized", clusterName)
}
} else {
clusterStackName, err := provider.getStackName(cluster.StackID)
clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
if err != nil {
return fmt.Errorf("failed to get the Heat stack for cluster %s, error: %v", clusterName, err)
}

// In order to rebuild the nodes by Heat stack update, we need to know the parent stack ID of the resources and
// mark them unhealthy first.
allMapping, err := provider.getAllStackResourceMapping(clusterStackName, cluster.StackID)
allMapping, err := provider.getAllStackResourceMapping(ctx, clusterStackName, cluster.StackID)
if err != nil {
return fmt.Errorf("failed to get the resource stack mapping for cluster %s, error: %v", clusterName, err)
}
@@ -456,7 +456,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
continue
}

if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
} else if processed {
log.Infof("Node %s has been processed", serverID)
@@ -468,7 +468,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
} else {
// Mark root volume as unhealthy
if rootVolumeID != "" {
err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
if err != nil {
log.Errorf("failed to mark resource %s unhealthy, error: %v", rootVolumeID, err)
}
@@ -479,7 +479,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Warningf("Failed to shutdown the server %s, error: %v", serverID, err)
// If the server is failed to delete after 180s, then delete it to avoid the
// stack update failure later.
res := servers.ForceDelete(context.TODO(), provider.Nova, serverID)
res := servers.ForceDelete(ctx, provider.Nova, serverID)
if res.Err != nil {
log.Warningf("Failed to delete the server %s, error: %v", serverID, err)
}
@@ -488,15 +488,15 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Marking Nova VM %s(Heat resource %s) unhealthy for Heat stack %s", serverID, allMapping[serverID].ResourceID, cluster.StackID)

// Mark VM as unhealthy
err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
if err != nil {
log.Errorf("failed to mark resource %s unhealthy, error: %v", serverID, err)
}

delete(unHealthyNodes, serverID)
}

if err := stacks.UpdatePatch(context.TODO(), provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
if err := stacks.UpdatePatch(ctx, provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
return fmt.Errorf("failed to update Heat stack to rebuild resources, error: %v", err)
}

@@ -514,26 +514,26 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
log.Infof("Skip node delete for %s because it's repaired by reboot", serverID)
continue
}
if err := provider.KubeClient.CoreV1().Nodes().Delete(context.TODO(), n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
if err := provider.KubeClient.CoreV1().Nodes().Delete(ctx, n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
log.Errorf("Failed to remove the node %s from cluster, error: %v", n.KubeNode.Name, err)
}
}

return nil
}

func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
func (provider CloudProvider) getNodeGroup(ctx context.Context, clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
var ng nodegroups.NodeGroup

ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(context.TODO())
ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(ctx)
if err == nil {
ngs, err := nodegroups.ExtractNodeGroups(ngPages)
if err != nil {
log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
return ng, err
}
for _, ng := range ngs {
ngInfo, err := nodegroups.Get(context.TODO(), provider.Magnum, clusterName, ng.UUID).Extract()
ngInfo, err := nodegroups.Get(ctx, provider.Magnum, clusterName, ng.UUID).Extract()
if err != nil {
log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
return ng, err
@@ -555,7 +555,7 @@ func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.

// UpdateHealthStatus can update the cluster health status to reflect the
// real-time health status of the k8s cluster.
func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
func (provider CloudProvider) UpdateHealthStatus(ctx context.Context, masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
log.Infof("start to update cluster health status.")
clusterName := provider.Config.ClusterName

@@ -600,7 +600,7 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
}

log.Infof("updating cluster health status as %s for reason %s.", healthStatus, healthStatusReason)
res := clusters.Update(context.TODO(), provider.Magnum, clusterName, updateOpts)
res := clusters.Update(ctx, provider.Magnum, clusterName, updateOpts)

if res.Err != nil {
return fmt.Errorf("failed to update the health status of cluster %s error: %v", clusterName, res.Err)
@@ -617,10 +617,10 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
// There are two conditions that we disable the repair:
// - The cluster admin disables the auto healing via OpenStack API.
// - The Magnum cluster is not in stable status.
func (provider CloudProvider) Enabled() bool {
func (provider CloudProvider) Enabled(ctx context.Context) bool {
clusterName := provider.Config.ClusterName

cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
if err != nil {
log.Warningf("failed to get the cluster %s, error: %v", clusterName, err)
return false
@@ -644,12 +644,12 @@ func (provider CloudProvider) Enabled() bool {
return false
}

clusterStackName, err := provider.getStackName(cluster.StackID)
clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
if err != nil {
log.Warningf("Failed to get the Heat stack ID for cluster %s, error: %v", clusterName, err)
return false
}
stack, err := stacks.Get(context.TODO(), provider.Heat, clusterStackName, cluster.StackID).Extract()
stack, err := stacks.Get(ctx, provider.Heat, clusterStackName, cluster.StackID).Extract()
if err != nil {
log.Warningf("Failed to get Heat stack %s for cluster %s, error: %v", cluster.StackID, clusterName, err)
return false
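Because the Magnum, Heat, and Nova calls above now receive the caller's context instead of context.TODO(), a caller can put a deadline on an entire repair pass and have in-flight OpenStack API requests cancelled with it. A sketch under the same assumptions as the previous example (the helper name and the 10-minute value are arbitrary):

```go
package healing

import (
	"context"
	"time"

	"k8s.io/cloud-provider-openstack/pkg/autohealing/cloudprovider"
	"k8s.io/cloud-provider-openstack/pkg/autohealing/healthcheck"
)

// boundedRepair is illustrative only: the deadline set here is inherited by
// every OpenStack API call the provider makes during Repair.
func boundedRepair(parent context.Context, p cloudprovider.CloudProvider, nodes []healthcheck.NodeInfo) error {
	ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
	defer cancel()
	return p.Repair(ctx, nodes)
}
```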
5 changes: 3 additions & 2 deletions pkg/autohealing/cmd/root.go
@@ -50,10 +50,11 @@ var rootCmd = &cobra.Command{
"OpenStack is supported by default.",

Run: func(cmd *cobra.Command, args []string) {
ctx := context.TODO()
autohealer := controller.NewController(conf)

if !conf.LeaderElect {
autohealer.Start(context.TODO())
autohealer.Start(ctx)
panic("unreachable")
}

@@ -63,7 +64,7 @@ var rootCmd = &cobra.Command{
}

// Try and become the leader and start autohealing loops
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 20 * time.Second,
RenewDeadline: 15 * time.Second,
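root.go still seeds the chain with context.TODO(). A sketch, not part of this PR, of how that placeholder could later be swapped for a signal-aware context so SIGINT/SIGTERM cancels the autohealer and the leader-election loop:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// ctx is cancelled on SIGINT or SIGTERM; stop() releases the signal handler.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Use this ctx where root.go currently assigns ctx := context.TODO(), e.g.
	// autohealer.Start(ctx) or leaderelection.RunOrDie(ctx, ...).
	<-ctx.Done()
	fmt.Println("shutting down:", ctx.Err())
}
```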