Skip to content

Commit

Permalink
swarm/network: measure time of messages in priority queue (#19250)
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense authored Mar 20, 2019
1 parent c53c5e6 commit baded64
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 60 deletions.
7 changes: 7 additions & 0 deletions cmd/swarm/swarm-smoke/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
allhosts string
hosts []string
filesize int
inputSeed int
syncDelay int
httpPort int
wsPort int
Expand Down Expand Up @@ -74,6 +75,12 @@ func main() {
Usage: "ws port",
Destination: &wsPort,
},
cli.IntFlag{
Name: "seed",
Value: 0,
Usage: "input seed in case we need deterministic upload",
Destination: &inputSeed,
},
cli.IntFlag{
Name: "filesize",
Value: 1024,
Expand Down
35 changes: 15 additions & 20 deletions cmd/swarm/swarm-smoke/upload_and_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ import (
)

func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
// use input seed if it has been set
if inputSeed != 0 {
seed = inputSeed
}

randomBytes := testutil.RandomBytes(seed, filesize*1000)

errc := make(chan error)
Expand All @@ -47,37 +52,28 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
errc <- uploadAndSync(ctx, randomBytes, tuid)
}()

var err error
select {
case err := <-errc:
case err = <-errc:
if err != nil {
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1)
}
return err
case <-time.After(time.Duration(timeout) * time.Second):
metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)

e := fmt.Errorf("timeout after %v sec", timeout)
// trigger debug functionality on randomBytes
err := trackChunks(randomBytes[:])
if err != nil {
e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err)
}

return e
err = fmt.Errorf("timeout after %v sec", timeout)
}

// trigger debug functionality on randomBytes even on successful runs
err := trackChunks(randomBytes[:])
if err != nil {
log.Error(err.Error())
// trigger debug functionality on randomBytes
e := trackChunks(randomBytes[:])
if e != nil {
log.Error(e.Error())
}

return nil
return err
}

