Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd/swarm/swarm-smoke: sliding window test #18967

Merged
merged 10 commits into from
Jan 30, 2019
98 changes: 5 additions & 93 deletions cmd/swarm/swarm-smoke/feed_upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package main

import (
"bytes"
"context"
"crypto/md5"
crand "crypto/rand"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptrace"
"os"
"os/exec"
"strings"
Expand All @@ -18,13 +16,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable "github.com/mattn/go-colorable"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1"
)
Expand All @@ -33,27 +25,6 @@ const (
feedRandomDataLength = 8
)

func cliFeedUploadAndSync(c *cli.Context) error {
metrics.GetOrRegisterCounter("feed-and-sync", nil).Inc(1)
log.Root().SetHandler(log.CallerFileHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true)))))

errc := make(chan error)
go func() {
errc <- feedUploadAndSync(c)
}()

select {
case err := <-errc:
if err != nil {
metrics.GetOrRegisterCounter("feed-and-sync.fail", nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter("feed-and-sync.timeout", nil).Inc(1)
return fmt.Errorf("timeout after %v sec", timeout)
}
}

// TODO: retrieve with manifest + extract repeating code
func feedUploadAndSync(c *cli.Context) error {
defer func(now time.Time) { log.Info("total time", "time", time.Since(now), "size (kb)", filesize) }(time.Now())
Expand Down Expand Up @@ -232,9 +203,10 @@ func feedUploadAndSync(c *cli.Context) error {
seed := int(time.Now().UnixNano() / 1e6)
log.Info("feed uploading to "+endpoints[0]+" and syncing", "seed", seed)

randomBytes := testutil.RandomBytes(seed, filesize*1000)
h = md5.New()
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)

hash, err := upload(&randomBytes, endpoints[0])
hash, err := upload(r, filesize*1000, endpoints[0])
if err != nil {
return err
}
Expand All @@ -243,10 +215,7 @@ func feedUploadAndSync(c *cli.Context) error {
return err
}
multihashHex := hexutil.Encode(hashBytes)
fileHash, err := digest(bytes.NewReader(randomBytes))
if err != nil {
return err
}
fileHash := h.Sum(nil)

log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fileHash))

Expand Down Expand Up @@ -307,60 +276,3 @@ func feedUploadAndSync(c *cli.Context) error {

return nil
}

func fetchFeed(topic string, user string, endpoint string, original []byte, ruid string) error {
ctx, sp := spancontext.StartSpan(context.Background(), "feed-and-sync.fetch")
defer sp.Finish()

log.Trace("sleeping", "ruid", ruid)
time.Sleep(3 * time.Second)

log.Trace("http get request (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user)

var tn time.Time
reqUri := endpoint + "/bzz-feed:/?topic=" + topic + "&user=" + user
req, _ := http.NewRequest("GET", reqUri, nil)

opentracing.GlobalTracer().Inject(
sp.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header))

trace := client.GetClientTrace("feed-and-sync - http get", "feed-and-sync", ruid, &tn)

req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
transport := http.DefaultTransport

//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}

tn = time.Now()
res, err := transport.RoundTrip(req)
if err != nil {
log.Error(err.Error(), "ruid", ruid)
return err
}

log.Trace("http get response (feed)", "ruid", ruid, "api", endpoint, "topic", topic, "user", user, "code", res.StatusCode, "len", res.ContentLength)

if res.StatusCode != 200 {
return fmt.Errorf("expected status code %d, got %v (ruid %v)", 200, res.StatusCode, ruid)
}

defer res.Body.Close()

rdigest, err := digest(res.Body)
if err != nil {
log.Warn(err.Error(), "ruid", ruid)
return err
}

if !bytes.Equal(rdigest, original) {
err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original)
log.Warn(err.Error(), "ruid", ruid)
return err
}

log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength)

