Skip to content

Commit

Permalink
Merge pull request #2197 from weaveworks/mike/ecs/scale-controls
Browse files Browse the repository at this point in the history
Add ECS Service scale up/down controls
  • Loading branch information
ekimekim authored Feb 21, 2017
2 parents 0423e49 + a49f1c9 commit ed19e7a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 17 deletions.
27 changes: 27 additions & 0 deletions probe/awsecs/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package awsecs

import (
"fmt"
"strings"
"sync"
"time"
Expand All @@ -20,6 +21,8 @@ const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it w
type EcsClient interface {
// Returns a EcsInfo struct containing data needed for a report.
GetInfo([]string) EcsInfo
// Scales a service up or down by amount
ScaleService(string, int) error
}

// actual implementation
Expand Down Expand Up @@ -379,3 +382,27 @@ func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo {

return info
}

// Implements EcsClient.ScaleService
func (c ecsClientImpl) ScaleService(serviceName string, amount int) error {
// Note this is inherently racey, due to needing to get, modify, then update the DesiredCount.

// refresh service in cache
c.describeServices([]string{serviceName})
// now check the cache to see if it worked
service, ok := c.getCachedService(serviceName)
if !ok {
return fmt.Errorf("Service %s not found", serviceName)
}

newCount := service.DesiredCount + int64(amount)
if newCount < 1 {
return fmt.Errorf("Cannot reduce count below one")
}
_, err := c.client.UpdateService(&ecs.UpdateServiceInput{
Cluster: &c.cluster,
Service: &serviceName,
DesiredCount: &newCount,
})
return err
}
99 changes: 85 additions & 14 deletions probe/awsecs/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand All @@ -16,6 +18,8 @@ const (
TaskFamily = "ecs_task_family"
ServiceDesiredCount = "ecs_service_desired_count"
ServiceRunningCount = "ecs_service_running_count"
ScaleUp = "ecs_scale_up"
ScaleDown = "ecs_scale_down"
)

var (
Expand Down Expand Up @@ -83,15 +87,40 @@ type Reporter struct {
ClientsByCluster map[string]EcsClient // Exported for test
cacheSize int
cacheExpiry time.Duration
handlerRegistry *controls.HandlerRegistry
probeID string
}

// Make creates a new Reporter
func Make(cacheSize int, cacheExpiry time.Duration) Reporter {
return Reporter{
func Make(cacheSize int, cacheExpiry time.Duration, handlerRegistry *controls.HandlerRegistry, probeID string) Reporter {
r := Reporter{
ClientsByCluster: map[string]EcsClient{},
cacheSize: cacheSize,
cacheExpiry: cacheExpiry,
handlerRegistry: handlerRegistry,
probeID: probeID,
}

handlerRegistry.Batch(nil, map[string]xfer.ControlHandlerFunc{
ScaleUp: r.controlScaleUp,
ScaleDown: r.controlScaleDown,
})

return r
}

func (r Reporter) getClient(cluster string) (EcsClient, error) {
client, ok := r.ClientsByCluster[cluster]
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
if err != nil {
return nil, err
}
r.ClientsByCluster[cluster] = client
}
return client, nil
}

// Tag needed for Tagger
Expand All @@ -103,15 +132,9 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
for cluster, taskMap := range clusterMap {
log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))

client, ok := r.ClientsByCluster[cluster]
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
if err != nil {
return rpt, err
}
r.ClientsByCluster[cluster] = client
client, err := r.getClient(cluster)
if err != nil {
return rpt, nil
}

taskArns := make([]string, 0, len(taskMap))
Expand All @@ -126,9 +149,15 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
for serviceName, service := range ecsInfo.Services {
serviceID := report.MakeECSServiceNodeID(cluster, serviceName)
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
report.ControlProbeID: r.probeID,
}).WithLatestControls(map[string]report.NodeControlData{
ScaleUp: {Dead: false},
// We've decided for now to disable ScaleDown when only 1 task is desired,
// since scaling down to 0 would cause the service to disappear (#2085)
ScaleDown: {Dead: service.DesiredCount <= 1},
}))
}
log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services))
Expand Down Expand Up @@ -176,6 +205,20 @@ func (Reporter) Report() (report.Report, error) {
taskTopology := report.MakeTopology().WithMetadataTemplates(taskMetadata)
result.ECSTask = result.ECSTask.Merge(taskTopology)
serviceTopology := report.MakeTopology().WithMetadataTemplates(serviceMetadata)
serviceTopology.Controls.AddControls([]report.Control{
{
ID: ScaleDown,
Human: "Scale Down",
Icon: "fa-minus",
Rank: 0,
},
{
ID: ScaleUp,
Human: "Scale Up",
Icon: "fa-plus",
Rank: 1,
},
})
result.ECSService = result.ECSService.Merge(serviceTopology)
return result, nil
}
Expand All @@ -184,3 +227,31 @@ func (Reporter) Report() (report.Report, error) {
func (r Reporter) Name() string {
return "awsecs"
}

// Stop unregisters controls.
func (r *Reporter) Stop() {
r.handlerRegistry.Batch([]string{
ScaleUp,
ScaleDown,
}, nil)
}

func (r *Reporter) controlScaleUp(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, 1))
}

func (r *Reporter) controlScaleDown(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, -1))
}

func (r *Reporter) controlScale(req xfer.Request, amount int) error {
cluster, serviceName, ok := report.ParseECSServiceNodeID(req.NodeID)
if !ok {
return fmt.Errorf("Bad node ID")
}
client, err := r.getClient(cluster)
if err != nil {
return err
}
return client.ScaleService(serviceName, amount)
}
11 changes: 9 additions & 2 deletions probe/awsecs/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/weaveworks/scope/probe/awsecs"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand Down Expand Up @@ -36,7 +37,8 @@ func getTestContainerNode() report.Node {
}

func TestGetLabelInfo(t *testing.T) {
r := awsecs.Make(1e6, time.Hour)
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")
rpt, err := r.Report()
if err != nil {
t.Fatalf("Error making report: %v", err)
Expand Down Expand Up @@ -84,8 +86,13 @@ func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo {
return c.info
}

func (c mockEcsClient) ScaleService(serviceName string, amount int) error {
return nil
}

func TestTagReport(t *testing.T) {
r := awsecs.Make(1e6, time.Hour)
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")

r.ClientsByCluster[testCluster] = newMockEcsClient(
t,
Expand Down
3 changes: 2 additions & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}

if flags.ecsEnabled {
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry)
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, handlerRegistry, probeID)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
}
Expand Down

0 comments on commit ed19e7a

Please sign in to comment.