Skip to content

Commit

Permalink
Add support for uncordoning nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
cezarsa committed Sep 11, 2020
1 parent 46799e2 commit 0f4b2e1
Show file tree
Hide file tree
Showing 7 changed files with 616 additions and 218 deletions.
19 changes: 13 additions & 6 deletions cmd/draino/draino.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ func main() {
Aggregation: view.Count(),
TagKeys: []tag.Key{kubernetes.TagResult},
}
nodesUncordoned = &view.View{
Name: "uncordoned_nodes_total",
Measure: kubernetes.MeasureNodesUncordoned,
Description: "Number of nodes uncordoned.",
Aggregation: view.Count(),
TagKeys: []tag.Key{kubernetes.TagResult},
}
nodesDrained = &view.View{
Name: "drained_nodes_total",
Measure: kubernetes.MeasureNodesDrained,
Expand All @@ -106,7 +113,7 @@ func main() {
}
)

kingpin.FatalIfError(view.Register(nodesCordoned, nodesDrained, nodesDrainScheduled), "cannot create metrics")
kingpin.FatalIfError(view.Register(nodesCordoned, nodesUncordoned, nodesDrained, nodesDrainScheduled), "cannot create metrics")
p, err := prometheus.NewExporter(prometheus.Options{Namespace: kubernetes.Component})
kingpin.FatalIfError(err, "cannot export metrics")
view.RegisterExporter(p)
Expand Down Expand Up @@ -161,7 +168,8 @@ func main() {
),
kubernetes.NewEventRecorder(cs),
kubernetes.WithLogger(log),
kubernetes.WithDrainBuffer(*drainBuffer))
kubernetes.WithDrainBuffer(*drainBuffer),
kubernetes.WithConditionsFilter(*conditions))

if *dryRun {
h = cache.FilteringResourceEventHandler{
Expand All @@ -170,12 +178,11 @@ func main() {
&kubernetes.NoopCordonDrainer{},
kubernetes.NewEventRecorder(cs),
kubernetes.WithLogger(log),
kubernetes.WithDrainBuffer(*drainBuffer)),
kubernetes.WithDrainBuffer(*drainBuffer),
kubernetes.WithConditionsFilter(*conditions)),
}
}

cf := cache.FilteringResourceEventHandler{FilterFunc: kubernetes.NewNodeConditionFilter(*conditions), Handler: h}