return nil
}
12 changes: 9 additions & 3 deletions cmd/swarm/swarm-smoke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,25 @@ func main() {
Name: "upload_and_sync",
Aliases: []string{"c"},
Usage: "upload and sync",
Action: cliUploadAndSync,
Action: wrapCliCommand("upload-and-sync", true, uploadAndSync),
},
{
Name: "feed_sync",
Aliases: []string{"f"},
Usage: "feed update generate, upload and sync",
Action: cliFeedUploadAndSync,
Action: wrapCliCommand("feed-and-sync", true, feedUploadAndSync),
},
{
Name: "upload_speed",
Aliases: []string{"u"},
Usage: "measure upload speed",
Action: cliUploadSpeed,
Action: wrapCliCommand("upload-speed", true, uploadSpeed),
},
{
Name: "sliding_window",
Aliases: []string{"s"},
Usage: "measure network aggregate capacity",
Action: wrapCliCommand("sliding-window", false, slidingWindow),
},
}

Expand Down
122 changes: 122 additions & 0 deletions cmd/swarm/swarm-smoke/sliding_window.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.

package main

import (
"crypto/md5"
crand "crypto/rand"
"fmt"
"io"
"math/rand"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/pborman/uuid"

cli "gopkg.in/urfave/cli.v1"
)

var seed = time.Now().UTC().UnixNano()

func init() {
rand.Seed(seed)
}

type uploadResult struct {
hash string
digest []byte
}

func slidingWindow(c *cli.Context) error {
defer func(now time.Time) {
totalTime := time.Since(now)

log.Info("total time", "time", totalTime)
metrics.GetOrRegisterCounter("sliding-window.total-time", nil).Inc(int64(totalTime))
}(time.Now())

generateEndpoints(scheme, cluster, appName, from, to)
hashes := []uploadResult{} //swarm hashes of the uploads
nodes := to - from
const iterationTimeout = 30 * time.Second
log.Info("sliding window test started", "nodes", nodes, "filesize(kb)", filesize, "timeout", timeout)
uploadedBytes := 0
networkDepth := 0
errored := false

outer:
for {
log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed)

h := md5.New()
r := io.TeeReader(io.LimitReader(crand.Reader, int64(filesize*1000)), h)
t1 := time.Now()

hash, err := upload(r, filesize*1000, endpoints[0])
if err != nil {
log.Error(err.Error())
return err
}

metrics.GetOrRegisterResettingTimer("sliding-window.upload-time", nil).UpdateSince(t1)

fhash := h.Sum(nil)

log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash), "sleeping", syncDelay)
hashes = append(hashes, uploadResult{hash: hash, digest: fhash})
time.Sleep(time.Duration(syncDelay) * time.Second)
uploadedBytes += filesize * 1000

for i, v := range hashes {
timeout := time.After(time.Duration(timeout) * time.Second)
errored = false

inner:
for {
select {
case <-timeout:
errored = true
log.Error("error retrieving hash. timeout", "hash idx", i, "err", err)
metrics.GetOrRegisterCounter("sliding-window.single.error", nil).Inc(1)
break inner
default:
randIndex := 1 + rand.Intn(len(endpoints)-1)
ruid := uuid.New()[:8]
start := time.Now()
err := fetch(v.hash, endpoints[randIndex], v.digest, ruid)
if err != nil {
continue inner
}
metrics.GetOrRegisterResettingTimer("sliding-window.single.fetch-time", nil).UpdateSince(start)
break inner
}
}

if errored {
break outer
}
networkDepth = i
metrics.GetOrRegisterGauge("sliding-window.network-depth", nil).Update(int64(networkDepth))
}
}

log.Info("sliding window test finished", "errored?", errored, "networkDepth", networkDepth, "networkDepth(kb)", networkDepth*filesize)
log.Info("stats", "uploadedFiles", len(hashes), "uploadedKb", uploadedBytes/1000, "filesizeKb", filesize)

return nil
}
Loading