Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split and move xfer package. #794

Merged
merged 1 commit into from
Jan 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/controls.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/gorilla/mux"

"github.com/weaveworks/scope/xfer"
"github.com/weaveworks/scope/common/xfer"
)

// RegisterControlRoutes registers the various control routes with a http mux.
Expand Down
7 changes: 4 additions & 3 deletions app/controls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/gorilla/mux"

"github.com/weaveworks/scope/app"
"github.com/weaveworks/scope/xfer"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
)

func TestControl(t *testing.T) {
Expand All @@ -26,7 +27,7 @@ func TestControl(t *testing.T) {
t.Fatal(err)
}

probeConfig := xfer.ProbeConfig{
probeConfig := appclient.ProbeConfig{
ProbeID: "foo",
}
controlHandler := xfer.ControlHandlerFunc(func(req xfer.Request) xfer.Response {
Expand All @@ -42,7 +43,7 @@ func TestControl(t *testing.T) {
Value: "foo",
}
})
client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler)
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, controlHandler)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/gorilla/mux"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/xfer"
"github.com/weaveworks/scope/common/xfer"
)

const (
Expand Down
9 changes: 5 additions & 4 deletions app/pipes_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
"github.com/gorilla/websocket"

"github.com/weaveworks/scope/common/mtime"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/probe/controls"
"github.com/weaveworks/scope/test"
"github.com/weaveworks/scope/xfer"
)

