Skip to content

Commit

Permalink
plumb through pseudoversions for context changes
Browse files Browse the repository at this point in the history
This is for testing the libp2p release, since the context changes are
not plumbed all the way through.
  • Loading branch information
guseggert authored and marten-seemann committed Nov 16, 2021
1 parent dea5469 commit 4142b46
Show file tree
Hide file tree
Showing 29 changed files with 262 additions and 225 deletions.
10 changes: 5 additions & 5 deletions blocks/blockstoreutil/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

cid "github.com/ipfs/go-cid"
bs "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-pinner"
pin "github.com/ipfs/go-ipfs-pinner"
)

// RemovedBlock is used to represent the result of removing a block.
Expand Down Expand Up @@ -40,15 +40,15 @@ func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids
go func() {
defer close(out)

unlocker := blocks.GCLock()
defer unlocker.Unlock()
unlocker := blocks.GCLock(ctx)
defer unlocker.Unlock(ctx)

stillOkay := FilterPinned(ctx, pins, out, cids)

for _, c := range stillOkay {
// Kept for backwards compatibility. We may want to
// remove this sometime in the future.
has, err := blocks.Has(c)
has, err := blocks.Has(ctx, c)
if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
continue
Expand All @@ -58,7 +58,7 @@ func RmBlocks(ctx context.Context, blocks bs.GCBlockstore, pins pin.Pinner, cids
continue
}

err = blocks.DeleteBlock(c)
err = blocks.DeleteBlock(ctx, c)
if err != nil {
out <- &RemovedBlock{Hash: c.String(), Error: err.Error()}
} else if !opts.Quiet {
Expand Down
6 changes: 3 additions & 3 deletions core/commands/dag/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock()
defer unlocker.Unlock()
unlocker := node.Blockstore.PinLock(req.Context)
defer unlocker.Unlock(req.Context)

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

Expand Down Expand Up @@ -87,7 +87,7 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment

ret := RootMeta{Cid: c}

if block, err := node.Blockstore.Get(c); err != nil {
if block, err := node.Blockstore.Get(req.Context, c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipld.Decode(block); err != nil {
ret.PinErrorMsg = err.Error()
Expand Down
2 changes: 1 addition & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ var provideRefDhtCmd = &cmds.Command{
return err
}

has, err := nd.Blockstore.Has(c)
has, err := nd.Blockstore.Has(req.Context, c)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package commands

import (
"context"
"fmt"
"io"
"os"

filestore "github.com/ipfs/go-filestore"
cmds "github.com/ipfs/go-ipfs-cmds"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-cmds"
)

var FileStoreCmd = &cmds.Command{
Expand Down Expand Up @@ -56,7 +57,7 @@ The output is:
}
args := req.Arguments
if len(args) > 0 {
return listByArgs(res, fs, args)
return listByArgs(req.Context, res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
Expand Down Expand Up @@ -133,7 +134,7 @@ For ERROR entries the error will also be printed to stderr.
}
args := req.Arguments
if len(args) > 0 {
return listByArgs(res, fs, args)
return listByArgs(req.Context, res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
Expand Down Expand Up @@ -206,7 +207,7 @@ var dupsFileStore = &cmds.Command{
}

for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
have, err := fs.MainBlockstore().Has(req.Context, cid)
if err != nil {
return res.Emit(&RefWrapper{Err: err.Error()})
}
Expand Down Expand Up @@ -235,7 +236,7 @@ func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, e
return n, fs, err
}

func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
func listByArgs(ctx context.Context, res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
Expand All @@ -248,7 +249,7 @@ func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string
}
continue
}
r := filestore.Verify(fs, c)
r := filestore.Verify(ctx, fs, c)
if err := res.Emit(r); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/commands/pin/remotepin.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ NOTE: a comma-separated notation is supported in CLI for convenience:
return err
}

isInBlockstore, err := node.Blockstore.Has(rp.Cid())
isInBlockstore, err := node.Blockstore.Has(req.Context, rp.Cid())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/commands/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func verifyWorkerRun(ctx context.Context, wg *sync.WaitGroup, keys <-chan cid.Ci
defer wg.Done()

for k := range keys {
_, err := bs.Get(k)
_, err := bs.Get(ctx, k)
if err != nil {
select {
case results <- fmt.Sprintf("block %s was corrupt (%s)", k, err):
Expand Down Expand Up @@ -288,7 +288,7 @@ var repoVerifyCmd = &cmds.Command{
}

bs := bstore.NewBlockstore(nd.Repo.Datastore())
bs.HashOnRead(true)
bs.HashOnRead(req.Context, true)

keys, err := bs.AllKeysChan(req.Context)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
}

if settings.Pin {
defer api.blockstore.PinLock().Unlock()
defer api.blockstore.PinLock(ctx).Unlock(ctx)
}

err = api.blocks.AddBlock(b)
err = api.blocks.AddBlock(ctx, b)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions core/coreapi/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-pinner"
pin "github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
)
Expand All @@ -18,7 +18,7 @@ type dagAPI struct {
type pinningAdder CoreAPI

func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
defer adder.blockstore.PinLock().Unlock()
defer adder.blockstore.PinLock(ctx).Unlock(ctx)

if err := adder.dag.Add(ctx, nd); err != nil {
return err
Expand All @@ -30,7 +30,7 @@ func (adder *pinningAdder) Add(ctx context.Context, nd ipld.Node) error {
}

func (adder *pinningAdder) AddMany(ctx context.Context, nds []ipld.Node) error {
defer adder.blockstore.PinLock().Unlock()
defer adder.blockstore.PinLock(ctx).Unlock(ctx)

if err := adder.dag.AddMany(ctx, nds); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.D

c := rp.Cid()

has, err := api.blockstore.Has(c)
has, err := api.blockstore.Has(ctx, c)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/coreapi/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"io/ioutil"

cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-pinner"
pin "github.com/ipfs/go-ipfs-pinner"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-merkledag/dagutils"
Expand Down Expand Up @@ -110,7 +110,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
}

if options.Pin {
defer api.blockstore.PinLock().Unlock()
defer api.blockstore.PinLock(ctx).Unlock(ctx)
}

err = api.dag.Add(ctx, dagnode)
Expand Down
6 changes: 3 additions & 3 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (api *PinAPI) Add(ctx context.Context, p path.Path, opts ...caopts.PinAddOp
return err
}

defer api.blockstore.PinLock().Unlock()
defer api.blockstore.PinLock(ctx).Unlock(ctx)

err = api.pinning.Pin(ctx, dagNode, settings.Recursive)
if err != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (api *PinAPI) Rm(ctx context.Context, p path.Path, opts ...caopts.PinRmOpti

// Note: after unpin the pin sets are flushed to the blockstore, so we need
// to take a lock to prevent a concurrent garbage collection
defer api.blockstore.PinLock().Unlock()
defer api.blockstore.PinLock(ctx).Unlock(ctx)

if err = api.pinning.Unpin(ctx, rp.Cid(), settings.Recursive); err != nil {
return err
Expand All @@ -114,7 +114,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
return err
}

defer api.blockstore.PinLock().Unlock()
defer api.blockstore.PinLock(ctx).Unlock(ctx)

err = api.pinning.Update(ctx, fp.Cid(), tp.Cid(), settings.Unpin)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
DAGService: dserv,
syncFn: func() error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix); err != nil {
if err := ds.Sync(ctx, bstore.BlockPrefix); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
return ds.Sync(ctx, filestore.FilestorePrefix)
},
}
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
fileAdder.SetMfsRoot(mr)
}

nd, err := fileAdder.AddAllAndPin(files)
nd, err := fileAdder.AddAllAndPin(ctx, files)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/corerepo/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func ConditionalGC(ctx context.Context, node *core.IpfsNode, offset uint64) erro
}

func (gc *GC) maybeGC(ctx context.Context, offset uint64) error {
storage, err := gc.Repo.GetStorageUsage()
storage, err := gc.Repo.GetStorageUsage(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/corerepo/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func RepoSize(ctx context.Context, n *core.IpfsNode) (SizeStat, error) {
return SizeStat{}, err
}

usage, err := r.GetStorageUsage()
usage, err := r.GetStorageUsage(ctx)
if err != nil {
return SizeStat{}, err
}
Expand Down
32 changes: 16 additions & 16 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs-pinner"
"github.com/ipfs/go-ipfs-posinfo"
files "github.com/ipfs/go-ipfs-files"
pin "github.com/ipfs/go-ipfs-pinner"
posinfo "github.com/ipfs/go-ipfs-posinfo"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
dag "github.com/ipfs/go-merkledag"
Expand Down Expand Up @@ -254,17 +254,17 @@ func (adder *Adder) addNode(node ipld.Node, path string) error {
}

// AddAllAndPin adds the given request's files and pin them.
func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
func (adder *Adder) AddAllAndPin(ctx context.Context, file files.Node) (ipld.Node, error) {
if adder.Pin {
adder.unlocker = adder.gcLocker.PinLock()
adder.unlocker = adder.gcLocker.PinLock(ctx)
}
defer func() {
if adder.unlocker != nil {
adder.unlocker.Unlock()
adder.unlocker.Unlock(ctx)
}
}()

if err := adder.addFileNode("", file, true); err != nil {
if err := adder.addFileNode(ctx, "", file, true); err != nil {
return nil, err
}

Expand Down Expand Up @@ -333,10 +333,10 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
return nd, adder.PinRoot(nd)
}

func (adder *Adder) addFileNode(path string, file files.Node, toplevel bool) error {
func (adder *Adder) addFileNode(ctx context.Context, path string, file files.Node, toplevel bool) error {
defer file.Close()

err := adder.maybePauseForGC()
err := adder.maybePauseForGC(ctx)
if err != nil {
return err
}
Expand All @@ -357,7 +357,7 @@ func (adder *Adder) addFileNode(path string, file files.Node, toplevel bool) err

switch f := file.(type) {
case files.Directory:
return adder.addDir(path, f, toplevel)
return adder.addDir(ctx, path, f, toplevel)
case *files.Symlink:
return adder.addSymlink(path, f)
case files.File:
Expand Down Expand Up @@ -405,7 +405,7 @@ func (adder *Adder) addFile(path string, file files.File) error {
return adder.addNode(dagnode, path)
}

func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) error {
func (adder *Adder) addDir(ctx context.Context, path string, dir files.Directory, toplevel bool) error {
log.Infof("adding directory: %s", path)

if !(toplevel && path == "") {
Expand All @@ -426,7 +426,7 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro
it := dir.Entries()
for it.Next() {
fpath := gopath.Join(path, it.Name())
err := adder.addFileNode(fpath, it.Node(), false)
err := adder.addFileNode(ctx, fpath, it.Node(), false)
if err != nil {
return err
}
Expand All @@ -435,8 +435,8 @@ func (adder *Adder) addDir(path string, dir files.Directory, toplevel bool) erro
return it.Err()
}

func (adder *Adder) maybePauseForGC() error {
if adder.unlocker != nil && adder.gcLocker.GCRequested() {
func (adder *Adder) maybePauseForGC(ctx context.Context) error {
if adder.unlocker != nil && adder.gcLocker.GCRequested(ctx) {
rn, err := adder.curRootNode()
if err != nil {
return err
Expand All @@ -447,8 +447,8 @@ func (adder *Adder) maybePauseForGC() error {
return err
}

adder.unlocker.Unlock()
adder.unlocker = adder.gcLocker.PinLock()
adder.unlocker.Unlock(ctx)
adder.unlocker = adder.gcLocker.PinLock(ctx)
}
return nil
}
Expand Down
Loading

0 comments on commit 4142b46

Please sign in to comment.