Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockservice & exchange & bitswap: add non variadic NotifyNewBlock #242

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added
🛠 - New non variadic `NotifyNewBlock` function. This changes the `blockservice.Interface`. The new function avoids allocating a slice on each call when called with one block.

* `boxo/bitswap/server`:
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
Expand Down
7 changes: 7 additions & 0 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
return bs
}

func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlock(ctx, blk),
bs.Server.NotifyNewBlock(ctx, blk),
)
}

func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
Expand Down
10 changes: 10 additions & 0 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return session.GetBlocks(ctx, keys)
}

// NotifyNewBlock announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
Expand Down
5 changes: 3 additions & 2 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,9 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) {
}
}

// NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap
// decide to store those blocks and make them available on the network.
// NotifyNewBlocks is called when new blocks become available locally, and in
// particular when the caller of bitswap decides to store those blocks and make
// them available on the network.
func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
if len(blks) == 0 {
return
Expand Down
11 changes: 11 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,17 @@ func (bs *Server) Stat() (Stat, error) {
return s, nil
}

// NotifyNewBlock announces the existence of block to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
Expand Down
11 changes: 4 additions & 7 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
if err := s.exchange.NotifyNewBlock(ctx, o); err != nil {
logger.Errorf("NotifyNewBlock: %s", err.Error())

Check warning on line 180 in blockservice/blockservice.go

View check run for this annotation

Codecov / codecov/patch

blockservice/blockservice.go#L180

Added line #L180 was not covered by tests
}
}

Expand Down Expand Up @@ -282,7 +282,7 @@
return nil, err
}
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
err = ex.NotifyNewBlock(ctx, blk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -364,7 +364,6 @@
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
select {
Expand All @@ -386,13 +385,11 @@

if ex != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
err = ex.NotifyNewBlock(ctx, b)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc
}

select {
Expand Down
9 changes: 9 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ type notifyCountingExchange struct {
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error {
n.notifyCount++
return n.Interface.NotifyNewBlock(ctx, blocks)
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
Expand Down Expand Up @@ -312,6 +317,10 @@ func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fe
return f.ses
}

func (*fakeIsNewSessionCreateExchange) NotifyNewBlock(context.Context, blocks.Block) error {
return nil
}

func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
type Interface interface { // type Exchanger interface
Fetcher

// NotifyNewBlock tells the exchange that a new block is available and can be served.
NotifyNewBlock(ctx context.Context, blocks blocks.Block) error
Comment on lines +16 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any idea how widespread the dependency on this interface (and therefore the result of the breakage) is and the benefits of implementing this? For example, does this show up at all in profiles as a meaningful thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think everyone relies either on the offline exchange (in order to wrap a blockstore) or bitswap. I don't belive many implementations exists, even in outside codebases.

Even if someone else implement this, just forwarding the call to NotifyNewBlocks with a slice of 1, this is legal and easy to write.
This just aims to get over golang's lacking interprocedural virtual optimisations, it does not change any feature.

Most of our other APIs have both single and multiple versions for this reason (Put vs PutMany, GetBlock vs GetBlocks, ...).

// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error

Expand Down
6 changes: 6 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block
return blk, err
}

// NotifyNewBlock tells the exchange that a new block is available and can be served.
func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error {
// as an offline exchange we have nothing to do
return nil
}

// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// as an offline exchange we have nothing to do
Expand Down