54 changes: 40 additions & 14 deletions pkg/bmt/bmt.go
@@ -7,6 +7,7 @@ package bmt
 import (
 	"encoding/binary"
 	"hash"
+	"sync"
 
 	"github.com/ethersphere/bee/v2/pkg/swarm"
 )
@@ -30,25 +31,27 @@ var (
 // Sum gives back the tree to the pool and guaranteed to leave
 // the tree and itself in a state reusable for hashing a new chunk.
 type Hasher struct {
-	*Conf              // configuration
-	bmt    *tree       // prebuilt BMT resource for flowcontrol and proofs
-	size   int         // bytes written to Hasher since last Reset()
-	pos    int         // index of rightmost currently open segment
-	result chan []byte // result channel
-	errc   chan error  // error channel
-	span   []byte      // The span of the data subsumed under the chunk
+	*Conf                  // configuration
+	bmt        *tree       // prebuilt BMT resource for flowcontrol and proofs
+	size       int         // bytes written to Hasher since last Reset()
+	pos        int         // index of rightmost currently open segment
+	result     chan []byte // result channel
+	errc       chan error  // error channel
+	span       []byte      // The span of the data subsumed under the chunk
+	resultOnce *sync.Once  // ensures only one result is sent (pointer allows reset)
 }

 // NewHasher gives back an instance of a Hasher struct
 func NewHasher(hasherFact func() hash.Hash) *Hasher {
 	conf := NewConf(hasherFact, swarm.BmtBranches, 32)
 
 	return &Hasher{
-		Conf:   conf,
-		result: make(chan []byte),
-		errc:   make(chan error, 1),
-		span:   make([]byte, SpanSize),
-		bmt:    newTree(conf.maxSize, conf.depth, conf.hasher),
+		Conf:       conf,
+		result:     make(chan []byte, 1), // buffered to prevent blocking
+		errc:       make(chan error, 1),
+		span:       make([]byte, SpanSize),
+		bmt:        newTree(conf.maxSize, conf.depth, conf.hasher),
+		resultOnce: &sync.Once{},
 	}
 }
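
Note on the buffered result channel: the change to make(chan []byte, 1) matters because the non-blocking sends introduced further down can only deliver a value if the channel has room or a receiver is already waiting. Below is a minimal sketch of that difference, standard library only and not part of this PR; trySend is a hypothetical helper.

package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was delivered.
func trySend(ch chan []byte, v []byte) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan []byte)  // delivery needs a receiver waiting right now
	buffered := make(chan []byte, 1) // delivery succeeds even if the receiver arrives later

	fmt.Println(trySend(unbuffered, []byte("x"))) // false: nobody is receiving yet
	fmt.Println(trySend(buffered, []byte("x")))   // true: value parked in the buffer
	fmt.Println(string(<-buffered))               // "x"
}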

@@ -142,6 +145,17 @@ func (h *Hasher) Reset() {
 	h.pos = 0
 	h.size = 0
 	copy(h.span, zerospan)
+	// Drain any remaining values from channels to prepare for reuse
+	select {
+	case <-h.result:
+	default:
+	}
+	select {
+	case <-h.errc:
+	default:
+	}
+	// Reset the sync.Once by creating a new one
+	h.resultOnce = &sync.Once{}
 }
 
 // processSection writes the hash of i-th section into level 1 node of the BMT tree.
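
Reset now drains both channels with a select/default so a stale value from a previous, possibly abandoned hash cannot be read as the next chunk's result, and it replaces the *sync.Once because a fired Once can never run again. A small sketch of that drain-and-re-arm pattern, assuming nothing beyond the standard library (drain is an illustrative helper, not part of the package):

package main

import (
	"fmt"
	"sync"
)

// drain removes at most one pending value without blocking.
func drain(ch chan string) {
	select {
	case old := <-ch:
		fmt.Println("discarded stale value:", old)
	default:
	}
}

func main() {
	results := make(chan string, 1)
	once := &sync.Once{}

	once.Do(func() { results <- "first" }) // first use fires the Once
	drain(results)                         // simulate Reset: throw away the stale result
	once = &sync.Once{}                    // a fired Once cannot be reused, so swap in a new one

	once.Do(func() { results <- "second" }) // runs again because the Once is fresh
	fmt.Println(<-results)
}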
@@ -182,7 +196,13 @@ func (h *Hasher) writeNode(n *node, isLeft bool, s []byte) {
 	for {
 		// at the root of the bmt just write the result to the result channel
 		if n == nil {
-			h.result <- s
+			h.resultOnce.Do(func() {
+				select {
+				case h.result <- s:
+				default:
+					// Channel is full or already has a result, skip
+				}
+			})
 			return
 		}
 		// otherwise assign child hash to left or right segment
@@ -221,7 +241,13 @@ func (h *Hasher) writeFinalNode(level int, n *node, isLeft bool, s []byte) {
 		// at the root of the bmt just write the result to the result channel
 		if n == nil {
 			if s != nil {
-				h.result <- s
+				h.resultOnce.Do(func() {
+					select {
+					case h.result <- s:
+					default:
+						// Channel is full or already has a result, skip
+					}
+				})
 			}
 			return
 		}
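
Both writeNode and writeFinalNode now route the root hash through the same guard: the sync.Once ensures that only one of the concurrently finishing branches performs the send, and the select/default keeps that send from blocking if the buffer is somehow already occupied. A standalone sketch of the guard under those assumptions (deliver and the worker loop are illustrative, not the package's code):

package main

import (
	"fmt"
	"sync"
)

// deliver sends s at most once across all callers and never blocks.
func deliver(once *sync.Once, result chan []byte, s []byte) {
	once.Do(func() {
		select {
		case result <- s:
		default:
			// buffer already holds a value; drop s rather than block
		}
	})
}

func main() {
	result := make(chan []byte, 1)
	once := &sync.Once{}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // several workers race to report the root hash
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			deliver(once, result, []byte(fmt.Sprintf("root-%d", i)))
		}(i)
	}
	wg.Wait()

	fmt.Println(string(<-result)) // exactly one value arrives
	fmt.Println(len(result))      // 0: no second result was queued
}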
12 changes: 7 additions & 5 deletions pkg/bmt/pool.go
@@ -6,6 +6,7 @@ package bmt

 import (
 	"hash"
+	"sync"
 	"sync/atomic"
 )

@@ -74,11 +75,12 @@ func NewPool(c *Conf) *Pool {
 func (p *Pool) Get() *Hasher {
 	t := <-p.c
 	return &Hasher{
-		Conf:   p.Conf,
-		result: make(chan []byte),
-		errc:   make(chan error, 1),
-		span:   make([]byte, SpanSize),
-		bmt:    t,
+		Conf:       p.Conf,
+		result:     make(chan []byte, 1), // buffered to prevent blocking
+		errc:       make(chan error, 1),
+		span:       make([]byte, SpanSize),
+		bmt:        t,
+		resultOnce: &sync.Once{},
 	}
 }
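
Pool.Get mirrors NewHasher, so pooled hashers carry the same buffered result channel and a fresh Once. A hedged usage sketch of the pooled lifecycle follows; NewConf, NewPool, Pool.Get, Pool.Put, SetHeaderInt64, Write and Sum are taken from the package's existing API and assumed unchanged by this diff, and the keccak base hasher is only an example choice.

package main

import (
	"fmt"

	"github.com/ethersphere/bee/v2/pkg/bmt"
	"github.com/ethersphere/bee/v2/pkg/swarm"
	"golang.org/x/crypto/sha3"
)

func main() {
	// 8 pooled trees; NewConf takes a base-hasher factory, branch count and capacity,
	// matching the call shown in NewHasher above. Assumed unchanged by this PR.
	pool := bmt.NewPool(bmt.NewConf(sha3.NewLegacyKeccak256, swarm.BmtBranches, 8))

	h := pool.Get()
	defer pool.Put(h) // returns the prebuilt tree to the pool for the next caller

	data := []byte("hello, bmt")
	h.SetHeaderInt64(int64(len(data))) // span of the data subsumed under the chunk
	if _, err := h.Write(data); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", h.Sum(nil))
}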
