
Commit 1665392
[core] add support for context (#2717)
1 parent: c3ab4eb


52 files changed: +826 -794 lines
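The commit applies one mechanical change throughout: functions that previously minted their own placeholder context with context.TODO() (or took no context at all) now accept a context.Context from the caller, and the entry points seed the chain with context.Background(). A minimal sketch of the pattern, with hypothetical doWork/callAPI names standing in for the real call sites:

    package main

    import "context"

    // Before this change a helper would create its own placeholder context:
    //
    //     func doWork() error { return callAPI(context.TODO(), "example") }
    //
    // After it, the caller owns the context and the helper just forwards it.
    func doWork(ctx context.Context) error {
        return callAPI(ctx, "example")
    }

    func callAPI(ctx context.Context, arg string) error {
        // A real client call would honour ctx for cancellation and deadlines.
        if err := ctx.Err(); err != nil {
            return err
        }
        _ = arg
        return nil
    }

    func main() {
        // Entry points seed the chain; context.Background() keeps behaviour
        // identical to the old context.TODO() calls.
        _ = doWork(context.Background())
    }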

cmd/client-keystone-auth/main.go

+4 -3

@@ -17,6 +17,7 @@ limitations under the License.
 package main
 
 import (
+    "context"
     "fmt"
     "io"
     "net/http"
@@ -156,7 +157,7 @@ func main() {
         Use:   "client-keystone-auth",
         Short: "Keystone client credential plugin for Kubernetes",
         Run: func(cmd *cobra.Command, args []string) {
-            handle()
+            handle(context.Background())
         },
         Version: version.Version,
     }
@@ -177,7 +178,7 @@ func main() {
         os.Exit(code)
     }
 
-func handle() {
+func handle(ctx context.Context) {
     // Generate Gophercloud Auth Options based on input data from stdin
     // if IsTerminal returns "true", or from env variables otherwise.
     if !term.IsTerminal(int(os.Stdin.Fd())) {
@@ -214,7 +215,7 @@ func handle() {
     options.ClientKeyPath = clientKeyPath
     options.ClientCAPath = clientCAPath
 
-    token, err := keystone.GetToken(options)
+    token, err := keystone.GetToken(ctx, options)
     if err != nil {
         if gophercloud.ResponseCodeIs(err, http.StatusUnauthorized) {
             fmt.Println(errRespTemplate)

cmd/k8s-keystone-auth/main.go

+2 -1

@@ -15,6 +15,7 @@ limitations under the License.
 package main
 
 import (
+    "context"
     "os"
 
     "github.com/spf13/cobra"
@@ -38,7 +39,7 @@ func main() {
         os.Exit(1)
     }
 
-    keystoneAuth, err := keystone.NewKeystoneAuth(config)
+    keystoneAuth, err := keystone.NewKeystoneAuth(context.Background(), config)
     if err != nil {
         klog.Errorf("%v", err)
         os.Exit(1)
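Both command entry points hand context.Background() to the call chain, so behaviour is unchanged; the value of the new signatures is that a cancellable context can be substituted later without touching the callees. A hedged sketch (not part of this commit) of how either main could bind the context to process signals, assuming the handle(ctx) signature introduced above:

    package main

    import (
        "context"
        "os"
        "os/signal"
        "syscall"
    )

    func main() {
        // Cancelled on SIGINT/SIGTERM; a drop-in replacement for context.Background().
        ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
        defer stop()

        handle(ctx)
    }

    // handle stands in for the real handle(ctx) / NewKeystoneAuth(ctx, ...):
    // any Keystone call that receives this ctx is abandoned once the process
    // is asked to stop.
    func handle(ctx context.Context) {
        <-ctx.Done()
    }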

pkg/autohealing/cloudprovider/cloudprovider.go

+5 -3

@@ -17,6 +17,8 @@ limitations under the License.
 package cloudprovider
 
 import (
+    "context"
+
     "k8s.io/client-go/kubernetes"
     log "k8s.io/klog/v2"
 
@@ -34,17 +36,17 @@ type CloudProvider interface {
     GetName() string
 
     // Update cluster health status.
-    UpdateHealthStatus([]healthcheck.NodeInfo, []healthcheck.NodeInfo) error
+    UpdateHealthStatus(context.Context, []healthcheck.NodeInfo, []healthcheck.NodeInfo) error
 
     // Repair triggers the node repair process in the cloud.
-    Repair([]healthcheck.NodeInfo) error
+    Repair(context.Context, []healthcheck.NodeInfo) error
 
     // Enabled decides if the repair should be triggered.
     // It's recommended that the `Enabled()` function of the cloud provider doesn't allow to re-trigger when the repair
     // is in place, e.g. before the repair process is finished, `Enabled()` should return false so that we won't
     // re-trigger the repair process in the subsequent checks.
     // This function also provides the cluster admin the capability to disable the cluster auto healing on the fly.
-    Enabled() bool
+    Enabled(context.Context) bool
 }
 
 type RegisterFunc func(config config.Config, client kubernetes.Interface) (CloudProvider, error)
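Because the CloudProvider interface itself changed, every implementation and every test double must add the context parameter to UpdateHealthStatus, Repair and Enabled. A self-contained sketch of the shape an implementation takes after this change; nodeInfo and fakeProvider are local stand-ins for healthcheck.NodeInfo and the real OpenStack provider, used only to keep the example compilable on its own:

    package cloudprovider

    import "context"

    // nodeInfo is a local stand-in for healthcheck.NodeInfo so the sketch
    // compiles on its own.
    type nodeInfo struct{ Name string }

    // fakeProvider shows the new shape: each method receives the caller's
    // context and is expected to forward it to any cloud API call it makes.
    type fakeProvider struct{ repaired []string }

    func (f *fakeProvider) GetName() string { return "fake" }

    func (f *fakeProvider) UpdateHealthStatus(ctx context.Context, masters, workers []nodeInfo) error {
        // A real provider would forward ctx into its cloud API calls here.
        return ctx.Err()
    }

    func (f *fakeProvider) Repair(ctx context.Context, nodes []nodeInfo) error {
        for _, n := range nodes {
            if err := ctx.Err(); err != nil {
                return err // stop early once the caller cancels
            }
            f.repaired = append(f.repaired, n.Name)
        }
        return nil
    }

    func (f *fakeProvider) Enabled(ctx context.Context) bool {
        // Repair is skipped when the context is already cancelled.
        return ctx.Err() == nil
    }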

pkg/autohealing/cloudprovider/openstack/provider.go

+29 -29

@@ -95,8 +95,8 @@ func (provider CloudProvider) GetName() string {
 }
 
 // getStackName finds the name of a stack matching a given ID.
-func (provider *CloudProvider) getStackName(stackID string) (string, error) {
-    stack, err := stacks.Find(context.TODO(), provider.Heat, stackID).Extract()
+func (provider *CloudProvider) getStackName(ctx context.Context, stackID string) (string, error) {
+    stack, err := stacks.Find(ctx, provider.Heat, stackID).Extract()
     if err != nil {
         return "", err
     }
@@ -108,14 +108,14 @@ func (provider *CloudProvider) getStackName(stackID string) (string, error) {
 // masters and minions(workers). The key in the map is the server/instance ID
 // in Nova and the value is the resource ID and name of the server, and the
 // parent stack ID and name.
-func (provider *CloudProvider) getAllStackResourceMapping(stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
+func (provider *CloudProvider) getAllStackResourceMapping(ctx context.Context, stackName, stackID string) (m map[string]ResourceStackRelationship, err error) {
     if provider.ResourceStackMapping != nil {
         return provider.ResourceStackMapping, nil
     }
 
     mapping := make(map[string]ResourceStackRelationship)
 
-    serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(context.TODO())
+    serverPages, err := stackresources.List(provider.Heat, stackName, stackID, stackresources.ListOpts{Depth: 2}).AllPages(ctx)
     if err != nil {
         return m, err
     }
@@ -266,7 +266,7 @@ func (provider CloudProvider) waitForServerDetachVolumes(serverID string, timeou
 // will be kept as False, which means the node need to be rebuilt to fix it, otherwise it means the has been processed.
 //
 // The bool type return value means that if the node has been processed from a first time repair PoV
-func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
+func (provider CloudProvider) firstTimeRepair(ctx context.Context, n healthcheck.NodeInfo, serverID string, firstTimeRebootNodes map[string]healthcheck.NodeInfo) (bool, error) {
     var firstTimeUnhealthy = true
     for id := range unHealthyNodes {
         log.V(5).Infof("comparing server ID %s with known broken ID %s", serverID, id)
@@ -281,7 +281,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
     if firstTimeUnhealthy {
         log.Infof("rebooting node %s to repair it", serverID)
 
-        if res := servers.Reboot(context.TODO(), provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
+        if res := servers.Reboot(ctx, provider.Nova, serverID, servers.RebootOpts{Type: servers.SoftReboot}); res.Err != nil {
             // Usually it means the node is being rebooted
             log.Warningf("failed to reboot node %s, error: %v", serverID, res.Err)
             if strings.Contains(res.Err.Error(), "reboot_started") {
@@ -351,7 +351,7 @@ func (provider CloudProvider) firstTimeRepair(n healthcheck.NodeInfo, serverID s
 // - Heat stack ID and resource ID.
 //
 // For worker nodes: Call Magnum resize API directly.
-func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
+func (provider CloudProvider) Repair(ctx context.Context, nodes []healthcheck.NodeInfo) error {
     if len(nodes) == 0 {
         return nil
     }
@@ -370,12 +370,12 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
 
     firstTimeRebootNodes := make(map[string]healthcheck.NodeInfo)
 
-    err := provider.UpdateHealthStatus(masters, workers)
+    err := provider.UpdateHealthStatus(ctx, masters, workers)
     if err != nil {
         return fmt.Errorf("failed to update the health status of cluster %s, error: %v", clusterName, err)
     }
 
-    cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
+    cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
     if err != nil {
         return fmt.Errorf("failed to get the cluster %s, error: %v", clusterName, err)
     }
@@ -389,7 +389,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
             continue
         }
 
-        if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
+        if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
             log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
         } else if processed {
            log.Infof("Node %s has been processed", serverID)
@@ -405,7 +405,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
            }
 
            nodesToReplace.Insert(serverID)
-           ng, err := provider.getNodeGroup(clusterName, n)
+           ng, err := provider.getNodeGroup(ctx, clusterName, n)
            ngName := "default-worker"
            ngNodeCount := &cluster.NodeCount
            if err == nil {
@@ -419,7 +419,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
                NodesToRemove: nodesToReplace.List(),
            }
 
-           clusters.Resize(context.TODO(), provider.Magnum, clusterName, opts)
+           clusters.Resize(ctx, provider.Magnum, clusterName, opts)
            // Wait 10 seconds to make sure Magnum has already got the request
            // to avoid sending all of the resize API calls at the same time.
            time.Sleep(10 * time.Second)
@@ -432,14 +432,14 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
            log.Infof("Cluster %s resized", clusterName)
        }
    } else {
-       clusterStackName, err := provider.getStackName(cluster.StackID)
+       clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
        if err != nil {
            return fmt.Errorf("failed to get the Heat stack for cluster %s, error: %v", clusterName, err)
        }
 
        // In order to rebuild the nodes by Heat stack update, we need to know the parent stack ID of the resources and
        // mark them unhealthy first.
-       allMapping, err := provider.getAllStackResourceMapping(clusterStackName, cluster.StackID)
+       allMapping, err := provider.getAllStackResourceMapping(ctx, clusterStackName, cluster.StackID)
        if err != nil {
            return fmt.Errorf("failed to get the resource stack mapping for cluster %s, error: %v", clusterName, err)
        }
@@ -456,7 +456,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
                continue
            }
 
-           if processed, err := provider.firstTimeRepair(n, serverID, firstTimeRebootNodes); err != nil {
+           if processed, err := provider.firstTimeRepair(ctx, n, serverID, firstTimeRebootNodes); err != nil {
                log.Warningf("Failed to process if the node %s is in first time repair , error: %v", serverID, err)
            } else if processed {
                log.Infof("Node %s has been processed", serverID)
@@ -468,7 +468,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
            } else {
                // Mark root volume as unhealthy
                if rootVolumeID != "" {
-                   err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
+                   err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, rootVolumeID, opts).ExtractErr()
                    if err != nil {
                        log.Errorf("failed to mark resource %s unhealthy, error: %v", rootVolumeID, err)
                    }
@@ -479,7 +479,7 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
                log.Warningf("Failed to shutdown the server %s, error: %v", serverID, err)
                // If the server is failed to delete after 180s, then delete it to avoid the
                // stack update failure later.
-               res := servers.ForceDelete(context.TODO(), provider.Nova, serverID)
+               res := servers.ForceDelete(ctx, provider.Nova, serverID)
                if res.Err != nil {
                    log.Warningf("Failed to delete the server %s, error: %v", serverID, err)
                }
@@ -488,15 +488,15 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
            log.Infof("Marking Nova VM %s(Heat resource %s) unhealthy for Heat stack %s", serverID, allMapping[serverID].ResourceID, cluster.StackID)
 
            // Mark VM as unhealthy
-           err = stackresources.MarkUnhealthy(context.TODO(), provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
+           err = stackresources.MarkUnhealthy(ctx, provider.Heat, allMapping[serverID].StackName, allMapping[serverID].StackID, allMapping[serverID].ResourceID, opts).ExtractErr()
            if err != nil {
                log.Errorf("failed to mark resource %s unhealthy, error: %v", serverID, err)
            }
 
            delete(unHealthyNodes, serverID)
        }
 
-       if err := stacks.UpdatePatch(context.TODO(), provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
+       if err := stacks.UpdatePatch(ctx, provider.Heat, clusterStackName, cluster.StackID, stacks.UpdateOpts{}).ExtractErr(); err != nil {
            return fmt.Errorf("failed to update Heat stack to rebuild resources, error: %v", err)
        }
 
@@ -514,26 +514,26 @@ func (provider CloudProvider) Repair(nodes []healthcheck.NodeInfo) error {
            log.Infof("Skip node delete for %s because it's repaired by reboot", serverID)
            continue
        }
-       if err := provider.KubeClient.CoreV1().Nodes().Delete(context.TODO(), n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
+       if err := provider.KubeClient.CoreV1().Nodes().Delete(ctx, n.KubeNode.Name, metav1.DeleteOptions{}); err != nil {
            log.Errorf("Failed to remove the node %s from cluster, error: %v", n.KubeNode.Name, err)
        }
    }
 
    return nil
 }
 
-func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
+func (provider CloudProvider) getNodeGroup(ctx context.Context, clusterName string, node healthcheck.NodeInfo) (nodegroups.NodeGroup, error) {
    var ng nodegroups.NodeGroup
 
-   ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(context.TODO())
+   ngPages, err := nodegroups.List(provider.Magnum, clusterName, nodegroups.ListOpts{}).AllPages(ctx)
    if err == nil {
        ngs, err := nodegroups.ExtractNodeGroups(ngPages)
        if err != nil {
            log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
            return ng, err
        }
        for _, ng := range ngs {
-           ngInfo, err := nodegroups.Get(context.TODO(), provider.Magnum, clusterName, ng.UUID).Extract()
+           ngInfo, err := nodegroups.Get(ctx, provider.Magnum, clusterName, ng.UUID).Extract()
            if err != nil {
                log.Warningf("Failed to get node group for cluster %s, error: %v", clusterName, err)
                return ng, err
@@ -555,7 +555,7 @@ func (provider CloudProvider) getNodeGroup(clusterName string, node healthcheck.
 
 // UpdateHealthStatus can update the cluster health status to reflect the
 // real-time health status of the k8s cluster.
-func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
+func (provider CloudProvider) UpdateHealthStatus(ctx context.Context, masters []healthcheck.NodeInfo, workers []healthcheck.NodeInfo) error {
    log.Infof("start to update cluster health status.")
    clusterName := provider.Config.ClusterName
 
@@ -600,7 +600,7 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
    }
 
    log.Infof("updating cluster health status as %s for reason %s.", healthStatus, healthStatusReason)
-   res := clusters.Update(context.TODO(), provider.Magnum, clusterName, updateOpts)
+   res := clusters.Update(ctx, provider.Magnum, clusterName, updateOpts)
 
    if res.Err != nil {
        return fmt.Errorf("failed to update the health status of cluster %s error: %v", clusterName, res.Err)
@@ -617,10 +617,10 @@ func (provider CloudProvider) UpdateHealthStatus(masters []healthcheck.NodeInfo,
 // There are two conditions that we disable the repair:
 // - The cluster admin disables the auto healing via OpenStack API.
 // - The Magnum cluster is not in stable status.
-func (provider CloudProvider) Enabled() bool {
+func (provider CloudProvider) Enabled(ctx context.Context) bool {
    clusterName := provider.Config.ClusterName
 
-   cluster, err := clusters.Get(context.TODO(), provider.Magnum, clusterName).Extract()
+   cluster, err := clusters.Get(ctx, provider.Magnum, clusterName).Extract()
    if err != nil {
        log.Warningf("failed to get the cluster %s, error: %v", clusterName, err)
        return false
@@ -644,12 +644,12 @@ func (provider CloudProvider) Enabled() bool {
        return false
    }
 
-   clusterStackName, err := provider.getStackName(cluster.StackID)
+   clusterStackName, err := provider.getStackName(ctx, cluster.StackID)
    if err != nil {
        log.Warningf("Failed to get the Heat stack ID for cluster %s, error: %v", clusterName, err)
        return false
    }
-   stack, err := stacks.Get(context.TODO(), provider.Heat, clusterStackName, cluster.StackID).Extract()
+   stack, err := stacks.Get(ctx, provider.Heat, clusterStackName, cluster.StackID).Extract()
    if err != nil {
        log.Warningf("Failed to get Heat stack %s for cluster %s, error: %v", cluster.StackID, clusterName, err)
        return false
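Within the OpenStack provider the change is a straight substitution of the incoming ctx for context.TODO() in every gophercloud and client-go call, so cancellation and deadlines set by the caller now reach Heat, Magnum, Nova and the Kubernetes API. A hedged fragment showing how a caller could take advantage of that; it assumes the package's existing imports and types, and repairWithTimeout plus the 15-minute bound are illustrative, not part of this commit:

    func repairWithTimeout(provider CloudProvider, unhealthy []healthcheck.NodeInfo) error {
        // Bound the whole repair pass; a stalled Heat stack update or Magnum
        // resize call now returns context.DeadlineExceeded instead of hanging.
        ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
        defer cancel()

        if !provider.Enabled(ctx) {
            return nil
        }
        return provider.Repair(ctx, unhealthy)
    }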

pkg/autohealing/cmd/root.go

+3 -2

@@ -50,10 +50,11 @@ var rootCmd = &cobra.Command{
        "OpenStack is supported by default.",
 
    Run: func(cmd *cobra.Command, args []string) {
+       ctx := context.TODO()
        autohealer := controller.NewController(conf)
 
        if !conf.LeaderElect {
-           autohealer.Start(context.TODO())
+           autohealer.Start(ctx)
            panic("unreachable")
        }
 
@@ -63,7 +64,7 @@ var rootCmd = &cobra.Command{
        }
 
        // Try and become the leader and start autohealing loops
-       leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
+       leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
            Lock:          lock,
            LeaseDuration: 20 * time.Second,
            RenewDeadline: 15 * time.Second,
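root.go still seeds the chain with context.TODO(), only hoisted into a ctx variable so the non-leader path and leader election share the same context. Note that client-go's leader election also provides a context of its own: OnStartedLeading receives a ctx that is cancelled when leadership is lost. The Callbacks block is outside this hunk, so the following is only an illustration of how that election-scoped context fits together with the hoisted ctx (field values are illustrative):

    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock:          lock,
        LeaseDuration: 20 * time.Second,
        RenewDeadline: 15 * time.Second,
        RetryPeriod:   5 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // This ctx is managed by the leader election machinery and is
                // cancelled when this replica stops being the leader.
                autohealer.Start(ctx)
            },
            OnStoppedLeading: func() {
                // Leadership lost; the process typically exits here.
            },
        },
    })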
