Skip to content
This repository has been archived by the owner on Feb 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from jenyasd209/fix/handle_session_errors
Browse files Browse the repository at this point in the history
Fix/handle session errors
  • Loading branch information
ynewmann authored Oct 12, 2020
2 parents 9db4bad + bd2867f commit 20c5da3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 17 deletions.
11 changes: 11 additions & 0 deletions blockstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstream

import (
"context"
"errors"
"sync"

"github.com/Wondertan/go-libp2p-access"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/Wondertan/go-blockstream/block"
)

var ErrStreamsReset = errors.New("all streams reset")

var log = logging.Logger("blockstream")

const Protocol protocol.ID = "/blockstream/1.0.0"
Expand Down Expand Up @@ -80,6 +83,7 @@ func (bs *BlockStream) Close() error {
// Autosave defines if received Blocks should be automatically put into Blockstore.
func (bs *BlockStream) Session(ctx context.Context, token access.Token, autosave bool, peers ...peer.ID) (*Session, error) {
ses := newSession(ctx)

for _, p := range peers {
s, err := bs.Host.NewStream(ctx, p, Protocol)
if err != nil {
Expand All @@ -99,6 +103,13 @@ func (bs *BlockStream) Session(ctx context.Context, token access.Token, autosave
if err := f(); err != nil {
log.Error(err)
s.Reset()
ses.removeProvider()

if ses.getProviders() == 0 {
log.Error("Closing session: ", ErrStreamsReset)
ses.err = ErrStreamsReset
ses.cancel()
}
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion blockstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ import (
)

type BlockStreamer interface {
Stream(context.Context, <-chan []cid.Cid) <-chan blocks.Block
Stream(context.Context, <-chan []cid.Cid) (<-chan blocks.Block, <-chan error)
}
12 changes: 10 additions & 2 deletions ipld/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ type offlineStreamer struct {
streamed []cid.Cid
}

func (f *offlineStreamer) Stream(ctx context.Context, ids <-chan []cid.Cid) <-chan blocks.Block {
func (f *offlineStreamer) Stream(ctx context.Context, ids <-chan []cid.Cid) (<-chan blocks.Block, <-chan error) {
out := make(chan blocks.Block, 500)

cherr := make(chan error, 1)
go func() {
defer close(out)
defer close(cherr)

for {
select {
case ids, ok := <-ids:
Expand All @@ -107,20 +111,24 @@ func (f *offlineStreamer) Stream(ctx context.Context, ids <-chan []cid.Cid) <-ch
for _, id := range ids {
b, err := f.getter(id)
if err != nil {
cherr <- err
return
}
f.streamed = append(f.streamed, id)

select {
case out <- b:
case <-ctx.Done():
cherr <- ctx.Err()
return
}
}
case <-ctx.Done():
cherr <- ctx.Err()
return
}
}
}()
return out

return out, cherr
}
25 changes: 19 additions & 6 deletions ipld/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ipld

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -56,10 +57,13 @@ func Walk(ctx context.Context, id cid.Cid, bs blockstream.BlockStreamer, handler
in <- []cid.Cid{id}
defer close(in)

out := bs.Stream(ctx, in)
out, cherr := bs.Stream(ctx, in)
for {
select {
case b := <-out:
case b, ok := <-out:
if !ok {
return errors.New("stream channel closed")
}
remains--

nd, err := format.Decode(b)
Expand All @@ -74,15 +78,18 @@ func Walk(ctx context.Context, id cid.Cid, bs blockstream.BlockStreamer, handler

ids := make([]cid.Cid, 0, len(nd.Links()))
for _, l := range nd.Links() {
if l.Cid.Type() != cid.Raw {
ids = append(ids, l.Cid)
continue
}

v, err := wo.visit(l.Cid)
if err != nil {
return err
}
if !v {
continue
if v {
ids = append(ids, l.Cid)
}

ids = append(ids, l.Cid)
}

if len(ids) == 0 {
Expand All @@ -97,9 +104,15 @@ func Walk(ctx context.Context, id cid.Cid, bs blockstream.BlockStreamer, handler
case in <- ids:
case <-ctx.Done():
return ctx.Err()
case err := <-cherr:
cancel()
return err
}
case <-ctx.Done():
return ctx.Err()
case err := <-cherr:
cancel()
return err
}
}
}
Expand Down
28 changes: 22 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,34 @@ const requestBufferSize = 8
type Session struct {
reqN, prvs uint32

reqs chan *block.Request
ctx context.Context
reqs chan *block.Request
ctx context.Context
cancel context.CancelFunc

err error
}

func newSession(ctx context.Context) *Session {
ctx, cancel := context.WithCancel(ctx)
return &Session{
reqs: make(chan *block.Request, requestBufferSize),
ctx: ctx,
reqs: make(chan *block.Request, requestBufferSize),
ctx: ctx,
cancel: cancel,
}
}

// Stream starts direct BBlock fetching from remote providers. It fetches the Blocks requested with 'in' chan by their ids.
// Stream is automatically stopped when both: the requested blocks are all fetched and 'in' chan is closed.
// It might be also terminated with the provided context.
// Block order is guaranteed to be the same as requested through the `in` chan.
func (ses *Session) Stream(ctx context.Context, in <-chan []cid.Cid) <-chan blocks.Block {
func (ses *Session) Stream(ctx context.Context, in <-chan []cid.Cid) (<-chan blocks.Block, <-chan error) {
ctx, cancel := context.WithCancel(ctx)
s := block.NewStream(ctx)

err := make(chan error, 1)
go func() {
defer close(err)

for {
select {
case ids, ok := <-in:
Expand All @@ -50,14 +59,17 @@ func (ses *Session) Stream(ctx context.Context, in <-chan []cid.Cid) <-chan bloc
s.Enqueue(reqs...)
case <-ses.ctx.Done():
cancel()
if ses.err != nil {
err <- ses.err
}
return
case <-ctx.Done():
return
}
}
}()

return s.Output()
return s.Output(), err
}

// Blocks fetches Blocks by their CIDs evenly from the remote providers in the session.
Expand Down Expand Up @@ -127,6 +139,10 @@ func (ses *Session) removeProvider() {
atomic.AddUint32(&ses.prvs, ^uint32(0))
}

func (ses *Session) getProviders() uint32 {
return atomic.LoadUint32(&ses.prvs)
}

func (ses *Session) requestId() uint32 {
return atomic.AddUint32(&ses.reqN, 1)
}
10 changes: 8 additions & 2 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,15 @@ func TestSessionStream(t *testing.T) {
close(in)
}()

out := ses.Stream(ctx, in)
out, err := ses.Stream(ctx, in)
for i := 0; i < times; i++ {
assertChan(t, out, ids[i*count/times:(i+1)*count/times], count/times)
select {
case err := <-err:
assert.Nil(t, err, err)
break
default:
assertChan(t, out, ids[i*count/times:(i+1)*count/times], count/times)
}
}
}

Expand Down

0 comments on commit 20c5da3

Please sign in to comment.