if len(*nodeLabels) > 0 {
log.Debug("node labels", zap.Any("labels", nodeLabels))
if *nodeLabelsExpr != "" {
Expand All @@ -193,7 +200,7 @@ func main() {
log.Sugar().Fatalf("Failed to parse node label expression: %v", err)
}

nodeLabelFilter = cache.FilteringResourceEventHandler{FilterFunc: nodeLabelFilterFunc, Handler: cf}
nodeLabelFilter = cache.FilteringResourceEventHandler{FilterFunc: nodeLabelFilterFunc, Handler: h}

nodes := kubernetes.NewNodeWatch(cs, nodeLabelFilter)

Expand Down
36 changes: 33 additions & 3 deletions internal/kubernetes/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
DefaultSkipDrain = false
)

type nodeMutatorFn func(*core.Node)

type errTimeout struct{}

func (e errTimeout) Error() string {
Expand All @@ -63,7 +65,10 @@ func IsTimeout(err error) bool {
// A Cordoner cordons nodes.
type Cordoner interface {
// Cordon the supplied node. Marks it unschedulable for new pods.
Cordon(n *core.Node) error
Cordon(n *core.Node, mutators ...nodeMutatorFn) error

// Uncordon the supplied node. Marks it schedulable for new pods.
Uncordon(n *core.Node, mutators ...nodeMutatorFn) error
}

// A Drainer drains nodes.
Expand All @@ -83,7 +88,10 @@ type CordonDrainer interface {
type NoopCordonDrainer struct{}

// Cordon does nothing.
func (d *NoopCordonDrainer) Cordon(n *core.Node) error { return nil }
func (d *NoopCordonDrainer) Cordon(n *core.Node, mutators ...nodeMutatorFn) error { return nil }

// Uncordon does nothing.
func (d *NoopCordonDrainer) Uncordon(n *core.Node, mutators ...nodeMutatorFn) error { return nil }

// Drain does nothing.
func (d *NoopCordonDrainer) Drain(n *core.Node) error { return nil }
Expand Down Expand Up @@ -177,7 +185,7 @@ func (d *APICordonDrainer) deleteTimeout() time.Duration {
}

// Cordon the supplied node. Marks it unschedulable for new pods.
func (d *APICordonDrainer) Cordon(n *core.Node) error {
func (d *APICordonDrainer) Cordon(n *core.Node, mutators ...nodeMutatorFn) error {
fresh, err := d.c.CoreV1().Nodes().Get(n.GetName(), meta.GetOptions{})
if err != nil {
return errors.Wrapf(err, "cannot get node %s", n.GetName())
Expand All @@ -186,12 +194,34 @@ func (d *APICordonDrainer) Cordon(n *core.Node) error {
return nil
}
fresh.Spec.Unschedulable = true
for _, m := range mutators {
m(fresh)
}
if _, err := d.c.CoreV1().Nodes().Update(fresh); err != nil {
return errors.Wrapf(err, "cannot cordon node %s", fresh.GetName())
}
return nil
}

// Uncordon the supplied node. Marks it schedulable for new pods.
func (d *APICordonDrainer) Uncordon(n *core.Node, mutators ...nodeMutatorFn) error {
fresh, err := d.c.CoreV1().Nodes().Get(n.GetName(), meta.GetOptions{})
if err != nil {
return errors.Wrapf(err, "cannot get node %s", n.GetName())
}
if !fresh.Spec.Unschedulable {
return nil
}
fresh.Spec.Unschedulable = false
for _, m := range mutators {
m(fresh)
}
if _, err := d.c.CoreV1().Nodes().Update(fresh); err != nil {
return errors.Wrapf(err, "cannot uncordon node %s", fresh.GetName())
}
return nil
}

// MarkDrain set a condition on the node to mark that that drain is scheduled.
func (d *APICordonDrainer) MarkDrain(n *core.Node, when, finish time.Time, failed bool) error {
nodeName := n.Name
Expand Down
180 changes: 139 additions & 41 deletions internal/kubernetes/drainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and limitations under the License.
package kubernetes

import (
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -79,84 +80,181 @@ func TestCordon(t *testing.T) {
cases := []struct {
name string
node *core.Node
mutators []nodeMutatorFn
expected *core.Node
reactions []reactor
}{
{
name: "CordonSchedulableNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
reactions: []reactor{
reactor{
verb: "get",
resource: "nodes",
ret: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: false},
},
},
reactor{
verb: "update",
resource: "nodes",
ret: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
},
expected: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
},
{
name: "CordonUnschedulableNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
reactions: []reactor{
reactor{
verb: "get",
resource: "nodes",
ret: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
},
node: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
expected: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
},
{
name: "CordonNonExistentNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
reactions: []reactor{
reactor{verb: "get", resource: "nodes", err: errors.New("nope")},
{verb: "get", resource: "nodes", err: errors.New("nope")},
},
},
{
name: "ErrorCordoningSchedulableNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
reactions: []reactor{
reactor{
verb: "get",
resource: "nodes",
ret: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: false},
},
},
reactor{verb: "update", resource: "nodes", err: errors.New("nope")},
{verb: "update", resource: "nodes", err: errors.New("nope")},
},
},
{
name: "CordonSchedulableNodeWithMutator",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
mutators: []nodeMutatorFn{func(n *core.Node) {
n.Annotations = map[string]string{"foo": "1"}
}},
expected: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName, Annotations: map[string]string{"foo": "1"}},
Spec: core.NodeSpec{Unschedulable: true},
},
},
{
name: "CordonUnschedulableNodeWithMutator",
node: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
mutators: []nodeMutatorFn{func(n *core.Node) {
n.Annotations = map[string]string{"foo": "1"}
}},
expected: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := &fake.Clientset{}
c := fake.NewSimpleClientset(tc.node)
for _, r := range tc.reactions {
c.AddReactor(r.verb, r.resource, r.Fn())
c.PrependReactor(r.verb, r.resource, r.Fn())
}

d := NewAPICordonDrainer(c)
if err := d.Cordon(tc.node); err != nil {
if err := d.Cordon(tc.node, tc.mutators...); err != nil {
for _, r := range tc.reactions {
if errors.Cause(err) == r.err {
return
}
}
t.Errorf("d.Cordon(%v): %v", tc.node.Name, err)
}
{
n, err := c.CoreV1().Nodes().Get(tc.node.GetName(), meta.GetOptions{})
if err != nil {
t.Errorf("node.Get(%v): %v", tc.node.Name, err)
}
if !reflect.DeepEqual(tc.expected, n) {
t.Errorf("node.Get(%v): want %#v, got %#v", tc.node.Name, tc.expected, n)
}
}
})
}
}

func TestUncordon(t *testing.T) {
cases := []struct {
name string
node *core.Node
mutators []nodeMutatorFn
expected *core.Node
reactions []reactor
}{
{
name: "UncordonSchedulableNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
},
{
name: "UncordonUnschedulableNode",
node: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
},
{
name: "UncordonNonExistentNode",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
reactions: []reactor{
{verb: "get", resource: "nodes", err: errors.New("nope")},
},
},
{
name: "ErrorUncordoningUnschedulableNode",
node: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
reactions: []reactor{
{verb: "update", resource: "nodes", err: errors.New("nope")},
},
},
{
name: "UncordonSchedulableNodeWithMutator",
node: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
mutators: []nodeMutatorFn{func(n *core.Node) {
n.Annotations = map[string]string{"foo": "1"}
}},
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName}},
},
{
name: "UncordonUnschedulableNodeWithMutator",
node: &core.Node{
ObjectMeta: meta.ObjectMeta{Name: nodeName},
Spec: core.NodeSpec{Unschedulable: true},
},
mutators: []nodeMutatorFn{func(n *core.Node) {
n.Annotations = map[string]string{"foo": "1"}
}},
expected: &core.Node{ObjectMeta: meta.ObjectMeta{Name: nodeName, Annotations: map[string]string{"foo": "1"}}},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
c := fake.NewSimpleClientset(tc.node)
for _, r := range tc.reactions {
c.PrependReactor(r.verb, r.resource, r.Fn())
}
d := NewAPICordonDrainer(c)
if err := d.Uncordon(tc.node, tc.mutators...); err != nil {
for _, r := range tc.reactions {
if errors.Cause(err) == r.err {
return
}
}
t.Errorf("d.Uncordon(%v): %v", tc.node.Name, err)
}
{
n, err := c.CoreV1().Nodes().Get(tc.node.GetName(), meta.GetOptions{})
if err != nil {
t.Errorf("node.Get(%v): %v", tc.node.Name, err)
}
if !reflect.DeepEqual(tc.expected, n) {
t.Errorf("node.Get(%v): want %#v, got %#v", tc.node.Name, tc.expected, n)
}
}
})
}
}
Expand Down
Loading

0 comments on commit 0f4b2e1

Please sign in to comment.