func TestPipeTimeout(t *testing.T) {
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestPipeTimeout(t *testing.T) {
}

type adapter struct {
c xfer.AppClient
c appclient.AppClient
}

func (a adapter) PipeConnection(_, pipeID string, pipe xfer.Pipe) error {
Expand All @@ -75,10 +76,10 @@ func TestPipeClose(t *testing.T) {
t.Fatal(err)
}

probeConfig := xfer.ProbeConfig{
probeConfig := appclient.ProbeConfig{
ProbeID: "foo",
}
client, err := xfer.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, nil)
client, err := appclient.NewAppClient(probeConfig, ip+":"+port, ip+":"+port, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/gorilla/mux"

"github.com/weaveworks/scope/common/hostname"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

var (
Expand Down
19 changes: 19 additions & 0 deletions common/xfer/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package xfer

const (
// AppPort is the default port that the app will use for its HTTP server.
// The app publishes the API and user interface, and receives reports from
// probes, on this port.
AppPort = 4040

// ScopeProbeIDHeader is the header we use to carry the probe's unique ID. The
// ID is currently set to the a random string on probe startup.
ScopeProbeIDHeader = "X-Scope-Probe-ID"
)

// Details are some generic details that can be fetched from /api
type Details struct {
ID string `json:"id"`
Version string `json:"version"`
Hostname string `json:"hostname"`
}
File renamed without changes.
File renamed without changes.
7 changes: 4 additions & 3 deletions experimental/demoprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
"strconv"
"time"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/probe/docker"
"github.com/weaveworks/scope/probe/process"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

func main() {
Expand All @@ -23,15 +24,15 @@ func main() {
)
flag.Parse()

client, err := xfer.NewAppClient(xfer.ProbeConfig{
client, err := appclient.NewAppClient(appclient.ProbeConfig{
Token: "demoprobe",
ProbeID: "demoprobe",
Insecure: false,
}, *publish, *publish, nil)
if err != nil {
log.Fatal(err)
}
rp := xfer.NewReportPublisher(client)
rp := appclient.NewReportPublisher(client)

rand.Seed(time.Now().UnixNano())
for range time.Tick(*publishInterval) {
Expand Down
7 changes: 4 additions & 3 deletions experimental/fixprobe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
"os"
"time"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/xfer"
)

func main() {
Expand All @@ -34,7 +35,7 @@ func main() {
}
f.Close()

client, err := xfer.NewAppClient(xfer.ProbeConfig{
client, err := appclient.NewAppClient(appclient.ProbeConfig{
Token: "fixprobe",
ProbeID: "fixprobe",
Insecure: false,
Expand All @@ -43,7 +44,7 @@ func main() {
log.Fatal(err)
}

rp := xfer.NewReportPublisher(client)
rp := appclient.NewReportPublisher(client)
for range time.Tick(*publishInterval) {
rp.Publish(fixedReport)
}
Expand Down
28 changes: 11 additions & 17 deletions xfer/app_client.go → probe/appclient/app_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package xfer
package appclient

import (
"encoding/json"
Expand All @@ -13,25 +13,19 @@ import (
"github.com/gorilla/websocket"

"github.com/weaveworks/scope/common/sanitize"
"github.com/weaveworks/scope/common/xfer"
)

const (
initialBackoff = 1 * time.Second
maxBackoff = 60 * time.Second
)

// Details are some generic details that can be fetched from /api
type Details struct {
ID string `json:"id"`
Version string `json:"version"`
Hostname string `json:"hostname"`
}

// AppClient is a client to an app for dealing with controls.
type AppClient interface {
Details() (Details, error)
Details() (xfer.Details, error)
ControlConnection()
PipeConnection(string, Pipe)
PipeConnection(string, xfer.Pipe)
PipeClose(string) error
Publish(r io.Reader) error
Stop()
Expand All @@ -58,11 +52,11 @@ type appClient struct {
readers chan io.Reader

// For controls
control ControlHandler
control xfer.ControlHandler
}

// NewAppClient makes a new appClient.
func NewAppClient(pc ProbeConfig, hostname, target string, control ControlHandler) (AppClient, error) {
func NewAppClient(pc ProbeConfig, hostname, target string, control xfer.ControlHandler) (AppClient, error) {
httpTransport, err := pc.getHTTPTransport(hostname)
if err != nil {
return nil, err
Expand Down Expand Up @@ -144,8 +138,8 @@ func (c *appClient) Stop() {
}

// Details fetches the details (version, id) of the app.
func (c *appClient) Details() (Details, error) {
result := Details{}
func (c *appClient) Details() (xfer.Details, error) {
result := xfer.Details{}
req, err := c.ProbeConfig.authorizedRequest("GET", sanitize.URL("", 0, "/api")(c.target), nil)
if err != nil {
return result, err
Expand Down Expand Up @@ -202,7 +196,7 @@ func (c *appClient) controlConnection() (bool, error) {
conn.Close()
}()

codec := NewJSONWebsocketCodec(conn)
codec := xfer.NewJSONWebsocketCodec(conn)
server := rpc.NewServer()
if err := server.RegisterName("control", c.control); err != nil {
return false, err
Expand Down Expand Up @@ -271,7 +265,7 @@ func (c *appClient) Publish(r io.Reader) error {
return nil
}

func (c *appClient) pipeConnection(id string, pipe Pipe) (bool, error) {
func (c *appClient) pipeConnection(id string, pipe xfer.Pipe) (bool, error) {
headers := http.Header{}
c.ProbeConfig.authorizeHeaders(headers)
url := sanitize.URL("ws://", 0, fmt.Sprintf("/api/pipe/%s/probe", id))(c.target)
Expand All @@ -295,7 +289,7 @@ func (c *appClient) pipeConnection(id string, pipe Pipe) (bool, error) {
return false, pipe.CopyToWebsocket(remote, conn)
}

func (c *appClient) PipeConnection(id string, pipe Pipe) {
func (c *appClient) PipeConnection(id string, pipe xfer.Pipe) {
go func() {
log.Printf("Pipe %s connection to %s starting", id, c.target)
defer log.Printf("Pipe %s connection to %s exiting", id, c.target)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package xfer
package appclient

import (
"compress/gzip"
Expand All @@ -15,6 +15,7 @@ import (
"time"

"github.com/gorilla/handlers"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/report"
"github.com/weaveworks/scope/test"
)
Expand All @@ -33,7 +34,7 @@ func dummyServer(t *testing.T, expectedToken, expectedID string, expectedReport
t.Errorf("want %q, have %q", expectedToken, have)
}

if have := r.Header.Get(ScopeProbeIDHeader); expectedID != have {
if have := r.Header.Get(xfer.ScopeProbeIDHeader); expectedID != have {
t.Errorf("want %q, have %q", expectedID, have)
}

Expand Down Expand Up @@ -151,7 +152,7 @@ func TestAppClientPublish(t *testing.T) {
var (
id = "foobarbaz"
version = "imalittleteapot"
want = Details{ID: id, Version: version}
want = xfer.Details{ID: id, Version: version}
)

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
16 changes: 12 additions & 4 deletions xfer/multi_client.go → probe/appclient/multi_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package xfer
package appclient

import (
"bytes"
Expand All @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/report"
)

Expand All @@ -29,15 +30,22 @@ type multiClient struct {
}

type clientTuple struct {
Details
xfer.Details
AppClient
}

// Publisher is something which can send a stream of data somewhere, probably
// to a remote collector.
type Publisher interface {
Publish(io.Reader) error
Stop()
}

// MultiAppClient maintains a set of upstream apps, and ensures we have an
// AppClient for each one.
type MultiAppClient interface {
Set(hostname string, endpoints []string)
PipeConnection(appID, pipeID string, pipe Pipe) error
PipeConnection(appID, pipeID string, pipe xfer.Pipe) error
PipeClose(appID, pipeID string) error
Stop()
Publish(io.Reader) error
Expand Down Expand Up @@ -122,7 +130,7 @@ func (c *multiClient) withClient(appID string, f func(AppClient) error) error {
return f(client)
}

func (c *multiClient) PipeConnection(appID, pipeID string, pipe Pipe) error {
func (c *multiClient) PipeConnection(appID, pipeID string, pipe xfer.Pipe) error {
return c.withClient(appID, func(client AppClient) error {
client.PipeConnection(pipeID, pipe)
return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package xfer
package appclient

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package xfer_test
package appclient_test

import (
"bytes"
"io"
"runtime"
"testing"

"github.com/weaveworks/scope/xfer"
"github.com/weaveworks/scope/common/xfer"
"github.com/weaveworks/scope/probe/appclient"
)

type mockClient struct {
Expand Down Expand Up @@ -41,7 +42,7 @@ var (
a2 = &mockClient{id: "2"} // hostname a, app id 2
b2 = &mockClient{id: "2"} // hostname b, app id 2 (duplicate)
b3 = &mockClient{id: "3"} // hostname b, app id 3
factory = func(hostname, target string) (xfer.AppClient, error) {
factory = func(hostname, target string) (appclient.AppClient, error) {
switch target {
case "a1":
return a1, nil
Expand All @@ -66,7 +67,7 @@ func TestMultiClient(t *testing.T) {
}
)

mp := xfer.NewMultiAppClient(factory)
mp := appclient.NewMultiAppClient(factory)
defer mp.Stop()

// Add two hostnames with overlapping apps, check we don't add the same app twice
Expand All @@ -88,7 +89,7 @@ func TestMultiClient(t *testing.T) {
}

func TestMultiClientPublish(t *testing.T) {
mp := xfer.NewMultiAppClient(factory)
mp := appclient.NewMultiAppClient(factory)
defer mp.Stop()

sum := func() int { return a1.publish + a2.publish + b2.publish + b3.publish }
Expand Down
Loading