Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions extern/sector-storage/ffiwrapper/sealer_cgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, pha

func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateOut, error) {
empty := storage.ReplicaUpdateOut{}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)
if err != nil {
return empty, xerrors.Errorf("failed to acquire sector paths: %w", err)
}
Expand Down Expand Up @@ -718,7 +718,7 @@ func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, p
}

func (sb *Sealer) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storage.ReplicaVanillaProofs, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdateCache|storiface.FTUpdate, storiface.FTNone, storiface.PathSealing)
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return nil, xerrors.Errorf("failed to acquire sector paths: %w", err)
}
Expand Down
11 changes: 7 additions & 4 deletions extern/sector-storage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,13 +715,13 @@ func (m *Manager) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, p
return out, waitErr
}

if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
return storage.ReplicaUpdateOut{}, xerrors.Errorf("acquiring sector lock: %w", err)
}

selector := newAllocSelector(m.index, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)

err = m.sched.Schedule(ctx, sector, sealtasks.TTReplicaUpdate, selector, m.schedFetch(sector, storiface.FTSealed, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTReplicaUpdate, selector, m.schedFetch(sector, storiface.FTUnsealed|storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
log.Errorf("scheduled work for replica update")
err := m.startWork(ctx, w, wk)(w.ReplicaUpdate(ctx, sector, pieces))
if err != nil {
Expand Down Expand Up @@ -768,9 +768,12 @@ func (m *Manager) ProveReplicaUpdate1(ctx context.Context, sector storage.Sector
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
}

selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTSealed|storiface.FTCache, true)
// NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTSealed|storiface.FTCache, false)

err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate1, selector, m.schedFetch(sector, storiface.FTSealed, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate1, selector, m.schedFetch(sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {

err := m.startWork(ctx, w, wk)(w.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed))
if err != nil {
Expand Down
20 changes: 11 additions & 9 deletions extern/sector-storage/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
var log = logging.Logger("sbmock")

type SectorMgr struct {
sectors map[abi.SectorID]*sectorState
failPoSt bool
pieces map[cid.Cid][]byte
nextSectorID abi.SectorNumber
sectors map[abi.SectorID]*sectorState
failPoSt bool
pieces map[cid.Cid][]byte
nextSectorID abi.SectorNumber
alwaysUnsealed bool

lk sync.Mutex
}
Expand All @@ -40,7 +41,7 @@ type mockVerifProver struct {
aggregates map[string]proof.AggregateSealVerifyProofAndInfos // used for logging bad verifies
}

func NewMockSectorMgr(genesisSectors []abi.SectorID) *SectorMgr {
func NewMockSectorMgr(genesisSectors []abi.SectorID, alwaysUnsealed bool) *SectorMgr {
sectors := make(map[abi.SectorID]*sectorState)
for _, sid := range genesisSectors {
sectors[sid] = &sectorState{
Expand All @@ -50,9 +51,10 @@ func NewMockSectorMgr(genesisSectors []abi.SectorID) *SectorMgr {
}

return &SectorMgr{
sectors: sectors,
pieces: map[cid.Cid][]byte{},
nextSectorID: 5,
sectors: sectors,
pieces: map[cid.Cid][]byte{},
nextSectorID: 5,
alwaysUnsealed: alwaysUnsealed,
}
}

Expand Down Expand Up @@ -125,7 +127,7 @@ func (mgr *SectorMgr) AcquireSectorNumber() (abi.SectorNumber, error) {
}

func (mgr *SectorMgr) IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
return false, nil
return mgr.alwaysUnsealed, nil
}

func (mgr *SectorMgr) ForceState(sid storage.SectorRef, st int) error {
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func TestOpFinish(t *testing.T) {
sb := NewMockSectorMgr(nil)
sb := NewMockSectorMgr(nil, false)

sid, pieces, err := sb.StageFakeData(123, abi.RegisteredSealProof_StackedDrg2KiBV1_1)
if err != nil {
Expand Down
21 changes: 20 additions & 1 deletion extern/sector-storage/piece_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"context"
"io"
"sync"

"github.com/ipfs/go-cid"
"go.opencensus.io/stats"
Expand Down Expand Up @@ -34,6 +35,8 @@ type pieceReader struct {
r io.ReadCloser
br *bufio.Reader
rAt int64

mu sync.Mutex
}

func (p *pieceReader) init() (_ *pieceReader, err error) {
Expand Down Expand Up @@ -62,6 +65,9 @@ func (p *pieceReader) check() error {
}

func (p *pieceReader) Close() error {
p.mu.Lock()
defer p.mu.Unlock()

if err := p.check(); err != nil {
return err
}
Expand All @@ -84,16 +90,22 @@ func (p *pieceReader) Close() error {
}

func (p *pieceReader) Read(b []byte) (int, error) {
p.mu.Lock()
defer p.mu.Unlock()

if err := p.check(); err != nil {
return 0, err
}

n, err := p.ReadAt(b, p.seqAt)
n, err := p.readAtUnlocked(b, p.seqAt)
p.seqAt += int64(n)
return n, err
}

func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
p.mu.Lock()
defer p.mu.Unlock()

if err := p.check(); err != nil {
return 0, err
}
Expand All @@ -113,6 +125,13 @@ func (p *pieceReader) Seek(offset int64, whence int) (int64, error) {
}

func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()

return p.readAtUnlocked(b, off)
}

func (p *pieceReader) readAtUnlocked(b []byte, off int64) (n int, err error) {
if err := p.check(); err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/sealtasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const (
TTAddPiece TaskType = "seal/v0/addpiece"
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2"
TTCommit1 TaskType = "seal/v0/commit/1" // NOTE: We use this to transfer the sector into miner-local storage for now; Don't use on workers!
TTCommit1 TaskType = "seal/v0/commit/1"
TTCommit2 TaskType = "seal/v0/commit/2"

TTFinalize TaskType = "seal/v0/finalize"
Expand Down
2 changes: 1 addition & 1 deletion extern/sector-storage/testworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerR
lstor: lstor,
ret: ret,

mockSeal: mock.NewMockSectorMgr(nil),
mockSeal: mock.NewMockSectorMgr(nil, false),

session: uuid.New(),
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/elastic/gosigar v0.14.1
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.13.0
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3
github.com/filecoin-project/dagstore v0.5.2-0.20220127165531-15ec8e35f3ac
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.6
github.com/filecoin-project/go-bitfield v0.2.4
Expand All @@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220120121729-eb2fe1f4df58
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220123060332-16461fbde41a
github.com/filecoin-project/go-indexer-core v0.2.7
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
Expand Down Expand Up @@ -110,7 +110,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.18.0-rc1
github.com/libp2p/go-libp2p v0.18.0-rc2
github.com/libp2p/go-libp2p-connmgr v0.3.1 // indirect
github.com/libp2p/go-libp2p-core v0.14.0
github.com/libp2p/go-libp2p-discovery v0.6.0
Expand All @@ -122,9 +122,9 @@ require (
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.2
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.10.0
github.com/libp2p/go-libp2p-swarm v0.10.1
github.com/libp2p/go-libp2p-tls v0.3.1
github.com/libp2p/go-libp2p-yamux v0.8.0
github.com/libp2p/go-libp2p-yamux v0.8.1
github.com/libp2p/go-maddr-filter v0.1.0
github.com/mattn/go-isatty v0.0.14
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
Expand Down
18 changes: 11 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,9 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3 h1:kuZsRFDjyrDAgoHNolJBnO7xX1EETHdJ090sgD3Al5E=
github.com/filecoin-project/dagstore v0.5.2-0.20220120115845-e07b050f48d3/go.mod h1:OdlK3x5m3Mol874WC2bI79H4H2+leN+FabwWdW2D/wY=
github.com/filecoin-project/dagstore v0.5.2-0.20220121144931-72f676fe8f38/go.mod h1:7aV6HIrDeX1ypja7BeSOF9lwGX9CCbYuBjLXXXp+5sY=
github.com/filecoin-project/dagstore v0.5.2-0.20220127165531-15ec8e35f3ac h1:lb7Er4qFBD+uHR8StVbEY5MDyKWZHxBOlfh21mWISKk=
github.com/filecoin-project/dagstore v0.5.2-0.20220127165531-15ec8e35f3ac/go.mod h1:7aV6HIrDeX1ypja7BeSOF9lwGX9CCbYuBjLXXXp+5sY=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.6 h1:DWQtj38ax+ogHwyH3VULRIoT8E6loyXqsk/p81xoY7M=
Expand Down Expand Up @@ -339,8 +340,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220120121729-eb2fe1f4df58 h1:NHX4FXQQQ9dVhQBBM1BThVbYmN3EnM4Dxzvz0DYTOTk=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220120121729-eb2fe1f4df58/go.mod h1:SmDIzoUwFlej6Ac5ilBMyBImTj/TrmJmZ6wh/ILMfuI=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220123060332-16461fbde41a h1:XhjGggRUhBEBNlWnWy9lBr288gcyxXeINU0Wol4Kqvw=
github.com/filecoin-project/go-fil-markets v1.19.1-0.20220123060332-16461fbde41a/go.mod h1:j468TdQVfugNtQCDCQmJ2nQ9Uj0edBUXN3DQhzK0HCk=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
Expand Down Expand Up @@ -1078,8 +1079,9 @@ github.com/libp2p/go-libp2p v0.14.4/go.mod h1:EIRU0Of4J5S8rkockZM7eJp2S0UrCyi55m
github.com/libp2p/go-libp2p v0.15.0/go.mod h1:8Ljmwon0cZZYKrOCjFeLwQEK8bqR42dOheUZ1kSKhP0=
github.com/libp2p/go-libp2p v0.16.0/go.mod h1:ump42BsirwAWxKzsCiFnTtN1Yc+DuPu76fyMX364/O4=
github.com/libp2p/go-libp2p v0.17.0/go.mod h1:Fkin50rsGdv5mm5BshBUtPRZknt9esfmYXBOYcwOTgw=
github.com/libp2p/go-libp2p v0.18.0-rc1 h1:CFHROLGmMwe/p8tR3sHahg/1NSaZa2EGbu7nDmdC+RY=
github.com/libp2p/go-libp2p v0.18.0-rc1/go.mod h1:RgYlH7IIWHXREimC92bw5Lg1V2R5XmSzuLHb5fTnr+8=
github.com/libp2p/go-libp2p v0.18.0-rc2 h1:ZLzGMdp1cVwxmA0vFpPVUDPQYUdHHGX7I58nXwpNr7Y=
github.com/libp2p/go-libp2p v0.18.0-rc2/go.mod h1:gGNCvn0T19AzyNPDWej2vsAlZFZVnS+IxqckjnsOyM0=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E=
github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
Expand Down Expand Up @@ -1275,8 +1277,9 @@ github.com/libp2p/go-libp2p-swarm v0.5.0/go.mod h1:sU9i6BoHE0Ve5SKz3y9WfKrh8dUat
github.com/libp2p/go-libp2p-swarm v0.5.3/go.mod h1:NBn7eNW2lu568L7Ns9wdFrOhgRlkRnIDg0FLKbuu3i8=
github.com/libp2p/go-libp2p-swarm v0.8.0/go.mod h1:sOMp6dPuqco0r0GHTzfVheVBh6UEL0L1lXUZ5ot2Fvc=
github.com/libp2p/go-libp2p-swarm v0.9.0/go.mod h1:2f8d8uxTJmpeqHF/1ujjdXZp+98nNIbujVOMEZxCbZ8=
github.com/libp2p/go-libp2p-swarm v0.10.0 h1:1yr7UCwxCN92cw9g9Q+fnJSlk7lOB1RetoEewxhGVL0=
github.com/libp2p/go-libp2p-swarm v0.10.0/go.mod h1:71ceMcV6Rg/0rIQ97rsZWMzto1l9LnNquef+efcRbmA=
github.com/libp2p/go-libp2p-swarm v0.10.1 h1:lXW3pgGt+BVmkzcFX61erX7l6Lt+WAamNhwa2Kf3eJM=
github.com/libp2p/go-libp2p-swarm v0.10.1/go.mod h1:Pdkq0QU5a+qu+oyqIV3bknMsnzk9lnNyKvB9acJ5aZs=
github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
Expand Down Expand Up @@ -1326,8 +1329,9 @@ github.com/libp2p/go-libp2p-yamux v0.5.3/go.mod h1:Vy3TMonBAfTMXHWopsMc8iX/XGRYr
github.com/libp2p/go-libp2p-yamux v0.5.4/go.mod h1:tfrXbyaTqqSU654GTvK3ocnSZL3BuHoeTSqhcel1wsE=
github.com/libp2p/go-libp2p-yamux v0.6.0/go.mod h1:MRhd6mAYnFRnSISp4M8i0ClV/j+mWHo2mYLifWGw33k=
github.com/libp2p/go-libp2p-yamux v0.7.0/go.mod h1:fMyA0CsPfHkIuBU0wjRGrCjTBFiXTXxG0k5M4ETv+08=
github.com/libp2p/go-libp2p-yamux v0.8.0 h1:APQYlttIj+Rr5sfa6siojwsi0ZwcIh/exHIUl9hZr6o=
github.com/libp2p/go-libp2p-yamux v0.8.0/go.mod h1:yTkPgN2ib8FHyU1ZcVD7aelzyAqXXwEPbyx+aSKm9h8=
github.com/libp2p/go-libp2p-yamux v0.8.1 h1:pi7zUeZ4Z9TpbUMntvSvoP3dFD4SEw/VPybxBcOZGzg=
github.com/libp2p/go-libp2p-yamux v0.8.1/go.mod h1:rUozF8Jah2dL9LLGyBaBeTQeARdwhefMCTQVQt6QobE=
github.com/libp2p/go-maddr-filter v0.0.1/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q=
github.com/libp2p/go-maddr-filter v0.0.4/go.mod h1:6eT12kSQMA9x2pvFQa+xesMKUBlj9VImZbj3B9FBH/Q=
github.com/libp2p/go-maddr-filter v0.0.5/go.mod h1:Jk+36PMfIqCJhAnaASRH83bdAvfDRp/w6ENFaC9bG+M=
Expand Down
49 changes: 46 additions & 3 deletions itests/deals_anycid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@ package itests
import (
"bufio"
"context"
"errors"
"os"
"testing"
"time"

"golang.org/x/sync/errgroup"

"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
rhelp "github.com/libp2p/go-libp2p-routing-helpers"

selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"

"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -60,9 +69,16 @@ func TestDealRetrieveByAnyCid(t *testing.T) {
eightMBSectorsOpt := kit.SectorSize(8 << 20)

// Create a client, and a miner with its own full node
_, client, miner, ens := kit.EnsembleTwoOne(t, kit.MockProofs(), bsaOpt, eightMBSectorsOpt)
_, client, miner, ens := kit.EnsembleTwoOne(t, kit.MockProofs(), bsaOpt, eightMBSectorsOpt, kit.AlwaysUnsealed())
ens.InterconnectAll().BeginMining(250 * time.Millisecond)

hs, err := ens.MockNet.GenPeer()
require.NoError(t, err)

bitswapbs := bstore.NewBlockstore(datastore.NewMapDatastore())

bsclient := bitswap.New(ctx, network.NewFromIpfsHost(hs, rhelp.Null{}), bitswapbs, bitswap.MaxOutstandingBytesPerPeer(1<<30))

dh := kit.NewDealHarness(t, client, miner, miner)

// Generate a DAG with multiple levels, so that we can test the case where
Expand Down Expand Up @@ -102,6 +118,7 @@ func TestDealRetrieveByAnyCid(t *testing.T) {
dp.Data.Root = res.Root
dp.DealStartEpoch = startEpoch
dp.EpochPrice = abi.NewTokenAmount(62500000) // minimum asking price
dp.FastRetrieval = true
dealCid := dh.StartDeal(ctx, dp)

// Wait for the deal to be sealed
Expand All @@ -117,12 +134,37 @@ func TestDealRetrieveByAnyCid(t *testing.T) {
// Fetch the deal data
info, err := client.ClientGetDealInfo(ctx, *dealCid)
require.NoError(t, err)
require.NoError(t, ens.MockNet.LinkAll())
require.NoError(t, ens.MockNet.ConnectAllButSelf())

// Make retrievals against CIDs at different levels in the DAG
cidIndices := []int{1, 11, 27, 32, 47}
cidIndices := []int{1, 11, 27, 30, 35, 59, 32, 47}
// test bitswap concurrent fetches
var errg errgroup.Group
for _, val := range cidIndices {
t.Logf("performing retrieval for cid at index %d", val)
v := val
errg.Go(func() error {
t.Logf("performing bitswap retrieval for cid at index %d", v)
targetCid := cids[v]
blk, err := bsclient.GetBlock(ctx, targetCid)
if err != nil {
return err
}
if blk == nil {
return errors.New("not found")
}
blk, err = bitswapbs.Get(ctx, targetCid)
if err != nil {
return err
}
t.Logf("bitswap retrieval successfully completed for block %s with block content %s", targetCid, blk)
return nil
})
}
require.NoError(t, errg.Wait())

for _, val := range cidIndices {
t.Logf("performing retrieval for cid at index %d", val)
targetCid := cids[val]
offer, err := client.ClientMinerQueryOffer(ctx, miner.ActorAddr, targetCid, &info.PieceCID)
require.NoError(t, err)
Expand Down Expand Up @@ -158,5 +200,6 @@ func TestDealRetrieveByAnyCid(t *testing.T) {

kit.AssertFilesEqual(t, tmp.Name(), outputCar)
t.Log("car files match")

}
}
1 change: 1 addition & 0 deletions itests/kit/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ type RunConcurrentDealsOpts struct {
}

func (dh *DealHarness) RunConcurrentDeals(opts RunConcurrentDealsOpts) {

ctx := context.Background()
errgrp, _ := errgroup.WithContext(context.Background())
for i := 0; i < opts.N; i++ {
Expand Down
Loading