func trackChunks(testData []byte) error {
log.Warn("Test timed out, running chunk debug sequence")

addrs, err := getAllRefs(testData)
if err != nil {
return err
Expand All @@ -94,14 +90,14 @@ func trackChunks(testData []byte) error {

rpcClient, err := rpc.Dial(httpHost)
if err != nil {
log.Error("Error dialing host", "err", err)
log.Error("error dialing host", "err", err, "host", httpHost)
continue
}

var hasInfo []api.HasInfo
err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
if err != nil {
log.Error("Error calling host", "err", err)
log.Error("error calling rpc client", "err", err, "host", httpHost)
continue
}

Expand All @@ -125,7 +121,6 @@ func trackChunks(testData []byte) error {
}

func getAllRefs(testData []byte) (storage.AddressCollection, error) {
log.Trace("Getting all references for given root hash")
datadir, err := ioutil.TempDir("", "chunk-debug")
if err != nil {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
Expand Down
1 change: 1 addition & 0 deletions metrics/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (r *reporter) makeClient() (err error) {
URL: r.url,
Username: r.username,
Password: r.password,
Timeout: 10 * time.Second,
})

return
Expand Down
7 changes: 4 additions & 3 deletions swarm/network/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,24 +204,24 @@ func (f *Fetcher) run(peers *sync.Map) {

// incoming request
case hopCount = <-f.requestC:
log.Trace("new request", "request addr", f.addr)
// 2) chunk is requested, set requested flag
// launch a request iff none been launched yet
doRequest = !requested
log.Trace("new request", "request addr", f.addr, "doRequest", doRequest)
requested = true

// peer we requested from is gone. fall back to another
// and remove the peer from the peers map
case id := <-gone:
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr)
peers.Delete(id.String())
doRequest = requested
log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest)

// search timeout: too much time passed since the last request,
// extend the search to a new peer if we can find one
case <-waitC:
log.Trace("search timed out: requesting", "request addr", f.addr)
doRequest = requested
log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest)

// all Fetcher context closed, can quit
case <-f.ctx.Done():
Expand Down Expand Up @@ -288,6 +288,7 @@ func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources
for i = 0; i < len(sources); i++ {
req.Source = sources[i]
var err error
log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String())
sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
if err == nil {
// remove the peer from known sources
Expand Down
23 changes: 16 additions & 7 deletions swarm/network/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ package priorityqueue
import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

var (
Expand Down Expand Up @@ -69,21 +70,23 @@ READ:
case <-ctx.Done():
return
case x := <-q:
log.Trace("priority.queue f(x)", "p", p, "len(Queues[p])", len(pq.Queues[p]))
f(x)
val := x.(struct {
v interface{}
t time.Time
})
f(val.v)
metrics.GetOrRegisterResettingTimer("pq.run", nil).UpdateSince(val.t)
p = top
default:
if p > 0 {
p--
log.Trace("priority.queue p > 0", "p", p)
continue READ
}
p = top
select {
case <-ctx.Done():
return
case <-pq.wakeup:
log.Trace("priority.queue wakeup", "p", p)
}
}
}
Expand All @@ -95,9 +98,15 @@ func (pq *PriorityQueue) Push(x interface{}, p int) error {
if p < 0 || p >= len(pq.Queues) {
return errBadPriority
}
log.Trace("priority.queue push", "p", p, "len(Queues[p])", len(pq.Queues[p]))
val := struct {
v interface{}
t time.Time
}{
x,
time.Now(),
}
select {
case pq.Queues[p] <- x:
case pq.Queues[p] <- val:
default:
return ErrContention
}
Expand Down
12 changes: 12 additions & 0 deletions swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
osp.LogFields(olog.Bool("delivered", true))
return
}
osp.LogFields(olog.Bool("skipCheck", false))
Expand Down Expand Up @@ -216,20 +217,29 @@ type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

// chunk delivery msg is response to retrieverequest msg
func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *ChunkDeliveryMsg) error {
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx,
"handle.chunk.delivery")

processReceivedChunksCount.Inc(1)

// retrieve the span for the originating retrieverequest
spanId := fmt.Sprintf("stream.send.request.%v.%v", sp.ID(), req.Addr)
span := tracing.ShiftSpanByKey(spanId)

log.Trace("handle.chunk.delivery", "ref", req.Addr, "from peer", sp.ID())

go func() {
defer osp.Finish()

if span != nil {
span.LogFields(olog.String("finish", "from handleChunkDeliveryMsg"))
defer span.Finish()
}

req.peer = sp
log.Trace("handle.chunk.delivery", "put", req.Addr)
err := d.chunkStore.Put(ctx, storage.NewChunk(req.Addr, req.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
Expand All @@ -239,6 +249,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req *Ch
req.peer.Drop(err)
}
}
log.Trace("handle.chunk.delivery", "done put", req.Addr, "err", err)
}()
return nil
}
Expand Down Expand Up @@ -284,6 +295,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ func (r *Registry) APIs() []rpc.API {
Namespace: "stream",
Version: "3.0",
Service: r.api,
Public: true,
Public: false,
},
}
}
Expand Down
5 changes: 2 additions & 3 deletions swarm/storage/chunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,6 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in
chunkData, err := r.getter.Get(ctx, Reference(childAddress))
if err != nil {
metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select {
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
case <-quitC:
Expand All @@ -561,12 +560,12 @@ func (r *LazyChunkReader) join(ctx context.Context, b []byte, off int64, eoff in

// Read keeps a cursor so cannot be called simulateously, see ReadAt
func (r *LazyChunkReader) Read(b []byte) (read int, err error) {
log.Debug("lazychunkreader.read", "key", r.addr)
log.Trace("lazychunkreader.read", "key", r.addr)
metrics.GetOrRegisterCounter("lazychunkreader.read", nil).Inc(1)

read, err = r.ReadAt(b, r.off)
if err != nil && err != io.EOF {
log.Debug("lazychunkreader.readat", "read", read, "err", err)
log.Trace("lazychunkreader.readat", "read", read, "err", err)
metrics.GetOrRegisterCounter("lazychunkreader.read.err", nil).Inc(1)
}

Expand Down
3 changes: 3 additions & 0 deletions swarm/storage/netstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ func (n *NetStore) Put(ctx context.Context, ch Chunk) error {

// if chunk is now put in the store, check if there was an active fetcher and call deliver on it
// (this delivers the chunk to requestors via the fetcher)
log.Trace("n.getFetcher", "ref", ch.Address())
if f := n.getFetcher(ch.Address()); f != nil {
log.Trace("n.getFetcher deliver", "ref", ch.Address())
f.deliver(ctx, ch)
}
return nil
Expand Down Expand Up @@ -341,5 +343,6 @@ func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
f.chunk = ch
// closing the deliveredC channel will terminate ongoing requests
close(f.deliveredC)
log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
})
}
2 changes: 2 additions & 0 deletions swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,8 @@ func (s *Swarm) APIs() []rpc.API {

apis = append(apis, s.bzz.APIs()...)

apis = append(apis, s.streamer.APIs()...)

if s.ps != nil {
apis = append(apis, s.ps.APIs()...)
}
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/opentracing/opentracing-go/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 3 additions & 15 deletions vendor/github.com/opentracing/opentracing-go/Makefile

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/opentracing/opentracing-go/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit baded64

Please sign in to comment.