Skip to content

Commit

Permalink
feat: streaming output of CAR contents
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Feb 8, 2023
1 parent cb1eb2f commit cc89830
Show file tree
Hide file tree
Showing 9 changed files with 662 additions and 816 deletions.
39 changes: 31 additions & 8 deletions cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"os"
"strings"
"time"

Expand All @@ -13,8 +14,9 @@ import (
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/storeutil"
carblockstore "github.com/ipld/go-car/v2/blockstore"
carv2 "github.com/ipld/go-car/v2"
carstore "github.com/ipld/go-car/v2/storage"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)
Expand Down Expand Up @@ -99,9 +101,28 @@ func Fetch(c *cli.Context) error {
if c.IsSet("output") {
outfile = c.String("output")
}

var parentOpener = func() (*carblockstore.ReadWrite, error) {
return carblockstore.OpenReadWrite(outfile, []cid.Cid{rootCid}, carblockstore.WriteAsCarV1(true))
var openedFile *os.File
defer func() {
if openedFile != nil {
openedFile.Close()
}
}()

// TODO: unfortunately errors from here have to propagate through graphsync
// then data-transfer and all the way back up to retrieval; we should
// probably have a way to cancel the retrieval and return an error
// immediately if this fails.
var parentOpener = func() (*carstore.StorageCar, error) {
var err error
// always Create, truncating and making a new store - can't resume here
// because our headers (roots) won't match
// TODO: option to resume existing CAR and ignore mismatching header? it
// could just append blocks and leave the header as it is
openedFile, err = os.Create(outfile)
if err != nil {
return nil, err
}
return carstore.NewReadableWritable(openedFile, []cid.Cid{rootCid}, carv2.WriteAsCarV1(true))
}

var blockCount int
Expand All @@ -115,8 +136,10 @@ func Fetch(c *cli.Context) error {
fmt.Printf("\rReceived %d blocks / %s...", blockCount, humanize.IBytes(byteLength))
}
}
bstore := cmdinternal.NewPutCbBlockstore(parentOpener, putCb)
linkSystem := storeutil.LinkSystemForBlockstore(bstore)
store := cmdinternal.NewPutCbStore(parentOpener, putCb)
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.SetReadStorage(store)
linkSystem.SetWriteStorage(store)

_, stats, err := lassie.Fetch(c.Context, rootCid, linkSystem)
if err != nil {
Expand All @@ -134,7 +157,7 @@ func Fetch(c *cli.Context) error {
humanize.IBytes(stats.Size),
)

return bstore.Finalize()
return store.Finalize()
}

type progressPrinter struct {
Expand Down
108 changes: 0 additions & 108 deletions cmd/lassie/internal/putcbblockstore.go

This file was deleted.

75 changes: 75 additions & 0 deletions cmd/lassie/internal/wrappedstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package internal

import (
"context"
"io"

carstore "github.com/ipld/go-car/v2/storage"
"github.com/ipld/go-ipld-prime/storage"
)

// putCbStore simply calls a callback on each put(), with the number of blocks put
var _ storage.StreamingReadableStorage = (*putCbStore)(nil)
var _ storage.ReadableStorage = (*putCbStore)(nil)
var _ storage.WritableStorage = (*putCbStore)(nil)

type putCbStore struct {
// parentOpener lazily opens the parent Store upon first call to this Store.
// This avoids Store instantiation until there is some interaction from the retriever.
// In the case of CARv2 Stores, this will avoid creation of empty .car files should
// the retriever fail to find any candidates.
parentOpener func() (*carstore.StorageCar, error)
// parent is lazily instantiated and should not be directly used; use parentStore instead.
parent *carstore.StorageCar
cb func(putCount int, putBytes int)
}

func NewPutCbStore(parentOpener func() (*carstore.StorageCar, error), cb func(putCount int, putBytes int)) *putCbStore {
return &putCbStore{parentOpener: parentOpener, cb: cb}
}

func (pcb *putCbStore) Has(ctx context.Context, key string) (bool, error) {
pbs, err := pcb.parentStore()
if err != nil {
return false, err
}
return pbs.Has(ctx, key)
}
func (pcb *putCbStore) Get(ctx context.Context, key string) ([]byte, error) {
pbs, err := pcb.parentStore()
if err != nil {
return nil, err
}
return pbs.Get(ctx, key)
}
func (pcb *putCbStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) {
pbs, err := pcb.parentStore()
if err != nil {
return nil, err
}
return pbs.GetStream(ctx, key)
}
func (pcb *putCbStore) Put(ctx context.Context, key string, data []byte) error {
pbs, err := pcb.parentStore()
if err != nil {
return err
}
pcb.cb(1, len(data))
return pbs.Put(ctx, key, data)
}
func (pcb *putCbStore) Finalize() error {
if pbs, err := pcb.parentStore(); err != nil {
return err
} else {
return pbs.Finalize()
}
}
func (pcb *putCbStore) parentStore() (*carstore.StorageCar, error) {
if pcb.parent == nil {
var err error
if pcb.parent, err = pcb.parentOpener(); err != nil {
return nil, err
}
}
return pcb.parent, nil
}
51 changes: 25 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,30 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/hannahhoward/go-pubsub v1.0.0
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-graphsync v0.14.0
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car/v2 v2.5.1
github.com/ipld/go-car/v2 v2.6.1-0.20230206113223-02849a8ac62f
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipni/storetheindex v0.5.4
github.com/libp2p/go-libp2p v0.23.2
github.com/multiformats/go-multiaddr v0.7.0
github.com/multiformats/go-multicodec v0.6.0
github.com/libp2p/go-libp2p v0.23.4
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multicodec v0.7.0
github.com/multiformats/go-multihash v0.2.1
github.com/prometheus/client_golang v1.14.0
github.com/rvagg/go-prioritywaitqueue v1.0.3
github.com/stretchr/testify v1.8.1
github.com/urfave/cli/v2 v2.23.7
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.10.0
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/multierr v1.8.0
golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5
go.opentelemetry.io/otel v1.12.0
go.opentelemetry.io/otel/trace v1.12.0
go.uber.org/multierr v1.9.0
golang.org/x/net v0.5.0
)

require (
github.com/Stebalien/go-bitfield v0.0.1 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.1 // indirect
Expand Down Expand Up @@ -80,26 +77,28 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-blockservice v0.4.0 // indirect
github.com/ipfs/go-block-format v0.1.1 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
github.com/ipfs/go-ipfs-blockstore v1.2.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-libipfs v0.4.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.8.1 // indirect
github.com/ipfs/go-merkledag v0.9.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-peertaskqueue v0.8.0 // indirect
github.com/ipfs/go-unixfsnode v1.4.0 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-codec-dagpb v1.5.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.15.10 // indirect
github.com/klauspost/cpuid/v2 v2.1.1 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand All @@ -117,7 +116,7 @@ require (
github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect
github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.50 // indirect
Expand All @@ -127,12 +126,12 @@ require (
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multistream v0.3.3 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
Expand All @@ -141,7 +140,7 @@ require (
github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand All @@ -152,16 +151,16 @@ require (
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20220514204315-f29c37e9c44c // indirect
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/sync v0.0.0-20220907140024-f12130a52804 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/tools v0.1.12 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
Loading

0 comments on commit cc89830

Please sign in to comment.