Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow disabling controls in probes #1627

Merged
merged 6 commits into from
Jul 7, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
rp := appclient.NewReportPublisher(client)
rp := appclient.NewReportPublisher(client, false)

rand.Seed(time.Now().UnixNano())
for range time.Tick(*publishInterval) {
Expand Down
2 changes: 1 addition & 1 deletion experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
log.Fatal(err)
}

rp := appclient.NewReportPublisher(client)
rp := appclient.NewReportPublisher(client, false)
for range time.Tick(*publishInterval) {
rp.Publish(fixedReport)
}
Expand Down
4 changes: 2 additions & 2 deletions probe/appclient/app_client_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestAppClientPublish(t *testing.T) {
defer p.Stop()

// First few reports might be dropped as the client is spinning up.
rp := NewReportPublisher(p)
rp := NewReportPublisher(p, false)
for i := 0; i < 10; i++ {
if err := rp.Publish(rpt); err != nil {
t.Error(err)
Expand Down Expand Up @@ -208,7 +208,7 @@ func TestStop(t *testing.T) {
t.Fatal(err)
}

rp := NewReportPublisher(p)
rp := NewReportPublisher(p, false)

// Make sure the app received our report and is stuck
for done := false; !done; {
Expand Down
30 changes: 16 additions & 14 deletions probe/appclient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ type ClientFactory func(string, string) (AppClient, error)
type multiClient struct {
clientFactory ClientFactory

mtx sync.Mutex
sema semaphore
clients map[string]AppClient // holds map from app id -> client
ids map[string]report.IDList // holds map from hostname -> app ids
quit chan struct{}
mtx sync.Mutex
sema semaphore
clients map[string]AppClient // holds map from app id -> client
ids map[string]report.IDList // holds map from hostname -> app ids
quit chan struct{}
noControls bool
}

type clientTuple struct {
Expand All @@ -53,14 +54,15 @@ type MultiAppClient interface {
}

// NewMultiAppClient creates a new MultiAppClient.
func NewMultiAppClient(clientFactory ClientFactory) MultiAppClient {
func NewMultiAppClient(clientFactory ClientFactory, noControls bool) MultiAppClient {
return &multiClient{
clientFactory: clientFactory,

sema: newSemaphore(maxConcurrentGET),
clients: map[string]AppClient{},
ids: map[string]report.IDList{},
quit: make(chan struct{}),
sema: newSemaphore(maxConcurrentGET),
clients: map[string]AppClient{},
ids: map[string]report.IDList{},
quit: make(chan struct{}),
noControls: noControls,
}
}

Expand Down Expand Up @@ -100,11 +102,11 @@ func (c *multiClient) Set(hostname string, endpoints []string) {
hostIDs := report.MakeIDList()
for tuple := range clients {
hostIDs = hostIDs.Add(tuple.ID)

_, ok := c.clients[tuple.ID]
if !ok {
if _, ok := c.clients[tuple.ID]; !ok {

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.

c.clients[tuple.ID] = tuple.AppClient
tuple.AppClient.ControlConnection()
if !c.noControls {
tuple.AppClient.ControlConnection()
}
}
}
c.ids[hostname] = hostIDs
Expand Down
4 changes: 2 additions & 2 deletions probe/appclient/multi_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestMultiClient(t *testing.T) {
}
)

mp := appclient.NewMultiAppClient(factory)
mp := appclient.NewMultiAppClient(factory, false)
defer mp.Stop()

// Add two hostnames with overlapping apps, check we don't add the same app twice
Expand All @@ -89,7 +89,7 @@ func TestMultiClient(t *testing.T) {
}

func TestMultiClientPublish(t *testing.T) {
mp := appclient.NewMultiAppClient(factory)
mp := appclient.NewMultiAppClient(factory, false)
defer mp.Stop()

sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish }
Expand Down
13 changes: 10 additions & 3 deletions probe/appclient/report_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@ import (
// A ReportPublisher uses a buffer pool to serialise reports, which it
// then passes to a publisher
type ReportPublisher struct {
publisher Publisher
publisher Publisher
noControls bool
}

// NewReportPublisher creates a new report publisher
func NewReportPublisher(publisher Publisher) *ReportPublisher {
func NewReportPublisher(publisher Publisher, noControls bool) *ReportPublisher {
return &ReportPublisher{
publisher: publisher,
publisher: publisher,
noControls: noControls,
}
}

// Publish serialises and compresses a report, then passes it to a publisher
func (p *ReportPublisher) Publish(r report.Report) error {
if p.noControls {
r.WalkTopologies(func(t *report.Topology) {
t.Controls = report.Controls{}
})
}
buf := &bytes.Buffer{}
r.WriteBinary(buf)
return p.publisher.Publish(buf)
Expand Down
3 changes: 2 additions & 1 deletion probe/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ type Ticker interface {
func New(
spyInterval, publishInterval time.Duration,
publisher appclient.Publisher,
noControls bool,
) *Probe {
result := &Probe{
spyInterval: spyInterval,
publishInterval: publishInterval,
publisher: appclient.NewReportPublisher(publisher),
publisher: appclient.NewReportPublisher(publisher, noControls),
quit: make(chan struct{}),
spiedReports: make(chan report.Report, reportBufferSize),
shortcutReports: make(chan report.Report, reportBufferSize),
Expand Down
4 changes: 2 additions & 2 deletions probe/probe_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestApply(t *testing.T) {
endpointNode = report.MakeNodeWith(endpointNodeID, map[string]string{"5": "6"})
)

p := New(0, 0, nil)
p := New(0, 0, nil, false)
p.AddTagger(NewTopologyTagger())

r := report.MakeReport()
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestProbe(t *testing.T) {

pub := mockPublisher{make(chan report.Report, 10)}

p := New(10*time.Millisecond, 100*time.Millisecond, pub)
p := New(10*time.Millisecond, 100*time.Millisecond, pub, false)
p.AddReporter(mockReporter{want})
p.Start()
defer p.Stop()
Expand Down
2 changes: 2 additions & 0 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type probeFlags struct {
logLevel string
resolver string
noApp bool
noControls bool

useConntrack bool // Use conntrack for endpoint topo
spyProcs bool // Associate endpoints with processes (must be root)
Expand Down Expand Up @@ -145,6 +146,7 @@ func main() {
flag.DurationVar(&flags.probe.publishInterval, "probe.publish.interval", 3*time.Second, "publish (output) interval")
flag.DurationVar(&flags.probe.spyInterval, "probe.spy.interval", time.Second, "spy (scan) interval")
flag.StringVar(&flags.probe.pluginsRoot, "probe.plugins.root", "/var/run/scope/plugins", "Root directory to search for plugins")
flag.BoolVar(&flags.probe.noControls, "probe.no-controls", false, "Disable controls (read-only mode)")

This comment was marked as abuse.

This comment was marked as abuse.

This comment was marked as abuse.


flag.BoolVar(&flags.probe.insecure, "probe.insecure", false, "(SSL) explicitly allow \"insecure\" SSL connections and transfers")
flag.StringVar(&flags.probe.resolver, "probe.resolver", "", "IP address & port of resolver to use. Default is to use system resolver.")
Expand Down
7 changes: 4 additions & 3 deletions prog/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ func probeMain(flags probeFlags) {
ProbeID: probeID,
Insecure: flags.insecure,
}
clients := appclient.NewMultiAppClient(func(hostname, endpoint string) (appclient.AppClient, error) {
clientFactory := func(hostname, endpoint string) (appclient.AppClient, error) {
return appclient.NewAppClient(
probeConfig, hostname, endpoint,
xfer.ControlHandlerFunc(controls.HandleControlRequest),
)
})
}
clients := appclient.NewMultiAppClient(clientFactory, flags.noControls)
defer clients.Stop()

dnsLookupFn := net.LookupIP
Expand All @@ -128,7 +129,7 @@ func probeMain(flags probeFlags) {
resolver := appclient.NewResolver(targets, dnsLookupFn, clients.Set)
defer resolver.Stop()

p := probe.New(flags.spyInterval, flags.publishInterval, clients)
p := probe.New(flags.spyInterval, flags.publishInterval, clients, flags.noControls)

hostReporter := host.NewReporter(hostID, hostName, probeID, version, clients)
defer hostReporter.Stop()
Expand Down
32 changes: 20 additions & 12 deletions report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,26 @@ func (r Report) Merge(other Report) Report {

// Topologies returns a slice of Topologies in this report
func (r Report) Topologies() []Topology {
return []Topology{
r.Endpoint,
r.Process,
r.Container,
r.ContainerImage,
r.Pod,
r.Service,
r.Deployment,
r.ReplicaSet,
r.Host,
r.Overlay,
}
result := []Topology{}
r.WalkTopologies(func(t *Topology) {
result = append(result, *t)
})
return result
}

// WalkTopologies iterates through the Topologies of the report,
// potentially modifying them
func (r *Report) WalkTopologies(f func(*Topology)) {
f(&r.Endpoint)
f(&r.Process)
f(&r.Container)
f(&r.ContainerImage)
f(&r.Pod)
f(&r.Service)
f(&r.Deployment)
f(&r.ReplicaSet)
f(&r.Host)
f(&r.Overlay)
}

// Topology gets a topology by name
Expand Down