From 4eb3b9e8d2135b0af65ad9986067fe75d9638c5b Mon Sep 17 00:00:00 2001 From: dirkmc Date: Fri, 8 Apr 2022 15:57:36 +0200 Subject: [PATCH] Speed up tests on CI (#409) * feat: speed up tests on CI * add configs for batch precommit and commit * dtypes for lotus ; mockProofs for dummydeal * remove lotus-artefacts * enable market v1 deals; split itest for market v1 in two circleci jobs * split itest for market v1 * fetch-params with boostx; remove lotus binary dep in circleci config.yml * use real proofs Co-authored-by: Anton Evangelatov --- .circleci/config.yml | 59 +- cmd/boostx/main.go | 1 + cmd/boostx/utils_cmd.go | 31 +- go.mod | 4 +- go.sum | 4 +- itests/dummydeal_offline_test.go | 46 ++ itests/dummydeal_test.go | 44 +- itests/framework/framework.go | 752 +++++++++++++++++++++++++ itests/framework/log.go | 15 + itests/main_test.go | 89 --- itests/markets_v1_deal_test.go | 233 +------- itests/markets_v1_offline_deal_test.go | 89 +++ itests/test_framework.go | 602 -------------------- node/builder.go | 2 + node/modules/storageminer.go | 177 +----- pkg/devnet/devnet.go | 24 +- storagemarket/deal_execution.go | 1 - storagemarket/helper.go | 14 +- 18 files changed, 1031 insertions(+), 1156 deletions(-) create mode 100644 itests/dummydeal_offline_test.go create mode 100644 itests/framework/framework.go create mode 100644 itests/framework/log.go delete mode 100644 itests/main_test.go create mode 100644 itests/markets_v1_offline_deal_test.go delete mode 100644 itests/test_framework.go diff --git a/.circleci/config.yml b/.circleci/config.yml index b01348dcf..f5ca3fe55 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -45,38 +45,12 @@ commands: - 'v25-8mb-lotus-params' paths: - /var/tmp/filecoin-proof-parameters/ - - run: lotus fetch-params 8388608 + - run: ./boostx fetch-params 8388608 - save_cache: name: Save parameters cache key: 'v25-8mb-lotus-params' paths: - /var/tmp/filecoin-proof-parameters/ - lotus-artefacts: - steps: - - restore_cache: - name: Restore lotus artefacts cache - keys: - - 'lotus-d118ef92' - paths: - - /tmp/lotus-artefacts - - setup_remote_docker: - version: 19.03.13 - - run: - name: copy latest stable lotus binaries - command: | - docker pull public.ecr.aws/p5f3y3g5/lotus-test:d118ef92 - id=$(docker create public.ecr.aws/p5f3y3g5/lotus-test:d118ef92) - mkdir -p /tmp/lotus-artefacts - docker cp $id:/usr/local/bin/lotus-seed /tmp/lotus-artefacts/lotus-seed - docker cp $id:/usr/local/bin/lotus-miner /tmp/lotus-artefacts/lotus-miner - docker cp $id:/usr/local/bin/lotus /tmp/lotus-artefacts/lotus - docker cp $id:/lib/libhwloc.so.5 /tmp/lotus-artefacts/libhwloc.so.5 - docker rm -v $id - - save_cache: - name: Save lotus artefacts cache - key: 'lotus-d118ef92' - paths: - - /tmp/lotus-artefacts git_fetch_all_tags: steps: - run: @@ -135,14 +109,6 @@ jobs: - run: command: make debug no_output_timeout: 30m - - lotus-artefacts - - run: - name: copy library and add lotus binaries to PATH - command: | - sudo cp /tmp/lotus-artefacts/libhwloc.so.5 /lib/x86_64-linux-gnu/ - sudo cp /tmp/lotus-artefacts/lotus /usr/local/bin/ - sudo cp /tmp/lotus-artefacts/lotus-miner /usr/local/bin/ - sudo cp /tmp/lotus-artefacts/lotus-seed /usr/local/bin/ - download-params - run: name: go test @@ -318,10 +284,27 @@ workflows: tags: only: - /^v\d+\.\d+\.\d+(-rc\d+)?$/ + - test: - name: test-itest - suite: itest - target: "./itests/..." + name: test-itest-dummdeal_offline + suite: itest-dummdeal_offline + target: "./itests/dummydeal_offline_test.go" + + - test: + name: test-itest-dummdeal + suite: itest-dummdeal + target: "./itests/dummydeal_test.go" + + - test: + name: test-itest-markets_v1_deal + suite: itest-markets_v1_deal + target: "./itests/markets_v1_deal_test.go" + + - test: + name: test-itest-markets_v1_offline_deal + suite: itest-markets_v1_offline_deal + target: "./itests/markets_v1_offline_deal_test.go" + - test: name: test-all suite: all diff --git a/cmd/boostx/main.go b/cmd/boostx/main.go index f48d9647d..d48632651 100644 --- a/cmd/boostx/main.go +++ b/cmd/boostx/main.go @@ -32,6 +32,7 @@ func main() { commpCmd, generatecarCmd, marketCmd, + fetchParamCmd, }, } app.Setup() diff --git a/cmd/boostx/utils_cmd.go b/cmd/boostx/utils_cmd.go index b84f854e0..b79d20d47 100644 --- a/cmd/boostx/utils_cmd.go +++ b/cmd/boostx/utils_cmd.go @@ -22,8 +22,12 @@ import ( lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/backupds" + "github.com/docker/go-units" + paramfetch "github.com/filecoin-project/go-paramfetch" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/unixfs" "github.com/filecoin-project/lotus/node/modules" + lotus_repo "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo/imports" "github.com/ipfs/go-cidutil/cidenc" "github.com/ipfs/go-datastore" @@ -35,8 +39,6 @@ import ( "github.com/multiformats/go-multibase" "github.com/urfave/cli/v2" "golang.org/x/xerrors" - - lotus_repo "github.com/filecoin-project/lotus/node/repo" ) var marketCmd = &cli.Command{ @@ -292,3 +294,28 @@ var generatecarCmd = &cli.Command{ return nil }, } + +var fetchParamCmd = &cli.Command{ + Name: "fetch-params", + Usage: "Fetch proving parameters", + ArgsUsage: "[sectorSize]", + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + if !cctx.Args().Present() { + return xerrors.Errorf("must pass sector size to fetch params for (specify as \"32GiB\", for instance)") + } + sectorSizeInt, err := units.RAMInBytes(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("error parsing sector size (specify as \"32GiB\", for instance): %w", err) + } + sectorSize := uint64(sectorSizeInt) + + err = paramfetch.GetParams(ctx, build.ParametersJSON(), build.SrsJSON(), sectorSize) + if err != nil { + return xerrors.Errorf("fetching proof parameters: %w", err) + } + + return nil + }, +} diff --git a/go.mod b/go.mod index dac746322..da9a13b0d 100644 --- a/go.mod +++ b/go.mod @@ -28,10 +28,11 @@ require ( github.com/filecoin-project/go-fil-markets v1.20.2-0.20220325122707-b18483a0834b github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-padreader v0.0.1 + github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.1.3 github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/index-provider v0.5.0 - github.com/filecoin-project/lotus v1.15.1-0.20220321111228-3c1edca90295 + github.com/filecoin-project/lotus v1.15.1-0.20220407132257-de8258c7a1bd github.com/filecoin-project/specs-actors v0.9.14 github.com/filecoin-project/specs-actors/v2 v2.3.6 github.com/filecoin-project/specs-actors/v5 v5.0.4 @@ -79,7 +80,6 @@ require ( github.com/libp2p/go-libp2p-record v0.1.3 github.com/libp2p/go-libp2p-swarm v0.10.2 github.com/mattn/go-sqlite3 v1.14.9 - github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multibase v0.0.3 diff --git a/go.sum b/go.sum index cbd1d354a..de379ea01 100644 --- a/go.sum +++ b/go.sum @@ -373,8 +373,8 @@ github.com/filecoin-project/go-storedcounter v0.1.0 h1:Mui6wSUBC+cQGHbDUBcO7rfh5 github.com/filecoin-project/go-storedcounter v0.1.0/go.mod h1:4ceukaXi4vFURIoxYMfKzaRF5Xv/Pinh2oTnoxpv+z8= github.com/filecoin-project/index-provider v0.5.0 h1:k2C1RFvOvxmA2i8bhmkb3b4qun7RDRDzzs/y25/TwQg= github.com/filecoin-project/index-provider v0.5.0/go.mod h1:KHVrP2vU3YuScb+fawObwTFoR882up9U07kk0ZrfP0c= -github.com/filecoin-project/lotus v1.15.1-0.20220321111228-3c1edca90295 h1:mCENEontHYNg8ze6HNJioIqwJwhThlkJ19pcjxlatfQ= -github.com/filecoin-project/lotus v1.15.1-0.20220321111228-3c1edca90295/go.mod h1:lWTkxHRmO6ir+uyMvvupnBcC0YMJXXQ+5cOdXuCyoFU= +github.com/filecoin-project/lotus v1.15.1-0.20220407132257-de8258c7a1bd h1:JA8Vb0ytFYBHnTVOt8+UBCv0wnd6olN/Xk5CnUq0Mz4= +github.com/filecoin-project/lotus v1.15.1-0.20220407132257-de8258c7a1bd/go.mod h1:lWTkxHRmO6ir+uyMvvupnBcC0YMJXXQ+5cOdXuCyoFU= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors v0.9.14 h1:68PVstg2UB3ZsMLF+DKFTAs/YKsqhKWynkr0IqmVRQY= github.com/filecoin-project/specs-actors v0.9.14/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= diff --git a/itests/dummydeal_offline_test.go b/itests/dummydeal_offline_test.go new file mode 100644 index 000000000..75b3c2317 --- /dev/null +++ b/itests/dummydeal_offline_test.go @@ -0,0 +1,46 @@ +package itests + +import ( + "context" + "testing" + + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestDummydealOffline(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + framework.SetLogLevel() + framework.SetPreCommitChallengeDelay(t, 5) + f := framework.NewTestFramework(ctx, t) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + + err = f.AddClientProviderBalance(abi.NewTokenAmount(1e15)) + require.NoError(t, err) + + // Create a CAR file + tempdir := t.TempDir() + randomFilepath, err := testutil.CreateRandomFile(tempdir, 5, 2000000) + require.NoError(t, err) + rootCid, carFilepath, err := testutil.CreateDenseCARv2(tempdir, randomFilepath) + require.NoError(t, err) + + // make an offline deal + offlineDealUuid := uuid.New() + res, err := f.MakeDummyDeal(offlineDealUuid, carFilepath, rootCid, "", true) + require.NoError(t, err) + require.True(t, res.Accepted) + res, err = f.Boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath) + require.NoError(t, err) + require.True(t, res.Accepted) + err = f.WaitForDealAddedToSector(offlineDealUuid) + require.NoError(t, err) +} diff --git a/itests/dummydeal_test.go b/itests/dummydeal_test.go index 8deaf95da..d46e69fbf 100644 --- a/itests/dummydeal_test.go +++ b/itests/dummydeal_test.go @@ -7,12 +7,29 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/filecoin-project/boost/itests/framework" "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/itests/kit" "github.com/google/uuid" "github.com/stretchr/testify/require" ) func TestDummydeal(t *testing.T) { + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + framework.SetPreCommitChallengeDelay(t, 5) + f := framework.NewTestFramework(ctx, t) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + + err = f.AddClientProviderBalance(abi.NewTokenAmount(1e15)) + require.NoError(t, err) + // Create a CAR file tempdir := t.TempDir() log.Debugw("using tempdir", "dir", tempdir) @@ -37,42 +54,35 @@ func TestDummydeal(t *testing.T) { // Create a new dummy deal dealUuid := uuid.New() - res, err := f.makeDummyDeal(dealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath), false) + // Make a deal + res, err := f.MakeDummyDeal(dealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath), false) require.NoError(t, err) require.True(t, res.Accepted) log.Debugw("got response from MarketDummyDeal", "res", spew.Sdump(res)) time.Sleep(2 * time.Second) + // Make a second deal - it should fail because the first deal took up all + // available space failingDealUuid := uuid.New() - res2, err2 := f.makeDummyDeal(failingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false) + res2, err2 := f.MakeDummyDeal(failingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false) require.NoError(t, err2) require.Equal(t, "cannot accept piece of size 2254421, on top of already allocated 2254421 bytes, because it would exceed max staging area size 4000000", res2.Reason) log.Debugw("got response from MarketDummyDeal for failing deal", "res2", spew.Sdump(res2)) - // Wait for the deal to be added to a sector and be cleanedup so space is made - err = f.waitForDealAddedToSector(dealUuid) + // Wait for the first deal to be added to a sector and cleaned up so space is made + err = f.WaitForDealAddedToSector(dealUuid) require.NoError(t, err) time.Sleep(100 * time.Millisecond) + // Make a third deal - it should succeed because the first deal has been cleaned up passingDealUuid := uuid.New() - res2, err2 = f.makeDummyDeal(passingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false) + res2, err2 = f.MakeDummyDeal(passingDealUuid, failingCarFilepath, failingRootCid, server.URL+"/"+filepath.Base(failingCarFilepath), false) require.NoError(t, err2) require.True(t, res2.Accepted) log.Debugw("got response from MarketDummyDeal", "res2", spew.Sdump(res2)) // Wait for the deal to be added to a sector - err = f.waitForDealAddedToSector(passingDealUuid) - require.NoError(t, err) - - // make an offline deal - offlineDealUuid := uuid.New() - res, err = f.makeDummyDeal(offlineDealUuid, carFilepath, rootCid, server.URL+"/"+filepath.Base(carFilepath), true) - require.NoError(t, err) - require.True(t, res.Accepted) - res, err = f.boost.BoostOfflineDealWithData(context.Background(), offlineDealUuid, carFilepath) - require.NoError(t, err) - require.True(t, res.Accepted) - err = f.waitForDealAddedToSector(offlineDealUuid) + err = f.WaitForDealAddedToSector(passingDealUuid) require.NoError(t, err) } diff --git a/itests/framework/framework.go b/itests/framework/framework.go new file mode 100644 index 000000000..fc2cf1273 --- /dev/null +++ b/itests/framework/framework.go @@ -0,0 +1,752 @@ +package framework + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/filecoin-project/boost/api" + boostclient "github.com/filecoin-project/boost/client" + "github.com/filecoin-project/boost/node" + "github.com/filecoin-project/boost/node/config" + "github.com/filecoin-project/boost/node/modules/dtypes" + "github.com/filecoin-project/boost/storagemarket" + "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + types2 "github.com/filecoin-project/boost/transport/types" + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + lotus_storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/exitcode" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/v1api" + lbuild "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/policy" + chaintypes "github.com/filecoin-project/lotus/chain/types" + ltypes "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/itests/kit" + lnode "github.com/filecoin-project/lotus/node" + lotus_config "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/lp2p" + lotus_repo "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + files "github.com/ipfs/go-ipfs-files" + ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" + dag "github.com/ipfs/go-merkledag" + dstest "github.com/ipfs/go-merkledag/test" + unixfile "github.com/ipfs/go-unixfs/file" + "github.com/ipld/go-car" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" +) + +var Log = logging.Logger("boosttest") + +type TestFramework struct { + ctx context.Context + Stop func() + + HomeDir string + Client *boostclient.StorageClient + Boost api.Boost + FullNode *kit.TestFullNode + LotusMiner *kit.TestMiner + ClientAddr address.Address + MinerAddr address.Address + DefaultWallet address.Address +} + +func NewTestFramework(ctx context.Context, t *testing.T) *TestFramework { + tempHome, _ := ioutil.TempDir("", "boost-tests-") + fullNode, miner := FullNodeAndMiner(t) + + return &TestFramework{ + ctx: ctx, + HomeDir: tempHome, + FullNode: fullNode, + LotusMiner: miner, + } +} + +func FullNodeAndMiner(t *testing.T) (*kit.TestFullNode, *kit.TestMiner) { + // enable 8MiB proofs so we can conduct larger transfers. + policy.SetSupportedProofTypes( + abi.RegisteredSealProof_StackedDrg2KiBV1, + abi.RegisteredSealProof_StackedDrg8MiBV1, + ) + t.Cleanup(func() { // reset when done. + policy.SetSupportedProofTypes( + abi.RegisteredSealProof_StackedDrg2KiBV1, + ) + }) + + // Set up a full node and a miner (without markets) + var fullNode kit.TestFullNode + var miner kit.TestMiner + + // 8MiB sectors + secSizeOpt := kit.SectorSize(8 << 20) + minerOpts := []kit.NodeOpt{ + kit.WithSubsystems(kit.SSealing, kit.SSectorStorage, kit.SMining), + kit.DisableLibp2p(), + kit.ThroughRPC(), + secSizeOpt, + kit.ConstructorOpts(lnode.Options( + lnode.Override(new(lotus_dtypes.GetSealingConfigFunc), func() (lotus_dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cfg := lotus_config.DefaultStorageMiner() + sc := modules.ToSealingConfig(cfg.Dealmaking, cfg.Sealing) + sc.MaxWaitDealsSectors = 2 + sc.MaxSealingSectors = 1 + sc.MaxSealingSectorsForDeals = 3 + sc.AlwaysKeepUnsealedCopy = true + sc.WaitDealsDelay = time.Hour + sc.BatchPreCommits = false + sc.AggregateCommits = false + + return sc, nil + }, nil + }), + )), + } + fnOpts := []kit.NodeOpt{ + kit.ConstructorOpts( + lnode.Override(new(lp2p.RawHost), func() (host.Host, error) { + return libp2p.New() + }), + ), + kit.ThroughRPC(), + secSizeOpt, + } + + eOpts := []kit.EnsembleOpt{ + //TODO: at the moment we are not mocking proofs + //maybe enable this in the future to speed up tests further + + //kit.MockProofs(), + } + + blockTime := 100 * time.Millisecond + ens := kit.NewEnsemble(t, eOpts...).FullNode(&fullNode, fnOpts...).Miner(&miner, &fullNode, minerOpts...).Start() + ens.BeginMining(blockTime) + + return &fullNode, &miner +} + +func (f *TestFramework) Start() error { + lapi.RunningNodeType = lapi.NodeMiner + + fullnodeApi := f.FullNode + + // Make sure that default wallet has been setup successfully + defaultWallet, err := fullnodeApi.WalletDefaultAddress(f.ctx) + if err != nil { + return err + } + + f.DefaultWallet = defaultWallet + + bal, err := fullnodeApi.WalletBalance(f.ctx, defaultWallet) + if err != nil { + return err + } + + Log.Infof("default wallet %s has %d attoFIL", defaultWallet, bal) + + // Create a wallet for the client with some funds + var wg sync.WaitGroup + wg.Add(1) + var clientAddr address.Address + go func() { + Log.Info("Creating client wallet") + + clientAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) + + amt := abi.NewTokenAmount(1e18) + _ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt) + Log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt) + wg.Done() + }() + + // Create a wallet for publish storage deals with some funds + wg.Add(1) + var psdWalletAddr address.Address + go func() { + Log.Info("Creating publish storage deals wallet") + psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) + + amt := abi.NewTokenAmount(1e18) + _ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt) + Log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt) + wg.Done() + }() + wg.Wait() + + f.ClientAddr = clientAddr + + f.Client, err = boostclient.NewStorageClient(f.ClientAddr, f.FullNode) + if err != nil { + return err + } + + minerApi := f.LotusMiner + + minerAddr, err := minerApi.ActorAddress(f.ctx) + if err != nil { + return err + } + + Log.Debugw("got miner actor addr", "addr", minerAddr) + + f.MinerAddr = minerAddr + + // Set the control address for the storage provider to be the publish + // storage deals wallet + _ = f.setControlAddress(psdWalletAddr) + + // Create an in-memory repo + r := lotus_repo.NewMemory(nil) + + lr, err := r.Lock(node.Boost) + if err != nil { + return err + } + + // Set up the datastore + ds, err := lr.Datastore(f.ctx, "/metadata") + if err != nil { + return err + } + + // Set the miner address in the datastore + err = ds.Put(f.ctx, datastore.NewKey("miner-address"), minerAddr.Bytes()) + if err != nil { + return err + } + + // Set some config values on the repo + c, err := lr.Config() + if err != nil { + return err + } + + token, err := f.LotusMiner.AuthNew(f.ctx, api.AllPermissions) + if err != nil { + return err + } + apiInfo := fmt.Sprintf("%s:%s", token, f.LotusMiner.ListenAddr) + Log.Debugf("miner API info: %s", apiInfo) + + cfg, ok := c.(*config.Boost) + if !ok { + return fmt.Errorf("invalid config from repo, got: %T", c) + } + cfg.SectorIndexApiInfo = apiInfo + cfg.SealerApiInfo = apiInfo + cfg.Wallets.Miner = minerAddr.String() + cfg.Wallets.PublishStorageDeals = psdWalletAddr.String() + cfg.Dealmaking.PublishMsgMaxDealsPerMsg = 1 + cfg.Dealmaking.PublishMsgPeriod = config.Duration(0) + cfg.Dealmaking.MaxStagingDealsBytes = 4000000 // 4 MB + cfg.Storage.ParallelFetchLimit = 10 + + err = lr.SetConfig(func(raw interface{}) { + rcfg := raw.(*config.Boost) + *rcfg = *cfg + }) + if err != nil { + return err + } + + err = lr.Close() + if err != nil { + return err + } + + shutdownChan := make(chan struct{}) + + // Create Boost API + stop, err := node.New(f.ctx, + node.BoostAPI(&f.Boost), + node.Override(new(dtypes.ShutdownChan), shutdownChan), + node.Base(), + node.Repo(r), + node.Override(new(v1api.FullNode), fullnodeApi), + + node.Override(new(*storage.AddressSelector), modules.AddressSelector(&lotus_config.MinerAddressConfig{ + DealPublishControl: []string{ + psdWalletAddr.String(), + }, + DisableOwnerFallback: true, + DisableWorkerFallback: true, + })), + + // Reduce publish storage deals message confidence to 1 epoch so we + // don't wait so long for publish confirmation + node.Override(new(*storagemarket.ChainDealManager), func(a v1api.FullNode) *storagemarket.ChainDealManager { + cdmCfg := storagemarket.ChainDealManagerCfg{PublishDealsConfidence: 1} + return storagemarket.NewChainDealManager(a, cdmCfg) + }), + ) + if err != nil { + return err + } + + // Instantiate the boost service JSON RPC handler. + handler, err := node.BoostHandler(f.Boost, true) + if err != nil { + return err + } + + Log.Debug("getting API endpoint of boost node") + + endpoint, err := r.APIEndpoint() + if err != nil { + return err + } + + Log.Debugw("json rpc server listening", "endpoint", endpoint) + + // Serve the RPC. + rpcStopper, err := node.ServeRPC(handler, "boost", endpoint) + if err != nil { + return err + } + + // Add boost libp2p address to boost client peer store so the client knows + // how to connect to boost + boostAddrs, err := f.Boost.NetAddrsListen(f.ctx) + if err != nil { + return err + } + f.Client.PeerStore.AddAddrs(boostAddrs.ID, boostAddrs.Addrs, time.Hour) + + // Connect full node to boost so that full node can make legacy deals + // with boost + err = f.FullNode.NetConnect(f.ctx, boostAddrs) + if err != nil { + return fmt.Errorf("unable to connect full node to boost: %w", err) + } + + // Set boost libp2p address on chain + Log.Debugw("setting peer id on chain", "peer id", boostAddrs.ID) + params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(boostAddrs.ID)}) + if err != nil { + return err + } + + minerInfo, err := fullnodeApi.StateMinerInfo(f.ctx, minerAddr, ltypes.EmptyTSK) + if err != nil { + return err + } + + msg := <ypes.Message{ + To: minerAddr, + From: minerInfo.Owner, + Method: miner.Methods.ChangePeerID, + Params: params, + Value: ltypes.NewInt(0), + } + + signed, err := fullnodeApi.MpoolPushMessage(f.ctx, msg, nil) + if err != nil { + return err + } + + Log.Debugw("waiting for set peer id message to land on chain") + mw, err := fullnodeApi.StateWaitMsg(f.ctx, signed.Cid(), 1, api.LookbackNoLimit, true) + if err != nil { + return err + } + if exitcode.Ok != mw.Receipt.ExitCode { + return errors.New("expected mw.Receipt.ExitCode to be OK") + } + + Log.Debugw("test framework setup complete") + + // Monitor for shutdown. + finishCh := node.MonitorShutdown(shutdownChan, + node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, + node.ShutdownHandler{Component: "boost", StopFunc: stop}, + ) + + f.Stop = func() { + shutdownCtx, cancel := context.WithTimeout(f.ctx, 2*time.Second) + defer cancel() + shutdownChan <- struct{}{} + _ = stop(shutdownCtx) + <-finishCh + } + + return nil +} + +// Add funds escrow in StorageMarketActor for both client and provider +func (f *TestFramework) AddClientProviderBalance(bal abi.TokenAmount) error { + var errgp errgroup.Group + errgp.Go(func() error { + Log.Infof("adding client balance %d to Storage Market Actor", bal) + mcid, err := f.FullNode.MarketAddBalance(f.ctx, f.ClientAddr, f.ClientAddr, bal) + if err != nil { + return fmt.Errorf("adding client balance to Storage Market Actor: %w", err) + } + return f.WaitMsg(mcid) + }) + errgp.Go(func() error { + mi, err := f.FullNode.StateMinerInfo(f.ctx, f.MinerAddr, chaintypes.EmptyTSK) + if err != nil { + return err + } + + Log.Infof("adding provider balance %d to Storage Market Actor", bal) + mcid, err := f.FullNode.MarketAddBalance(f.ctx, mi.Owner, f.MinerAddr, bal) + if err != nil { + return fmt.Errorf("adding provider balance to Storage Market Actor: %w", err) + } + return f.WaitMsg(mcid) + }) + err := errgp.Wait() + if err != nil { + return err + } + + Log.Info("done adding balance requirements") + return nil +} + +func (f *TestFramework) WaitForDealAddedToSector(dealUuid uuid.UUID) error { + publishCtx, cancel := context.WithTimeout(f.ctx, 300*time.Second) + defer cancel() + peerID, err := f.Boost.ID(f.ctx) + if err != nil { + return err + } + + for { + resp, err := f.Client.DealStatus(f.ctx, peerID, dealUuid) + if err != nil && !xerrors.Is(err, storagemarket.ErrDealNotFound) { + return fmt.Errorf("error getting status: %s", err.Error()) + } + + if err == nil && resp.Error == "" { + Log.Infof("deal state: %s", resp.DealStatus.Status) + switch { + case resp.DealStatus.Status == dealcheckpoints.Complete.String(): + if resp.DealStatus.Error != "" { + return fmt.Errorf("Deal Error: %s", resp.DealStatus.Error) + } + return nil + case resp.DealStatus.Status == dealcheckpoints.IndexedAndAnnounced.String(): + return nil + } + } + + select { + case <-publishCtx.Done(): + return fmt.Errorf("timed out waiting for deal to be added to a sector") + case <-time.After(time.Second): + } + } +} + +func (f *TestFramework) MakeDummyDeal(dealUuid uuid.UUID, carFilepath string, rootCid cid.Cid, url string, isOffline bool) (*api.ProviderDealRejectionInfo, error) { + cidAndSize, err := storagemarket.GenerateCommP(carFilepath) + if err != nil { + return nil, err + } + + head, err := f.FullNode.ChainHead(f.ctx) + if err != nil { + return nil, fmt.Errorf("getting chain head: %w", err) + } + startEpoch := head.Height() + abi.ChainEpoch(2000) + proposal := market.DealProposal{ + PieceCID: cidAndSize.PieceCID, + PieceSize: cidAndSize.PieceSize, + VerifiedDeal: false, + Client: f.ClientAddr, + Provider: f.MinerAddr, + Label: rootCid.String(), + StartEpoch: startEpoch, + EndEpoch: startEpoch + market2.DealMinDuration, + StoragePricePerEpoch: abi.NewTokenAmount(2000000), + ProviderCollateral: abi.NewTokenAmount(0), + ClientCollateral: abi.NewTokenAmount(0), + } + + signedProposal, err := f.signProposal(f.ClientAddr, &proposal) + if err != nil { + return nil, err + } + + Log.Debugf("Client balance requirement for deal: %d attoFil", proposal.ClientBalanceRequirement()) + Log.Debugf("Provider balance requirement for deal: %d attoFil", proposal.ProviderBalanceRequirement()) + + // Save the path to the CAR file as a transfer parameter + transferParams := &types2.HttpRequest{URL: url} + transferParamsJSON, err := json.Marshal(transferParams) + if err != nil { + return nil, err + } + + peerID, err := f.Boost.ID(f.ctx) + if err != nil { + return nil, err + } + + carFileinfo, err := os.Stat(carFilepath) + if err != nil { + return nil, err + } + + dealParams := types.DealParams{ + DealUUID: dealUuid, + ClientDealProposal: *signedProposal, + IsOffline: isOffline, + DealDataRoot: rootCid, + Transfer: types.Transfer{ + Type: "http", + Params: transferParamsJSON, + Size: uint64(carFileinfo.Size()), + }, + } + + return f.Client.StorageDeal(f.ctx, dealParams, peerID) +} + +func (f *TestFramework) signProposal(addr address.Address, proposal *market.DealProposal) (*market.ClientDealProposal, error) { + buf, err := cborutil.Dump(proposal) + if err != nil { + return nil, err + } + + sig, err := f.FullNode.WalletSign(f.ctx, addr, buf) + if err != nil { + return nil, err + } + + return &market.ClientDealProposal{ + Proposal: *proposal, + ClientSignature: *sig, + }, nil +} + +func (f *TestFramework) DefaultMarketsV1DealParams() lapi.StartDealParams { + return lapi.StartDealParams{ + Data: &lotus_storagemarket.DataRef{TransferType: lotus_storagemarket.TTGraphsync}, + EpochPrice: ltypes.NewInt(62500000), // minimum asking price + MinBlocksDuration: uint64(lbuild.MinDealDuration), + Miner: f.MinerAddr, + Wallet: f.DefaultWallet, + DealStartEpoch: 35000, + FastRetrieval: true, + } +} + +func sendFunds(ctx context.Context, sender lapi.FullNode, recipient address.Address, amount abi.TokenAmount) error { + senderAddr, err := sender.WalletDefaultAddress(ctx) + if err != nil { + return err + } + + msg := &chaintypes.Message{ + From: senderAddr, + To: recipient, + Value: amount, + } + + sm, err := sender.MpoolPushMessage(ctx, msg, nil) + if err != nil { + return err + } + + _, err = sender.StateWaitMsg(ctx, sm.Cid(), 1, 1e10, true) + return err +} + +func (f *TestFramework) setControlAddress(psdAddr address.Address) error { + mi, err := f.FullNode.StateMinerInfo(f.ctx, f.MinerAddr, chaintypes.EmptyTSK) + if err != nil { + return err + } + + cwp := &miner2.ChangeWorkerAddressParams{ + NewWorker: mi.Worker, + NewControlAddrs: []address.Address{psdAddr}, + } + sp, err := actors.SerializeParams(cwp) + if err != nil { + return err + } + + smsg, err := f.FullNode.MpoolPushMessage(f.ctx, &chaintypes.Message{ + From: mi.Owner, + To: f.MinerAddr, + Method: miner.Methods.ChangeWorkerAddress, + + Value: big.Zero(), + Params: sp, + }, nil) + if err != nil { + return err + } + + err = f.WaitMsg(smsg.Cid()) + if err != nil { + return err + } + return nil +} + +func (f *TestFramework) WaitMsg(mcid cid.Cid) error { + _, err := f.FullNode.StateWaitMsg(f.ctx, mcid, 1, 1e10, true) + return err +} + +func (f *TestFramework) WaitDealSealed(ctx context.Context, deal *cid.Cid) error { + for { + di, err := f.FullNode.ClientGetDealInfo(ctx, *deal) + if err != nil { + return err + } + + switch di.State { + case lotus_storagemarket.StorageDealAwaitingPreCommit, lotus_storagemarket.StorageDealSealing: + case lotus_storagemarket.StorageDealProposalRejected: + return errors.New("deal rejected") + case lotus_storagemarket.StorageDealFailing: + return errors.New("deal failed") + case lotus_storagemarket.StorageDealError: + return fmt.Errorf("deal errored: %s", di.Message) + case lotus_storagemarket.StorageDealActive: + return nil + } + + time.Sleep(2 * time.Second) + } +} + +func (f *TestFramework) Retrieve(ctx context.Context, t *testing.T, deal *cid.Cid, root cid.Cid, carExport bool) (path string) { + // perform retrieval. + info, err := f.FullNode.ClientGetDealInfo(ctx, *deal) + require.NoError(t, err) + + offers, err := f.FullNode.ClientFindData(ctx, root, &info.PieceCID) + require.NoError(t, err) + require.NotEmpty(t, offers, "no offers") + + carFile, err := ioutil.TempFile(f.HomeDir, "ret-car") + require.NoError(t, err) + + defer carFile.Close() //nolint:errcheck + + caddr, err := f.FullNode.WalletDefaultAddress(ctx) + require.NoError(t, err) + + updatesCtx, cancel := context.WithCancel(ctx) + updates, err := f.FullNode.ClientGetRetrievalUpdates(updatesCtx) + require.NoError(t, err) + + retrievalRes, err := f.FullNode.ClientRetrieve(ctx, offers[0].Order(caddr)) + require.NoError(t, err) +consumeEvents: + for { + var evt lapi.RetrievalInfo + select { + case <-updatesCtx.Done(): + t.Fatal("Retrieval Timed Out") + case evt = <-updates: + if evt.ID != retrievalRes.DealID { + continue + } + } + switch evt.Status { + case retrievalmarket.DealStatusCompleted: + break consumeEvents + case retrievalmarket.DealStatusRejected: + t.Fatalf("Retrieval Proposal Rejected: %s", evt.Message) + case + retrievalmarket.DealStatusDealNotFound, + retrievalmarket.DealStatusErrored: + t.Fatalf("Retrieval Error: %s", evt.Message) + } + } + cancel() + + require.NoError(t, f.FullNode.ClientExport(ctx, + lapi.ExportRef{ + Root: root, + DealID: retrievalRes.DealID, + }, + lapi.FileRef{ + Path: carFile.Name(), + IsCAR: carExport, + })) + + ret := carFile.Name() + if carExport { + actualFile := f.ExtractFileFromCAR(ctx, t, carFile) + ret = actualFile.Name() + _ = actualFile.Close() //nolint:errcheck + } + + return ret +} + +func (f *TestFramework) ExtractFileFromCAR(ctx context.Context, t *testing.T, file *os.File) (out *os.File) { + bserv := dstest.Bserv() + ch, err := car.LoadCar(ctx, bserv.Blockstore(), file) + require.NoError(t, err) + + b, err := bserv.GetBlock(ctx, ch.Roots[0]) + require.NoError(t, err) + + nd, err := ipld.Decode(b) + require.NoError(t, err) + + dserv := dag.NewDAGService(bserv) + fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd) + require.NoError(t, err) + + tmpfile, err := ioutil.TempFile(f.HomeDir, "file-in-car") + require.NoError(t, err) + + defer tmpfile.Close() //nolint:errcheck + + err = files.WriteTo(fil, tmpfile.Name()) + require.NoError(t, err) + + return tmpfile +} + +func SetPreCommitChallengeDelay(t *testing.T, delay abi.ChainEpoch) { + oldDelay := policy.GetPreCommitChallengeDelay() + policy.SetPreCommitChallengeDelay(delay) + t.Cleanup(func() { + policy.SetPreCommitChallengeDelay(oldDelay) + }) +} diff --git a/itests/framework/log.go b/itests/framework/log.go new file mode 100644 index 000000000..99a5bec6d --- /dev/null +++ b/itests/framework/log.go @@ -0,0 +1,15 @@ +package framework + +import logging "github.com/ipfs/go-log/v2" + +func SetLogLevel() { + _ = logging.SetLogLevel("boosttest", "DEBUG") + _ = logging.SetLogLevel("devnet", "DEBUG") + _ = logging.SetLogLevel("boost", "DEBUG") + _ = logging.SetLogLevel("provider", "DEBUG") + _ = logging.SetLogLevel("http-transfer", "DEBUG") + _ = logging.SetLogLevel("boost-provider", "DEBUG") + _ = logging.SetLogLevel("storagemanager", "DEBUG") + _ = logging.SetLogLevel("storageadapter", "DEBUG") + _ = logging.SetLogLevel("messagepool", "WARN") +} diff --git a/itests/main_test.go b/itests/main_test.go deleted file mode 100644 index c231ed354..000000000 --- a/itests/main_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package itests - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "os/exec" - "testing" - "time" - - "github.com/filecoin-project/boost/build" - "github.com/filecoin-project/boost/pkg/devnet" - logging "github.com/ipfs/go-log/v2" -) - -var ( - tempHome string // home directory where we hold the repos for various lotus services - f *testFramework -) - -func init() { - build.MessageConfidence = 1 -} - -func setLogLevel() { - _ = logging.SetLogLevel("boosttest", "DEBUG") - _ = logging.SetLogLevel("devnet", "DEBUG") - _ = logging.SetLogLevel("boost", "DEBUG") - _ = logging.SetLogLevel("actors", "DEBUG") - _ = logging.SetLogLevel("provider", "DEBUG") - _ = logging.SetLogLevel("http-transfer", "DEBUG") - _ = logging.SetLogLevel("boost-provider", "DEBUG") - _ = logging.SetLogLevel("storagemanager", "DEBUG") -} - -func TestMain(m *testing.M) { - setLogLevel() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - tempHome, _ = ioutil.TempDir("", "boost-tests-") - - done := make(chan struct{}) - go devnet.Run(ctx, tempHome, done) - - // Wait for the miner to start up by polling it - minerReadyCmd := "lotus-miner sectors list" - for waitAttempts := 0; ; waitAttempts++ { - // Check every second - select { - case <-ctx.Done(): - return - case <-time.After(time.Second): - } - - cmd := exec.CommandContext(ctx, "sh", "-c", minerReadyCmd) - cmd.Env = []string{fmt.Sprintf("HOME=%s", tempHome)} - _, err := cmd.CombinedOutput() - if err != nil { - // Still not ready - if waitAttempts%5 == 0 { - log.Debugw("miner not ready") - } - continue - } - - // Miner is ready - log.Debugw("miner ready") - time.Sleep(5 * time.Second) // wait for AddPiece - break - } - - f = newTestFramework(ctx, tempHome) - err := f.start() - if err != nil { - log.Fatalw("test framework failed to start", "err", err.Error()) - os.Exit(1) - } - - exitcode := m.Run() - - go f.stop() - cancel() - <-done - - os.Exit(exitcode) -} diff --git a/itests/markets_v1_deal_test.go b/itests/markets_v1_deal_test.go index 216e4bbdd..11177ad51 100644 --- a/itests/markets_v1_deal_test.go +++ b/itests/markets_v1_deal_test.go @@ -1,44 +1,36 @@ package itests import ( - "bytes" "context" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" "testing" - "time" - - "github.com/ipfs/go-cid" - files "github.com/ipfs/go-ipfs-files" - ipld "github.com/ipfs/go-ipld-format" - dag "github.com/ipfs/go-merkledag" - dstest "github.com/ipfs/go-merkledag/test" - unixfile "github.com/ipfs/go-unixfs/file" - "github.com/ipld/go-car" - "github.com/minio/blake2b-simd" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/lotus/api" - lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/boost/itests/framework" "github.com/filecoin-project/boost/testutil" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" ) func TestMarketsV1Deal(t *testing.T) { - // Create a CAR file - log.Debugw("using tempdir", "dir", tempHome) + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + framework.SetPreCommitChallengeDelay(t, 5) + f := framework.NewTestFramework(ctx, t) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + // Create a CAR file + log.Debugw("using tempdir", "dir", f.HomeDir) rseed := 0 size := 7 << 20 // 7MiB file - inPath, err := testutil.CreateRandomFile(tempHome, rseed, size) + inPath, err := testutil.CreateRandomFile(f.HomeDir, rseed, size) require.NoError(t, err) - res, err := f.fullNode.ClientImport(f.ctx, lapi.FileRef{Path: inPath}) + res, err := f.FullNode.ClientImport(ctx, lapi.FileRef{Path: inPath}) require.NoError(t, err) // Create a new markets v1 deal @@ -46,198 +38,17 @@ func TestMarketsV1Deal(t *testing.T) { dp.Data.Root = res.Root log.Debugw("starting deal", "root", res.Root) - dealProposalCid, err := f.fullNode.ClientStartDeal(f.ctx, &dp) + dealProposalCid, err := f.FullNode.ClientStartDeal(ctx, &dp) require.NoError(t, err) log.Debugw("got deal proposal cid", "cid", dealProposalCid) - err = f.WaitDealSealed(f.ctx, dealProposalCid) + err = f.WaitDealSealed(ctx, dealProposalCid) require.NoError(t, err) log.Debugw("deal is sealed, starting retrieval", "cid", dealProposalCid, "root", res.Root) - outPath := retrieve(t, f.ctx, dealProposalCid, res.Root, true) + outPath := f.Retrieve(ctx, t, dealProposalCid, res.Root, true) log.Debugw("retrieval is done, compare in- and out- files", "in", inPath, "out", outPath) - assertFilesEqual(t, inPath, outPath) -} - -func TestMarketsV1OfflineDeal(t *testing.T) { - // Create a CAR file - log.Debugw("using tempdir", "dir", tempHome) - - rseed := 1 - size := 7 << 20 // 7MiB file - - inPath, err := testutil.CreateRandomFile(tempHome, rseed, size) - require.NoError(t, err) - res, err := f.fullNode.ClientImport(f.ctx, lapi.FileRef{Path: inPath}) - require.NoError(t, err) - - // Get the piece size and commP - rootCid := res.Root - pieceInfo, err := f.fullNode.ClientDealPieceCID(f.ctx, rootCid) - require.NoError(t, err) - - // Create a new markets v1 deal - dp := f.DefaultMarketsV1DealParams() - dp.Data.Root = res.Root - // Replace with params for manual storage deal (offline deal) - dp.Data.TransferType = storagemarket.TTManual - dp.Data.PieceCid = &pieceInfo.PieceCID - dp.Data.PieceSize = pieceInfo.PieceSize.Unpadded() - - log.Debugw("starting offline deal", "root", res.Root) - dealProposalCid, err := f.fullNode.ClientStartDeal(f.ctx, &dp) - require.NoError(t, err) - log.Debugw("got deal proposal cid", "cid", dealProposalCid) - - // Wait for the deal to reach StorageDealCheckForAcceptance on the client - cd, err := f.fullNode.ClientGetDealInfo(f.ctx, *dealProposalCid) - require.NoError(t, err) - require.Eventually(t, func() bool { - cd, _ := f.fullNode.ClientGetDealInfo(f.ctx, *dealProposalCid) - fmt.Println(storagemarket.DealStates[cd.State]) - return cd.State == storagemarket.StorageDealCheckForAcceptance - }, 60*time.Second, 500*time.Millisecond, "actual deal status is %s", storagemarket.DealStates[cd.State]) - - // Create a CAR file from the raw file - log.Debugw("generate out.car for miner") - carFilePath := filepath.Join(tempHome, "out.car") - err = f.fullNode.ClientGenCar(f.ctx, api.FileRef{Path: inPath}, carFilePath) - require.NoError(t, err) - - // Import the CAR file on the miner - this is the equivalent to - // transferring the file across the wire in a normal (non-offline) deal - log.Debugw("import out.car in boost") - err = f.boost.MarketImportDealData(f.ctx, *dealProposalCid, carFilePath) - require.NoError(t, err) - - log.Debugw("wait until offline deal is sealed") - err = f.WaitDealSealed(f.ctx, dealProposalCid) - require.NoError(t, err) - - log.Debugw("offline deal is sealed, starting retrieval", "cid", dealProposalCid, "root", res.Root) - outPath := retrieve(t, f.ctx, dealProposalCid, res.Root, true) - - log.Debugw("retrieval of offline deal is done, compare in- and out- files", "in", inPath, "out", outPath) - assertFilesEqual(t, inPath, outPath) -} - -// assertFilesEqual compares two files by blake2b hash equality and -// fails the test if unequal. -func assertFilesEqual(t *testing.T, left, right string) { - // initialize hashes. - leftH, rightH := blake2b.New256(), blake2b.New256() - - // open files. - leftF, err := os.Open(left) - require.NoError(t, err) - - rightF, err := os.Open(right) - require.NoError(t, err) - - // feed hash functions. - _, err = io.Copy(leftH, leftF) - require.NoError(t, err) - - _, err = io.Copy(rightH, rightF) - require.NoError(t, err) - - // compute digests. - leftD, rightD := leftH.Sum(nil), rightH.Sum(nil) - - require.True(t, bytes.Equal(leftD, rightD)) -} - -func retrieve(t *testing.T, ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool) (path string) { - // perform retrieval. - info, err := f.fullNode.ClientGetDealInfo(ctx, *deal) - require.NoError(t, err) - - offers, err := f.fullNode.ClientFindData(ctx, root, &info.PieceCID) - require.NoError(t, err) - require.NotEmpty(t, offers, "no offers") - - carFile, err := ioutil.TempFile(tempHome, "ret-car") - require.NoError(t, err) - - defer carFile.Close() //nolint:errcheck - - caddr, err := f.fullNode.WalletDefaultAddress(ctx) - require.NoError(t, err) - - updatesCtx, cancel := context.WithCancel(ctx) - updates, err := f.fullNode.ClientGetRetrievalUpdates(updatesCtx) - require.NoError(t, err) - - retrievalRes, err := f.fullNode.ClientRetrieve(ctx, offers[0].Order(caddr)) - require.NoError(t, err) -consumeEvents: - for { - var evt api.RetrievalInfo - select { - case <-updatesCtx.Done(): - t.Fatal("Retrieval Timed Out") - case evt = <-updates: - if evt.ID != retrievalRes.DealID { - continue - } - } - switch evt.Status { - case retrievalmarket.DealStatusCompleted: - break consumeEvents - case retrievalmarket.DealStatusRejected: - t.Fatalf("Retrieval Proposal Rejected: %s", evt.Message) - case - retrievalmarket.DealStatusDealNotFound, - retrievalmarket.DealStatusErrored: - t.Fatalf("Retrieval Error: %s", evt.Message) - } - } - cancel() - - require.NoError(t, f.fullNode.ClientExport(ctx, - api.ExportRef{ - Root: root, - DealID: retrievalRes.DealID, - }, - api.FileRef{ - Path: carFile.Name(), - IsCAR: carExport, - })) - - ret := carFile.Name() - if carExport { - actualFile := extractFileFromCAR(t, ctx, carFile) - ret = actualFile.Name() - _ = actualFile.Close() //nolint:errcheck - } - - return ret -} - -func extractFileFromCAR(t *testing.T, ctx context.Context, file *os.File) (out *os.File) { - bserv := dstest.Bserv() - ch, err := car.LoadCar(ctx, bserv.Blockstore(), file) - require.NoError(t, err) - - b, err := bserv.GetBlock(ctx, ch.Roots[0]) - require.NoError(t, err) - - nd, err := ipld.Decode(b) - require.NoError(t, err) - - dserv := dag.NewDAGService(bserv) - fil, err := unixfile.NewUnixfsFile(ctx, dserv, nd) - require.NoError(t, err) - - tmpfile, err := ioutil.TempFile(tempHome, "file-in-car") - require.NoError(t, err) - - defer tmpfile.Close() //nolint:errcheck - - err = files.WriteTo(fil, tmpfile.Name()) - require.NoError(t, err) - - return tmpfile + kit.AssertFilesEqual(t, inPath, outPath) } diff --git a/itests/markets_v1_offline_deal_test.go b/itests/markets_v1_offline_deal_test.go new file mode 100644 index 000000000..80222b9e7 --- /dev/null +++ b/itests/markets_v1_offline_deal_test.go @@ -0,0 +1,89 @@ +package itests + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/filecoin-project/boost/itests/framework" + "github.com/filecoin-project/boost/testutil" + "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/lotus/api" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +func TestMarketsV1OfflineDeal(t *testing.T) { + ctx := context.Background() + log := framework.Log + + kit.QuietMiningLogs() + framework.SetLogLevel() + framework.SetPreCommitChallengeDelay(t, 5) + f := framework.NewTestFramework(ctx, t) + err := f.Start() + require.NoError(t, err) + defer f.Stop() + + // Create a CAR file + log.Debugw("using tempdir", "dir", f.HomeDir) + + rseed := 1 + size := 7 << 20 // 7MiB file + inPath, err := testutil.CreateRandomFile(f.HomeDir, rseed, size) + require.NoError(t, err) + res, err := f.FullNode.ClientImport(ctx, lapi.FileRef{Path: inPath}) + require.NoError(t, err) + + // Get the piece size and commP + rootCid := res.Root + pieceInfo, err := f.FullNode.ClientDealPieceCID(ctx, rootCid) + require.NoError(t, err) + + // Create a new markets v1 deal + dp := f.DefaultMarketsV1DealParams() + dp.Data.Root = res.Root + // Replace with params for manual storage deal (offline deal) + dp.Data.TransferType = storagemarket.TTManual + dp.Data.PieceCid = &pieceInfo.PieceCID + dp.Data.PieceSize = pieceInfo.PieceSize.Unpadded() + + log.Debugw("starting offline deal", "root", res.Root) + dealProposalCid, err := f.FullNode.ClientStartDeal(ctx, &dp) + require.NoError(t, err) + log.Debugw("got deal proposal cid", "cid", dealProposalCid) + + // Wait for the deal to reach StorageDealCheckForAcceptance on the client + cd, err := f.FullNode.ClientGetDealInfo(ctx, *dealProposalCid) + require.NoError(t, err) + require.Eventually(t, func() bool { + cd, _ := f.FullNode.ClientGetDealInfo(ctx, *dealProposalCid) + fmt.Println(storagemarket.DealStates[cd.State]) + return cd.State == storagemarket.StorageDealCheckForAcceptance + }, 60*time.Second, 500*time.Millisecond, "actual deal status is %s", storagemarket.DealStates[cd.State]) + + // Create a CAR file from the raw file + log.Debugw("generate out.car for miner") + carFilePath := filepath.Join(f.HomeDir, "out.car") + err = f.FullNode.ClientGenCar(ctx, api.FileRef{Path: inPath}, carFilePath) + require.NoError(t, err) + + // Import the CAR file on the miner - this is the equivalent to + // transferring the file across the wire in a normal (non-offline) deal + log.Debugw("import out.car in boost") + err = f.Boost.MarketImportDealData(ctx, *dealProposalCid, carFilePath) + require.NoError(t, err) + + log.Debugw("wait until offline deal is sealed") + err = f.WaitDealSealed(ctx, dealProposalCid) + require.NoError(t, err) + + log.Debugw("offline deal is sealed, starting retrieval", "cid", dealProposalCid, "root", res.Root) + outPath := f.Retrieve(ctx, t, dealProposalCid, res.Root, true) + + log.Debugw("retrieval of offline deal is done, compare in- and out- files", "in", inPath, "out", outPath) + kit.AssertFilesEqual(t, inPath, outPath) +} diff --git a/itests/test_framework.go b/itests/test_framework.go deleted file mode 100644 index 1bd001f6c..000000000 --- a/itests/test_framework.go +++ /dev/null @@ -1,602 +0,0 @@ -package itests - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "math/rand" - "os" - "sync" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/filecoin-project/boost/api" - - cliutil "github.com/filecoin-project/boost/cli/util" - boostclient "github.com/filecoin-project/boost/client" - "github.com/filecoin-project/boost/node" - "github.com/filecoin-project/boost/node/config" - "github.com/filecoin-project/boost/node/modules/dtypes" - "github.com/filecoin-project/boost/pkg/devnet" - "github.com/filecoin-project/boost/storagemarket" - "github.com/filecoin-project/boost/storagemarket/types" - "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" - types2 "github.com/filecoin-project/boost/transport/types" - "github.com/filecoin-project/go-address" - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-state-types/exitcode" - lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/api/v1api" - lbuild "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" - - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - chaintypes "github.com/filecoin-project/lotus/chain/types" - ltypes "github.com/filecoin-project/lotus/chain/types" - lotus_config "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules" - lotus_repo "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/storage" - - lotus_storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/specs-actors/actors/builtin/market" - miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" - "github.com/google/uuid" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - - logging "github.com/ipfs/go-log/v2" -) - -var log = logging.Logger("boosttest") - -type testFramework struct { - ctx context.Context - homedir string - stop func() - - client *boostclient.StorageClient - boost api.Boost - fullNode lapi.FullNode - clientAddr address.Address - minerAddr address.Address - defaultWallet address.Address -} - -func newTestFramework(ctx context.Context, homedir string) *testFramework { - return &testFramework{ - ctx: ctx, - homedir: homedir, - } -} - -func (f *testFramework) start() error { - addr := "ws://127.0.0.1:1234/rpc/v1" - - // Get a FullNode API - fullnodeApiString, err := devnet.GetFullnodeEndpoint(f.ctx, f.homedir) - if err != nil { - return err - } - - apiinfo := cliutil.ParseApiInfo(fullnodeApiString) - - fullnodeApi, closerFullnode, err := client.NewFullNodeRPCV1(f.ctx, addr, apiinfo.AuthHeader()) - if err != nil { - return err - } - - f.fullNode = fullnodeApi - - err = fullnodeApi.LogSetLevel(f.ctx, "actors", "DEBUG") - if err != nil { - return err - } - - wallets, err := fullnodeApi.WalletList(f.ctx) - if err != nil { - return err - } - - // Set the default wallet for the devnet daemon - err = fullnodeApi.WalletSetDefault(f.ctx, wallets[0]) - if err != nil { - return err - } - - // Make sure that default wallet has been setup successfully - defaultWallet, err := fullnodeApi.WalletDefaultAddress(f.ctx) - if err != nil { - return err - } - - f.defaultWallet = defaultWallet - - bal, err := fullnodeApi.WalletBalance(f.ctx, defaultWallet) - if err != nil { - return err - } - - log.Infof("default wallet %s has %d attoFIL", defaultWallet, bal) - - // Create a wallet for the client with some funds - var wg sync.WaitGroup - wg.Add(1) - var clientAddr address.Address - go func() { - log.Info("Creating client wallet") - - clientAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - - amt := abi.NewTokenAmount(1e18) - _ = sendFunds(f.ctx, fullnodeApi, clientAddr, amt) - log.Infof("Created client wallet %s with %d attoFil", clientAddr, amt) - wg.Done() - }() - - // Create a wallet for publish storage deals with some funds - wg.Add(1) - var psdWalletAddr address.Address - go func() { - log.Info("Creating publish storage deals wallet") - psdWalletAddr, _ = fullnodeApi.WalletNew(f.ctx, chaintypes.KTBLS) - - amt := abi.NewTokenAmount(1e18) - _ = sendFunds(f.ctx, fullnodeApi, psdWalletAddr, amt) - log.Infof("Created publish storage deals wallet %s with %d attoFil", psdWalletAddr, amt) - wg.Done() - }() - wg.Wait() - - f.clientAddr = clientAddr - - f.client, err = boostclient.NewStorageClient(f.clientAddr, f.fullNode) - if err != nil { - return err - } - - minerEndpoint, err := devnet.GetMinerEndpoint(f.ctx, f.homedir) - if err != nil { - return err - } - - minerApiInfo := cliutil.ParseApiInfo(minerEndpoint) - minerConnAddr := "ws://127.0.0.1:2345/rpc/v0" - minerApi, closerMiner, err := client.NewStorageMinerRPCV0(f.ctx, minerConnAddr, minerApiInfo.AuthHeader()) - if err != nil { - return err - } - - log.Debugw("minerApiInfo.Address", "addr", minerApiInfo.Addr) - - minerAddr, err := minerApi.ActorAddress(f.ctx) - if err != nil { - return err - } - - log.Debugw("got miner actor addr", "addr", minerAddr) - - f.minerAddr = minerAddr - - // Set the control address for the storage provider to be the publish - // storage deals wallet - _ = f.setControlAddress(psdWalletAddr) - - // Create an in-memory repo - r := lotus_repo.NewMemory(nil) - - lr, err := r.Lock(node.Boost) - if err != nil { - return err - } - - ds, err := lr.Datastore(context.Background(), "/metadata") - if err != nil { - return err - } - - err = ds.Put(context.Background(), datastore.NewKey("miner-address"), minerAddr.Bytes()) - if err != nil { - return err - } - - // Set some config values on the repo - c, err := lr.Config() - if err != nil { - return err - } - - cfg, ok := c.(*config.Boost) - if !ok { - return fmt.Errorf("invalid config from repo, got: %T", c) - } - cfg.SectorIndexApiInfo = minerEndpoint - cfg.SealerApiInfo = minerEndpoint - cfg.Wallets.Miner = minerAddr.String() - cfg.Wallets.PublishStorageDeals = psdWalletAddr.String() - cfg.Dealmaking.PublishMsgMaxDealsPerMsg = 1 - cfg.Dealmaking.PublishMsgPeriod = config.Duration(time.Second * 1) - cfg.Dealmaking.MaxStagingDealsBytes = 4000000 - cfg.Storage.ParallelFetchLimit = 10 - - err = lr.SetConfig(func(raw interface{}) { - rcfg := raw.(*config.Boost) - *rcfg = *cfg - }) - if err != nil { - return err - } - - err = lr.Close() - if err != nil { - return err - } - - shutdownChan := make(chan struct{}) - - // Create Boost API - stop, err := node.New(f.ctx, - node.BoostAPI(&f.boost), - node.Override(new(dtypes.ShutdownChan), shutdownChan), - node.Base(), - node.Repo(r), - node.Override(new(v1api.FullNode), fullnodeApi), - - node.Override(new(*storage.AddressSelector), modules.AddressSelector(&lotus_config.MinerAddressConfig{ - DealPublishControl: []string{ - psdWalletAddr.String(), - }, - DisableOwnerFallback: true, - DisableWorkerFallback: true, - })), - ) - if err != nil { - return err - } - - // Bootstrap libp2p with full node - remoteAddrs, err := fullnodeApi.NetAddrsListen(f.ctx) - if err != nil { - return err - } - - log.Debugw("bootstrapping libp2p network with full node", "maadr", remoteAddrs) - - err = f.boost.NetConnect(f.ctx, remoteAddrs) - if err != nil { - return err - } - - // Instantiate the boost service JSON RPC handler. - handler, err := node.BoostHandler(f.boost, true) - if err != nil { - return err - } - - log.Debug("getting API endpoint of boost node") - - endpoint, err := r.APIEndpoint() - if err != nil { - return err - } - - log.Debugw("json rpc server listening", "endpoint", endpoint) - - // Serve the RPC. - rpcStopper, err := node.ServeRPC(handler, "boost", endpoint) - if err != nil { - return err - } - - // Add boost libp2p address to test client peer store so the client knows - // how to connect to boost - boostAddrs, err := f.boost.NetAddrsListen(f.ctx) - if err != nil { - return err - } - f.client.PeerStore.AddAddrs(boostAddrs.ID, boostAddrs.Addrs, time.Hour) - - // Add boost libp2p to chain - log.Debugw("serialize params") - params, err := actors.SerializeParams(&miner2.ChangePeerIDParams{NewID: abi.PeerID(boostAddrs.ID)}) - if err != nil { - return err - } - - msg := <ypes.Message{ - To: minerAddr, - //From: minerAddr, - From: defaultWallet, - Method: miner.Methods.ChangePeerID, - Params: params, - Value: ltypes.NewInt(0), - } - - log.Debugw("push message to mpool") - signed, err := fullnodeApi.MpoolPushMessage(f.ctx, msg, nil) - if err != nil { - return err - } - - log.Debugw("wait for state msg") - mw, err := fullnodeApi.StateWaitMsg(f.ctx, signed.Cid(), 2, api.LookbackNoLimit, true) - if err != nil { - return err - } - if exitcode.Ok != mw.Receipt.ExitCode { - return errors.New("expected mw.Receipt.ExitCode to be OK") - } - - log.Debugw("monitoring for shutdown") - - // Monitor for shutdown. - finishCh := node.MonitorShutdown(shutdownChan, - node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, - node.ShutdownHandler{Component: "boost", StopFunc: stop}, - ) - - f.stop = func() { - shutdownChan <- struct{}{} - _ = stop(f.ctx) - <-finishCh - closerFullnode() - closerMiner() - } - - return nil -} - -func (f *testFramework) waitForDealAddedToSector(dealUuid uuid.UUID) error { - publishCtx, cancel := context.WithTimeout(f.ctx, 300*time.Second) - defer cancel() - peerID, err := f.boost.ID(f.ctx) - if err != nil { - return err - } - - for { - resp, err := f.client.DealStatus(f.ctx, peerID, dealUuid) - if err != nil && !xerrors.Is(err, storagemarket.ErrDealNotFound) { - return fmt.Errorf("error getting status: %s", err.Error()) - } - - if err == nil && resp.Error == "" { - log.Infof("deal state: %s", resp.DealStatus) - switch { - case resp.DealStatus.Status == dealcheckpoints.Complete.String(): - return nil - case resp.DealStatus.Status == dealcheckpoints.IndexedAndAnnounced.String(): - return nil - } - } - - select { - case <-publishCtx.Done(): - return fmt.Errorf("timed out waiting for deal to be added to a sector") - case <-time.After(time.Second): - } - } -} - -func (f *testFramework) makeDummyDeal(dealUuid uuid.UUID, carFilepath string, rootCid cid.Cid, url string, isOffline bool) (*api.ProviderDealRejectionInfo, error) { - cidAndSize, err := storagemarket.GenerateCommP(carFilepath) - if err != nil { - return nil, err - } - - proposal := market.DealProposal{ - PieceCID: cidAndSize.PieceCID, - PieceSize: cidAndSize.PieceSize, - VerifiedDeal: false, - Client: f.clientAddr, - Provider: f.minerAddr, - Label: rootCid.String(), - StartEpoch: 10000 + abi.ChainEpoch(rand.Intn(30000)), - EndEpoch: 800000 + abi.ChainEpoch(rand.Intn(10000)), - StoragePricePerEpoch: abi.NewTokenAmount(2000000), - ProviderCollateral: abi.NewTokenAmount(0), - ClientCollateral: abi.NewTokenAmount(0), - } - - signedProposal, err := f.signProposal(f.clientAddr, &proposal) - if err != nil { - return nil, err - } - - log.Debugf("Client balance requirement for deal: %d attoFil", proposal.ClientBalanceRequirement()) - log.Debugf("Provider balance requirement for deal: %d attoFil", proposal.ProviderBalanceRequirement()) - - clientBal, err := f.fullNode.WalletBalance(f.ctx, f.clientAddr) - if err != nil { - return nil, err - } - log.Debugf("Client balance: %d attoFil", clientBal) - - provBal, err := f.fullNode.WalletBalance(f.ctx, f.minerAddr) - if err != nil { - return nil, err - } - log.Debugf("Provider balance: %d attoFil", provBal) - - // Add client and provider funds for deal to StorageMarketActor - var errgp errgroup.Group - errgp.Go(func() error { - bal := proposal.ClientBalanceRequirement() - log.Infof("adding client balance requirement %d to Storage Market Actor", bal) - if big.Cmp(bal, big.Zero()) > 0 { - mcid, err := f.fullNode.MarketAddBalance(f.ctx, f.clientAddr, f.clientAddr, bal) - if err != nil { - return err - } - return f.WaitMsg(mcid) - } - return nil - }) - errgp.Go(func() error { - bal := proposal.ProviderBalanceRequirement() - log.Infof("adding provider balance requirement %d to Storage Market Actor", bal) - if big.Cmp(bal, big.Zero()) > 0 { - mcid, err := f.fullNode.MarketAddBalance(f.ctx, f.minerAddr, f.minerAddr, bal) - if err != nil { - return err - } - return f.WaitMsg(mcid) - } - return nil - }) - err = errgp.Wait() - if err != nil { - return nil, err - } - - log.Info("done adding balance requirements") - - // Save the path to the CAR file as a transfer parameter - transferParams := &types2.HttpRequest{URL: url} - transferParamsJSON, err := json.Marshal(transferParams) - if err != nil { - return nil, err - } - - peerID, err := f.boost.ID(f.ctx) - if err != nil { - return nil, err - } - - carFileinfo, err := os.Stat(carFilepath) - if err != nil { - return nil, err - } - - dealParams := types.DealParams{ - DealUUID: dealUuid, - ClientDealProposal: *signedProposal, - IsOffline: isOffline, - DealDataRoot: rootCid, - Transfer: types.Transfer{ - Type: "http", - Params: transferParamsJSON, - Size: uint64(carFileinfo.Size()), - }, - } - - return f.client.StorageDeal(f.ctx, dealParams, peerID) -} - -func (f *testFramework) signProposal(addr address.Address, proposal *market.DealProposal) (*market.ClientDealProposal, error) { - buf, err := cborutil.Dump(proposal) - if err != nil { - return nil, err - } - - sig, err := f.fullNode.WalletSign(f.ctx, addr, buf) - if err != nil { - return nil, err - } - - return &market.ClientDealProposal{ - Proposal: *proposal, - ClientSignature: *sig, - }, nil -} - -func (f *testFramework) DefaultMarketsV1DealParams() lapi.StartDealParams { - return lapi.StartDealParams{ - Data: &lotus_storagemarket.DataRef{TransferType: lotus_storagemarket.TTGraphsync}, - EpochPrice: ltypes.NewInt(62500000), // minimum asking price - MinBlocksDuration: uint64(lbuild.MinDealDuration), - Miner: f.minerAddr, - Wallet: f.defaultWallet, - DealStartEpoch: 30000 + abi.ChainEpoch(rand.Intn(10000)), - FastRetrieval: true, - } -} - -func sendFunds(ctx context.Context, sender lapi.FullNode, recipient address.Address, amount abi.TokenAmount) error { - senderAddr, err := sender.WalletDefaultAddress(ctx) - if err != nil { - return err - } - - msg := &chaintypes.Message{ - From: senderAddr, - To: recipient, - Value: amount, - } - - sm, err := sender.MpoolPushMessage(ctx, msg, nil) - if err != nil { - return err - } - - _, err = sender.StateWaitMsg(ctx, sm.Cid(), 1, 1e10, true) - return err -} - -func (f *testFramework) setControlAddress(psdAddr address.Address) error { - mi, err := f.fullNode.StateMinerInfo(f.ctx, f.minerAddr, chaintypes.EmptyTSK) - if err != nil { - return err - } - - cwp := &miner2.ChangeWorkerAddressParams{ - NewWorker: mi.Worker, - NewControlAddrs: []address.Address{psdAddr}, - } - sp, err := actors.SerializeParams(cwp) - if err != nil { - return err - } - - smsg, err := f.fullNode.MpoolPushMessage(f.ctx, &chaintypes.Message{ - From: mi.Owner, - To: f.minerAddr, - Method: miner.Methods.ChangeWorkerAddress, - - Value: big.Zero(), - Params: sp, - }, nil) - if err != nil { - return err - } - - err = f.WaitMsg(smsg.Cid()) - if err != nil { - return err - } - return nil -} - -func (f *testFramework) WaitMsg(mcid cid.Cid) error { - _, err := f.fullNode.StateWaitMsg(f.ctx, mcid, 1, 1e10, true) - return err -} - -func (f *testFramework) WaitDealSealed(ctx context.Context, deal *cid.Cid) error { - for { - di, err := f.fullNode.ClientGetDealInfo(ctx, *deal) - if err != nil { - return err - } - - switch di.State { - case lotus_storagemarket.StorageDealAwaitingPreCommit, lotus_storagemarket.StorageDealSealing: - case lotus_storagemarket.StorageDealProposalRejected: - return errors.New("deal rejected") - case lotus_storagemarket.StorageDealFailing: - return errors.New("deal failed") - case lotus_storagemarket.StorageDealError: - return fmt.Errorf("deal errored: %s", di.Message) - case lotus_storagemarket.StorageDealActive: - return nil - } - - time.Sleep(2 * time.Second) - } -} diff --git a/node/builder.go b/node/builder.go index a340c90fa..dba672a58 100644 --- a/node/builder.go +++ b/node/builder.go @@ -438,6 +438,8 @@ func ConfigBoost(c interface{}) Option { Override(new(*indexprovider.Wrapper), indexprovider.NewWrapper(cfg.DAGStore)), + Override(new(*storagemarket.ChainDealManager), modules.NewChainDealManager), + Override(new(*storagemarket.Provider), modules.NewStorageMarketProvider(walletMiner)), // GraphQL server diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 2cb5aea33..41e640fe7 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -1,16 +1,15 @@ package modules import ( - "bytes" "context" "database/sql" "errors" "fmt" - "net/http" "path" - "strings" "time" + "github.com/filecoin-project/boost/build" + "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-state-types/crypto" ctypes "github.com/filecoin-project/lotus/chain/types" @@ -29,33 +28,16 @@ import ( "github.com/filecoin-project/boost/storagemarket/lp2pimpl" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl" - rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" lotus_storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket" - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-statestore" - lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/blockstore" - sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" - "github.com/filecoin-project/lotus/extern/sector-storage/stores" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/markets" "github.com/filecoin-project/lotus/markets/dagstore" - marketevents "github.com/filecoin-project/lotus/markets/loggers" - "github.com/filecoin-project/lotus/markets/pricing" "github.com/filecoin-project/lotus/markets/storageadapter" - lotus_config "github.com/filecoin-project/lotus/node/config" lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" lotus_repo "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-core/host" "go.uber.org/fx" "go.uber.org/multierr" @@ -66,75 +48,6 @@ var ( StorageCounterDSPrefix = "/storage/nextid" ) -func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) { - m.OnReady(marketevents.ReadyLogger("retrieval provider")) - lc.Append(fx.Hook{ - - OnStart: func(ctx context.Context) error { - m.SubscribeToEvents(marketevents.RetrievalProviderLogger) - - evtType := j.RegisterEventType("markets/retrieval/provider", "state_change") - m.SubscribeToEvents(markets.RetrievalProviderJournaler(j, evtType)) - - return m.Start(ctx) - }, - OnStop: func(context.Context) error { - return m.Stop() - }, - }) -} - -func HandleMigrateProviderFunds(lc fx.Lifecycle, ds lotus_dtypes.MetadataDS, node lapi.FullNode, minerAddress lotus_dtypes.MinerAddress) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - b, err := ds.Get(ctx, datastore.NewKey("/marketfunds/provider")) - if err != nil { - if xerrors.Is(err, datastore.ErrNotFound) { - return nil - } - return err - } - - var value abi.TokenAmount - if err = value.UnmarshalCBOR(bytes.NewReader(b)); err != nil { - return err - } - ts, err := node.ChainHead(ctx) - if err != nil { - log.Errorf("provider funds migration - getting chain head: %v", err) - return nil - } - - mi, err := node.StateMinerInfo(ctx, address.Address(minerAddress), ts.Key()) - if err != nil { - log.Errorf("provider funds migration - getting miner info %s: %v", minerAddress, err) - return nil - } - - _, err = node.MarketReserveFunds(ctx, mi.Worker, address.Address(minerAddress), value) - if err != nil { - log.Errorf("provider funds migration - reserving funds (wallet %s, addr %s, funds %d): %v", - mi.Worker, minerAddress, value, err) - return nil - } - - return ds.Delete(ctx, datastore.NewKey("/marketfunds/provider")) - }, - }) -} - -// StagingBlockstore creates a blockstore for staging blocks for a miner -// in a storage deal, prior to sealing -func StagingBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r lotus_repo.LockedRepo) (lotus_dtypes.StagingBlockstore, error) { - ctx := helpers.LifecycleCtx(mctx, lc) - stagingds, err := r.Datastore(ctx, "/staging") - if err != nil { - return nil, err - } - - return blockstore.FromDatastore(stagingds), nil -} - func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter { return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, @@ -168,79 +81,6 @@ func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dt } } -func RetrievalNetwork(h host.Host) rmnet.RetrievalMarketNetwork { - return rmnet.NewFromLibp2pHost(h) -} - -var WorkerCallsPrefix = datastore.NewKey("/worker/calls") -var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls") - -func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls stores.URLs) (*stores.Local, error) { - ctx := helpers.LifecycleCtx(mctx, lc) - return stores.NewLocal(ctx, ls, si, urls) -} - -func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote { - return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{}) -} - -func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor *stores.Remote, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds lotus_dtypes.MetadataDS) (*sectorstorage.Manager, error) { - ctx := helpers.LifecycleCtx(mctx, lc) - - wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix)) - smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix)) - - sst, err := sectorstorage.New(ctx, lstor, stor, ls, si, sc, wsts, smsts) - if err != nil { - return nil, err - } - - lc.Append(fx.Hook{ - OnStop: sst.Close, - }) - - return sst, nil -} - -// LotusRetrievalPricingFunc configures the pricing function to use for retrieval deals. // TODO(anteva): Fix me -func LotusRetrievalPricingFunc() func(_ lotus_dtypes.ConsiderOnlineRetrievalDealsConfigFunc, - _ lotus_dtypes.ConsiderOfflineRetrievalDealsConfigFunc) lotus_dtypes.RetrievalPricingFunc { - - cfg := lotus_config.DealmakingConfig{} - - return func(_ lotus_dtypes.ConsiderOnlineRetrievalDealsConfigFunc, - _ lotus_dtypes.ConsiderOfflineRetrievalDealsConfigFunc) lotus_dtypes.RetrievalPricingFunc { - if cfg.RetrievalPricing.Strategy == lotus_config.RetrievalPricingExternalMode { - return pricing.ExternalRetrievalPricingFunc(cfg.RetrievalPricing.External.Path) - } - - return retrievalimpl.DefaultPricingFunc(cfg.RetrievalPricing.Default.VerifiedDealsFreeTransfer) - } -} - -func StorageAuth(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) { - token, err := ca.AuthNew(ctx, []auth.Permission{"admin"}) - if err != nil { - return nil, xerrors.Errorf("creating storage auth header: %w", err) - } - - headers := http.Header{} - headers.Add("Authorization", "Bearer "+string(token)) - return sectorstorage.StorageAuth(headers), nil -} - -func StorageAuthWithURL(apiInfo string) func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) { - return func(ctx helpers.MetricsCtx, ca v0api.Common) (sectorstorage.StorageAuth, error) { - s := strings.Split(apiInfo, ":") - if len(s) != 2 { - return nil, errors.New("unexpected format of `apiInfo`") - } - headers := http.Header{} - headers.Add("Authorization", "Bearer "+s[0]) - return sectorstorage.StorageAuth(headers), nil - } -} - func NewConsiderOnlineStorageDealsConfigFunc(r lotus_repo.LockedRepo) (dtypes.ConsiderOnlineStorageDealsConfigFunc, error) { return func() (out bool, err error) { err = readCfg(r, func(cfg *config.Boost) { @@ -515,17 +355,24 @@ func (s *signatureVerifier) VerifySignature(ctx context.Context, sig crypto.Sign return err == nil, err } +func NewChainDealManager(a v1api.FullNode) *storagemarket.ChainDealManager { + cdmCfg := storagemarket.ChainDealManagerCfg{PublishDealsConfidence: 2 * build.MessageConfidence} + return storagemarket.NewChainDealManager(a, cdmCfg) +} + func NewStorageMarketProvider(provAddr address.Address) func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, - dagst *dagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, lp lotus_storagemarket.StorageProvider) (*storagemarket.Provider, error) { + dagst *dagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, lp lotus_storagemarket.StorageProvider, + cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) { return func(lc fx.Lifecycle, r repo.LockedRepo, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, - dagst *dagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, lp lotus_storagemarket.StorageProvider) (*storagemarket.Provider, error) { + dagst *dagstore.Wrapper, ps lotus_dtypes.ProviderPieceStore, ip *indexprovider.Wrapper, + lp lotus_storagemarket.StorageProvider, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) { prov, err := storagemarket.NewProvider(r.Path(), h, sqldb, dealsDB, fundMgr, storageMgr, a, dp, provAddr, secb, - sps, storagemarket.NewChainDealManager(a), df, logsSqlDB.db, logsDB, dagst, ps, ip, lp, &signatureVerifier{a}) + sps, cdm, df, logsSqlDB.db, logsDB, dagst, ps, ip, lp, &signatureVerifier{a}) if err != nil { return nil, err } diff --git a/pkg/devnet/devnet.go b/pkg/devnet/devnet.go index 581cd16a8..1f53f2828 100644 --- a/pkg/devnet/devnet.go +++ b/pkg/devnet/devnet.go @@ -43,7 +43,7 @@ func Run(ctx context.Context, tempHome string, done chan struct{}) { cancel() } - wg.Add(3) + wg.Add(2) go func() { runLotusDaemon(ctx, tempHome) log.Debugw("shut down lotus daemon") @@ -56,11 +56,6 @@ func Run(ctx context.Context, tempHome string, done chan struct{}) { wg.Done() }() - go func() { - publishDealsPeriodicallyCmd(ctx, tempHome) - wg.Done() - }() - //TODO: Fix setDefaultWalletCmd to work with a temporary $HOME //go func() { //setDefaultWalletCmd(ctx, tempHome) @@ -96,7 +91,7 @@ func runCmdsWithLog(ctx context.Context, name string, commands [][]string, homeD func runLotusDaemon(ctx context.Context, home string) { cmds := [][]string{ {"lotus-seed", "genesis", "new", "localnet.json"}, - {"lotus-seed", "pre-seal", "--sector-size=8388608", "--num-sectors=2"}, + {"lotus-seed", "pre-seal", "--sector-size=8388608", "--num-sectors=1"}, {"lotus-seed", "genesis", "add-miner", "localnet.json", filepath.Join(home, ".genesis-sectors", "pre-seal-t01000.json")}, {"lotus", "daemon", "--lotus-make-genesis=dev.gen", @@ -137,21 +132,6 @@ func runLotusMiner(ctx context.Context, home string) { runCmdsWithLog(ctx, "lotus-miner", cmds, home) } -func publishDealsPeriodicallyCmd(ctx context.Context, homeDir string) { - for { - select { - case <-ctx.Done(): - return - case <-time.After(5 * time.Second): - } - - cmd := exec.CommandContext(ctx, "lotus-miner", - "storage-deals", "pending-publish", "--publish-now") - cmd.Env = []string{fmt.Sprintf("HOME=%s", homeDir)} - _ = cmd.Run() // we ignore errors - } -} - //func setDefaultWalletCmd(ctx context.Context, _ string) { //// TODO: do this without a shell //setDefaultWalletCmd := "lotus wallet list | grep t3 | awk '{print $1}' | xargs lotus wallet set-default" diff --git a/storagemarket/deal_execution.go b/storagemarket/deal_execution.go index 98c08dbdb..e679d3582 100644 --- a/storagemarket/deal_execution.go +++ b/storagemarket/deal_execution.go @@ -557,7 +557,6 @@ func (p *Provider) addPiece(ctx context.Context, pub event.Emitter, deal *types. return p.updateCheckpoint(pub, deal, dealcheckpoints.AddedPiece) } -// TODO Index Provider integration to announce deals to the network Indexer func (p *Provider) indexAndAnnounce(ctx context.Context, pub event.Emitter, deal *types.ProviderDealState) error { pc := deal.ClientDealProposal.Proposal.PieceCID diff --git a/storagemarket/helper.go b/storagemarket/helper.go index 1698bd8cb..84960d376 100644 --- a/storagemarket/helper.go +++ b/storagemarket/helper.go @@ -12,23 +12,27 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/build" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/ipfs/go-cid" "golang.org/x/xerrors" ) +type ChainDealManagerCfg struct { + PublishDealsConfidence uint64 +} + type ChainDealManager struct { - fullnodeApi v1api.FullNode + fullnodeApi v1api.FullNode + cfg ChainDealManagerCfg } -func NewChainDealManager(a v1api.FullNode) *ChainDealManager { - return &ChainDealManager{a} +func NewChainDealManager(a v1api.FullNode, cfg ChainDealManagerCfg) *ChainDealManager { + return &ChainDealManager{fullnodeApi: a, cfg: cfg} } func (c *ChainDealManager) WaitForPublishDeals(ctx context.Context, publishCid cid.Cid, proposal market2.DealProposal) (*storagemarket.PublishDealsWaitResult, error) { // Wait for deal to be published (plus additional time for confidence) - receipt, err := c.fullnodeApi.StateWaitMsg(ctx, publishCid, 2*build.MessageConfidence, api.LookbackNoLimit, true) + receipt, err := c.fullnodeApi.StateWaitMsg(ctx, publishCid, c.cfg.PublishDealsConfidence, api.LookbackNoLimit, true) if err != nil { return nil, xerrors.Errorf("WaitForPublishDeals errored: %w", err) }