Commit
Merge pull request #422 from mmerrill3/master
AWS DescribeAutoScalingGroups requests too aggressive - API limits reached
mwielgus authored Nov 1, 2017
2 parents 5d599ca + 3d043f7 commit d25acce
Showing 14 changed files with 111 additions and 32 deletions.
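The change in brief: PollingAutoscaler builds a fresh autoscaler on every poll, and each discarded instance used to leave behind an hourly ASG cache-refresh goroutine started with wait.Forever, which cannot be stopped. Those orphaned loops accumulated, each one polling DescribeAutoScalingGroups, until the AWS API rate limits were hit (kubernetes/autoscaler#252, referenced in the diff below). The fix adds a Cleanup() method to the CloudProvider interface and rebinds the refresh loop to wait.Until with an interrupt channel. A minimal, self-contained sketch of that stop-channel pattern (the asgCache type and refresh body are hypothetical stand-ins, not code from this commit):

    package main

    import (
        "log"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    // asgCache is a hypothetical stand-in for the ASG cache owned by AwsManager.
    type asgCache struct {
        interrupt chan struct{}
    }

    func newASGCache() *asgCache {
        c := &asgCache{interrupt: make(chan struct{})}
        // wait.Until re-runs the function every period until the channel is
        // closed; wait.Forever, which this commit removes, cannot be stopped.
        go wait.Until(func() {
            log.Println("regenerating ASG cache") // stand-in for regenerateCache()
        }, time.Hour, c.interrupt)
        return c
    }

    // Cleanup mirrors the commit's pattern: closing the channel stops the goroutine.
    func (c *asgCache) Cleanup() {
        close(c.interrupt)
    }

    func main() {
        c := newASGCache()
        c.Cleanup() // without this, the refresh goroutine outlives its owner
    }

Closing a channel is the idiomatic broadcast signal here, but close panics if done twice, so Cleanup is expected to be called exactly once per manager.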
11 changes: 0 additions & 11 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -19,10 +19,8 @@ package aws
 import (
     "fmt"
     "sync"
-    "time"
 
     "github.com/golang/glog"
-    "k8s.io/apimachinery/pkg/util/wait"
 )
 
 type autoScalingGroups struct {
@@ -40,15 +38,6 @@ func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups {
         instanceToAsg:            make(map[AwsRef]*Asg),
         instancesNotInManagedAsg: make(map[AwsRef]struct{}),
     }
-
-    go wait.Forever(func() {
-        registry.cacheMutex.Lock()
-        defer registry.cacheMutex.Unlock()
-        if err := registry.regenerateCache(); err != nil {
-            glog.Errorf("Error while regenerating Asg cache: %v", err)
-        }
-    }, time.Hour)
-
     return registry
 }
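Note: the hourly refresh loop deleted here does not disappear; it is re-created in aws_manager.go below, bound to the manager's new interrupt channel so it can be stopped.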
6 changes: 6 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
@@ -103,6 +103,12 @@ func buildStaticallyDiscoveringProvider(awsManager *AwsManager, specs []string,
     return aws, nil
 }
 
+// Cleanup stops the goroutine handling the current view of the ASGs in the form of a cache
+func (aws *awsCloudProvider) Cleanup() error {
+    aws.awsManager.Cleanup()
+    return nil
+}
+
 // addNodeGroup adds node group defined in string spec. Format:
 // minNodes:maxNodes:asgName
 func (aws *awsCloudProvider) addNodeGroup(spec string) error {
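The resulting call chain is awsCloudProvider.Cleanup() → AwsManager.Cleanup() → close(interrupt), which makes the hourly wait.Until refresh loop (re-created in aws_manager.go below) return.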
10 changes: 9 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
@@ -73,7 +73,8 @@ var testAwsManager = &AwsManager{
         instanceToAsg:            make(map[AwsRef]*Asg),
         instancesNotInManagedAsg: make(map[AwsRef]struct{}),
     },
-    service: testService,
+    service:   testService,
+    interrupt: make(chan struct{}),
 }
 
@@ -86,6 +87,7 @@ func newTestAwsManagerWithService(service autoScaling) *AwsManager {
             instancesNotInManagedAsg: make(map[AwsRef]struct{}),
             service:                  wrapper,
         },
+        interrupt: make(chan struct{}),
     }
 }
 
@@ -371,3 +373,9 @@ func TestBuildAsg(t *testing.T) {
     assert.Equal(t, 222, asg.MaxSize())
     assert.Equal(t, "test-name", asg.Name)
 }
+
+func TestCleanup(t *testing.T) {
+    provider := testProvider(t, testAwsManager)
+    err := provider.Cleanup()
+    assert.NoError(t, err)
+}
40 changes: 32 additions & 8 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
@@ -33,6 +33,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
     provider_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@@ -51,8 +52,9 @@ type asgInformation struct {
 
 // AwsManager handles AWS communication and data caching.
 type AwsManager struct {
-    service autoScalingWrapper
-    asgs    *autoScalingGroups
+    service   autoScalingWrapper
+    asgs      *autoScalingGroups
+    interrupt chan struct{}
 }
 
@@ -62,8 +64,8 @@ type asgTemplate struct {
     Tags []*autoscaling.TagDescription
 }
 
-// CreateAwsManager constructs awsManager object.
-func CreateAwsManager(configReader io.Reader) (*AwsManager, error) {
+// createAWSManagerInternal allows a custom autoScalingWrapper to be passed in by tests
+func createAWSManagerInternal(configReader io.Reader, service *autoScalingWrapper) (*AwsManager, error) {
     if configReader != nil {
         var cfg provider_aws.CloudConfig
         if err := gcfg.ReadInto(&cfg, configReader); err != nil {
@@ -72,17 +74,34 @@ func CreateAwsManager(configReader io.Reader) (*AwsManager, error) {
         }
     }
 
-    service := autoScalingWrapper{
-        autoscaling.New(session.New()),
+    if service == nil {
+        service = &autoScalingWrapper{
+            autoscaling.New(session.New()),
+        }
     }
 
     manager := &AwsManager{
-        asgs:    newAutoScalingGroups(service),
-        service: service,
+        asgs:      newAutoScalingGroups(*service),
+        service:   *service,
+        interrupt: make(chan struct{}),
     }
 
+    go wait.Until(func() {
+        manager.asgs.cacheMutex.Lock()
+        defer manager.asgs.cacheMutex.Unlock()
+        if err := manager.asgs.regenerateCache(); err != nil {
+            glog.Errorf("Error while regenerating Asg cache: %v", err)
+        }
+    }, time.Hour, manager.interrupt)
+
     return manager, nil
 }
 
+// CreateAwsManager constructs awsManager object.
+func CreateAwsManager(configReader io.Reader) (*AwsManager, error) {
+    return createAWSManagerInternal(configReader, nil)
+}
+
@@ -93,6 +112,11 @@ func (m *AwsManager) GetAsgForInstance(instance *AwsRef) (*Asg, error) {
     return m.asgs.FindForInstance(instance)
 }
 
+// Cleanup closes the channel to signal the goroutine handling the cache to stop
+func (m *AwsManager) Cleanup() {
+    close(m.interrupt)
+}
+
 func (m *AwsManager) getAutoscalingGroupsByTags(keys []string) ([]*autoscaling.Group, error) {
     return m.service.getAutoscalingGroupsByTags(keys)
 }
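The createAWSManagerInternal/CreateAwsManager split is a standard Go test seam: production callers keep the old public signature, and the nil service argument selects a real autoscaling client built from session.New(), while tests inject a canned autoScalingWrapper and can tear the manager down deterministically with Cleanup(), exactly as testCreateAWSManager does in aws_manager_test.go below.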
9 changes: 9 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
@@ -25,6 +25,7 @@ import (
     apiv1 "k8s.io/api/core/v1"
     "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
+    "runtime"
 )
 
 func TestBuildGenericLabels(t *testing.T) {
@@ -93,3 +94,11 @@ func makeTaintSet(taints []apiv1.Taint) map[apiv1.Taint]bool {
     }
     return set
 }
+
+func testCreateAWSManager(t *testing.T) {
+    manager, awsError := createAWSManagerInternal(nil, &testService)
+    assert.Nil(t, awsError, "Expected nil from the error when creating AWS Manager")
+    currentNumberRoutines := runtime.NumGoroutine()
+    manager.Cleanup()
+    assert.True(t, currentNumberRoutines-1 == runtime.NumGoroutine(), "current number of go routines should be one less since we called close")
+}
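Note that runtime.NumGoroutine counts every goroutine in the process, and close(interrupt) only signals the wait.Until loop to return, so the assertion relies on the refresh goroutine having already exited by the time the second count is taken.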
3 changes: 3 additions & 0 deletions cluster-autoscaler/cloudprovider/cloud_provider.go
@@ -58,6 +58,9 @@ type CloudProvider interface {
     // GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
     GetResourceLimiter() (*ResourceLimiter, error)
 
+    // Cleanup cleans up open resources before the cloud provider is destroyed, e.g. goroutines.
+    Cleanup() error
+
     // Refresh is called before every main loop and can be used to dynamically update cloud provider state.
     // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
     Refresh() error
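Every implementation must now provide Cleanup: providers with no background state satisfy it with a no-op or ErrNotImplemented (see the kubemark and test providers below), while AWS and GCE use it to stop their cache-refresh goroutines. Callers are expected to invoke it before dropping a provider, as PollingAutoscaler does below.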
6 changes: 6 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
@@ -83,6 +83,12 @@ func BuildGceCloudProvider(gceManager GceManager, specs []string, resourceLimite
     return gce, nil
 }
 
+// Cleanup cleans up all resources before the cloud provider is removed
+func (gce *GceCloudProvider) Cleanup() error {
+    gce.gceManager.Cleanup()
+    return nil
+}
+
 // addNodeGroup adds node group defined in string spec. Format:
 // minNodes:maxNodes:migUrl
 func (gce *GceCloudProvider) addNodeGroup(spec string) error {
cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go
@@ -77,6 +77,11 @@ func (m *gceManagerMock) Refresh() error {
     return args.Error(0)
 }
 
+func (m *gceManagerMock) Cleanup() error {
+    args := m.Called()
+    return args.Error(0)
+}
+
 func (m *gceManagerMock) getMigs() []*migInformation {
     args := m.Called()
     return args.Get(0).([]*migInformation)
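For context, a hedged sketch of how a test might prime this testify mock (the test itself is hypothetical; the gceManager field matches the receiver usage in gce_cloud_provider.go above):

    func TestGceCleanup(t *testing.T) { // hypothetical test, in package gce
        manager := &gceManagerMock{}
        manager.On("Cleanup").Return(nil).Once()

        provider := &GceCloudProvider{gceManager: manager}
        assert.NoError(t, provider.Cleanup())
        manager.AssertExpectations(t) // fails the test if Cleanup was not called exactly once
    }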
30 changes: 19 additions & 11 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
@@ -105,6 +105,8 @@ type GceManager interface {
     Refresh() error
     // GetResourceLimiter returns resource limiter.
     GetResourceLimiter() (*cloudprovider.ResourceLimiter, error)
+    // Cleanup cleans up open resources before the cloud provider is destroyed, e.g. goroutines.
+    Cleanup() error
     getMigs() []*migInformation
     createNodePool(mig *Mig) error
     deleteNodePool(toBeRemoved *Mig) error
@@ -127,16 +129,15 @@ type gceManagerImpl struct {
     cacheMutex sync.Mutex
     migsMutex  sync.Mutex
 
-    location    string
-    projectId   string
-    clusterName string
-    mode        GcpCloudProviderMode
-    templates   *templateBuilder
-    isRegional  bool
-
+    location        string
+    projectId       string
+    clusterName     string
+    mode            GcpCloudProviderMode
+    templates       *templateBuilder
+    interrupt       chan struct{}
+    isRegional      bool
     resourceLimiter *cloudprovider.ResourceLimiter
-
-    lastRefresh time.Time
+    lastRefresh     time.Time
 }
 
 // CreateGceManager constructs gceManager object.
@@ -208,6 +209,7 @@ func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, cluster
             projectId: projectId,
             service:   gceService,
         },
+        interrupt: make(chan struct{}),
     }
 
     if mode == ModeGKE {
@@ -260,17 +262,23 @@
 
     manager.lastRefresh = time.Now()
 
-    go wait.Forever(func() {
+    go wait.Until(func() {
         manager.cacheMutex.Lock()
         defer manager.cacheMutex.Unlock()
         if err := manager.regenerateCache(); err != nil {
             glog.Errorf("Error while regenerating Mig cache: %v", err)
         }
-    }, time.Hour)
+    }, time.Hour, manager.interrupt)
 
     return manager, nil
 }
 
+// Cleanup closes the channel to signal the goroutine handling the cache to stop
+func (m *gceManagerImpl) Cleanup() error {
+    close(m.interrupt)
+    return nil
+}
+
 func (m *gceManagerImpl) assertGKE() {
     if m.mode != ModeGKE {
         glog.Fatalf("This should run only in GKE mode")
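GCE thus mirrors the AWS change: wait.Forever becomes wait.Until bound to the new interrupt field, and Cleanup closes the channel, returning an error only to satisfy the GceManager interface.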
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/kubemark/kubemark_linux.go
@@ -128,6 +128,11 @@ func (kubemark *KubemarkCloudProvider) Refresh() error {
     return nil
 }
 
+// Cleanup cleans up all resources before the cloud provider is removed
+func (kubemark *KubemarkCloudProvider) Cleanup() error {
+    return nil
+}
+
 // NodeGroup implements NodeGroup interface.
 type NodeGroup struct {
     Name string
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/kubemark/kubemark_other.go
@@ -70,3 +70,8 @@ func (kubemark *KubemarkCloudProvider) GetResourceLimiter() (*cloudprovider.Reso
 func (kubemark *KubemarkCloudProvider) Refresh() error {
     return cloudprovider.ErrNotImplemented
 }
+
+// Cleanup cleans up all resources before the cloud provider is removed
+func (kubemark *KubemarkCloudProvider) Cleanup() error {
+    return cloudprovider.ErrNotImplemented
+}
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -192,6 +192,11 @@ func (tcp *TestCloudProvider) SetResourceLimiter(resourceLimiter *cloudprovider.
     tcp.resourceLimiter = resourceLimiter
 }
 
+// Cleanup closes resources associated with the cloud provider
+func (tcp *TestCloudProvider) Cleanup() error {
+    return nil
+}
+
 // Refresh is called before every main loop and can be used to dynamically update cloud provider state.
 // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
 func (tcp *TestCloudProvider) Refresh() error {
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/polling_autoscaler.go
@@ -98,9 +98,15 @@ func (a *PollingAutoscaler) Poll() error {
     if !reflect.DeepEqual(prevNodeGroupIds, currentNodeGroupIds) {
         glog.V(4).Infof("Detected change(s) in node group definitions. Recreating autoscaler...")
 
+        // See https://github.com/kubernetes/autoscaler/issues/252, we need to close any stray resources
+        a.autoscaler.CloudProvider().Cleanup()
+
         // For safety, any config change should stop and recreate everything running in CA, hence we recreate the whole Autoscaler instance here
         // See https://github.com/kubernetes/contrib/pull/2226#discussion_r94126064
         a.autoscaler = currentAutoscaler
+    } else {
+        // See https://github.com/kubernetes/autoscaler/issues/252, we need to close any stray resources
+        currentAutoscaler.CloudProvider().Cleanup()
     }
     glog.V(4).Infof("Poll finished")
     return nil
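Either way exactly one autoscaler survives each poll: when the node group definitions changed, the outgoing instance's provider is cleaned up before being replaced; when they did not, the freshly built probe instance is cleaned up and discarded. Without these two calls, every poll leaked the hourly refresh goroutine of whichever instance was dropped.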
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/polling_autoscaler_test.go
@@ -42,7 +42,7 @@ func TestRunOnce(t *testing.T) {
     newCloudProvider.AddNode("ng2", n2)
 
     initialAutoscaler := &AutoscalerMock{}
-    initialAutoscaler.On("CloudProvider").Return(initialCloudProvider).Once()
+    initialAutoscaler.On("CloudProvider").Return(initialCloudProvider).Twice()
 
     newAutoscaler := &AutoscalerMock{}
     newAutoscaler.On("RunOnce", currentTime).Once()
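The expectation moves from Once to Twice because initialAutoscaler.CloudProvider() is now called twice in this scenario: once to list node groups for the comparison, and once more to reach Cleanup() on the provider being replaced.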
