Skip to content

Commit

Permalink
awsecs: Add dummy scale up and scale down controls for services
Browse files Browse the repository at this point in the history
These controls do nothing for now, this was just to get the control buttons working
  • Loading branch information
ekimekim committed Feb 7, 2017
1 parent f8cf32c commit 7d58e6a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 18 deletions.
9 changes: 9 additions & 0 deletions probe/awsecs/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package awsecs

import (
"fmt"
"strings"
"sync"
"time"
Expand All @@ -20,6 +21,8 @@ const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it w
type EcsClient interface {
// Returns a EcsInfo struct containing data needed for a report.
GetInfo([]string) EcsInfo
// Scales a service up or down by amount
ScaleService(string, int) error
}

// actual implementation
Expand Down Expand Up @@ -379,3 +382,9 @@ func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo {

return info
}

// Implements EcsClient.ScaleService
func (c ecsClientImpl) ScaleService(serviceName string, amount int) error {
// TODO placeholder
return fmt.Errorf("ScaleService stub: %s, %d", serviceName, amount)
}
96 changes: 81 additions & 15 deletions probe/awsecs/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand All @@ -16,6 +18,8 @@ const (
TaskFamily = "ecs_task_family"
ServiceDesiredCount = "ecs_service_desired_count"
ServiceRunningCount = "ecs_service_running_count"
ScaleUp = "ecs_scale_up"
ScaleDown = "ecs_scale_down"
)

var (
Expand Down Expand Up @@ -83,15 +87,40 @@ type Reporter struct {
ClientsByCluster map[string]EcsClient // Exported for test
cacheSize int
cacheExpiry time.Duration
handlerRegistry *controls.HandlerRegistry
probeID string
}

// Make creates a new Reporter
func Make(cacheSize int, cacheExpiry time.Duration) Reporter {
return Reporter{
func Make(cacheSize int, cacheExpiry time.Duration, handlerRegistry *controls.HandlerRegistry, probeID string) Reporter {
r := Reporter{
ClientsByCluster: map[string]EcsClient{},
cacheSize: cacheSize,
cacheExpiry: cacheExpiry,
handlerRegistry: handlerRegistry,
probeID: probeID,
}

handlerRegistry.Batch(nil, map[string]xfer.ControlHandlerFunc{
ScaleUp: r.controlScaleUp,
ScaleDown: r.controlScaleDown,
})

return r
}

func (r Reporter) getClient(cluster string) (EcsClient, error) {
client, ok := r.ClientsByCluster[cluster]
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
if err != nil {
return nil, err
}
r.ClientsByCluster[cluster] = client
}
return client, nil
}

// Tag needed for Tagger
Expand All @@ -103,15 +132,9 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
for cluster, taskMap := range clusterMap {
log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))

client, ok := r.ClientsByCluster[cluster]
if !ok {
log.Debugf("Creating new ECS client")
var err error
client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
if err != nil {
return rpt, err
}
r.ClientsByCluster[cluster] = client
client, err := r.getClient(cluster)
if err != nil {
return rpt, nil
}

taskArns := make([]string, 0, len(taskMap))
Expand All @@ -126,10 +149,11 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
for serviceName, service := range ecsInfo.Services {
serviceID := report.MakeECSServiceNodeID(cluster, serviceName)
rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
}))
Cluster: cluster,
ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
report.ControlProbeID: r.probeID,
}).WithLatestActiveControls(ScaleUp, ScaleDown))
}
log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services))

Expand Down Expand Up @@ -176,6 +200,20 @@ func (Reporter) Report() (report.Report, error) {
taskTopology := report.MakeTopology().WithMetadataTemplates(taskMetadata)
result.ECSTask = result.ECSTask.Merge(taskTopology)
serviceTopology := report.MakeTopology().WithMetadataTemplates(serviceMetadata)
serviceTopology.Controls.AddControls([]report.Control{
{
ID: ScaleDown,
Human: "Scale Down",
Icon: "fa-minus",
Rank: 0,
},
{
ID: ScaleUp,
Human: "Scale Up",
Icon: "fa-plus",
Rank: 1,
},
})
result.ECSService = result.ECSService.Merge(serviceTopology)
return result, nil
}
Expand All @@ -184,3 +222,31 @@ func (Reporter) Report() (report.Report, error) {
func (r Reporter) Name() string {
return "awsecs"
}

// Stop unregisters controls.
func (r *Reporter) Stop() {
r.handlerRegistry.Batch([]string{
ScaleUp,
ScaleDown,
}, nil)
}

func (r *Reporter) controlScaleUp(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, 1))
}

func (r *Reporter) controlScaleDown(req xfer.Request) xfer.Response {
return xfer.ResponseError(r.controlScale(req, -1))
}

func (r *Reporter) controlScale(req xfer.Request, amount int) error {
cluster, serviceName, ok := report.ParseECSServiceNodeID(req.NodeID)
if !ok {
return fmt.Errorf("Bad node ID")
}
client, err := r.getClient(cluster)
if err != nil {
return err
}
return client.ScaleService(serviceName, amount)
}
11 changes: 9 additions & 2 deletions probe/awsecs/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/weaveworks/scope/probe/awsecs"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/report"
)
Expand Down Expand Up @@ -36,7 +37,8 @@ func getTestContainerNode() report.Node {
}

func TestGetLabelInfo(t *testing.T) {
r := awsecs.Make(1e6, time.Hour)
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")
rpt, err := r.Report()
if err != nil {
t.Fatalf("Error making report: %v", err)
Expand Down Expand Up @@ -84,8 +86,13 @@ func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo {
return c.info
}

func (c mockEcsClient) ScaleService(serviceName string, amount int) error {
return nil
}

func TestTagReport(t *testing.T) {
r := awsecs.Make(1e6, time.Hour)
hr := controls.NewDefaultHandlerRegistry()
r := awsecs.Make(1e6, time.Hour, hr, "test-probe-id")

r.ClientsByCluster[testCluster] = newMockEcsClient(
t,
Expand Down
3 changes: 2 additions & 1 deletion prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func probeMain(flags probeFlags, targets []appclient.Target) {
}

if flags.ecsEnabled {
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry)
reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry, handlerRegistry, probeID)
defer reporter.Stop()
p.AddReporter(reporter)
p.AddTagger(reporter)
}
Expand Down

0 comments on commit 7d58e6a

Please sign in to comment.