Skip to content

Commit 3c23379

Browse files
authored
txnsync: bloom filter refactoring (#2834)
Refactor the bloom filter implementation into two "implementations": testableBloomFilter and bloomFilter. testableBloomFilter is used exclusively for bloom filters received from the network; it does not contain any encoded fields, nor does it include the transactionsRange, which is used for generating new bloom filters. bloomFilter, on the flip side, is used only for outgoing bloom filters. It doesn't support testing, and it contains only the encoded bloom filter (and not, for instance, the underlying bloom filter needed for testing). This separation ensures that we don't keep bloom filter objects beyond the required scope, improving memory utilization.
1 parent c8d6194 commit 3c23379

File tree

11 files changed

+177
-187
lines changed

11 files changed

+177
-187
lines changed

txnsync/bloomFilter.go

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ import (
3131
const bloomFilterFalsePositiveRate = 0.01
3232

3333
var errInvalidBloomFilterEncoding = errors.New("invalid bloom filter encoding")
34+
var errEncodingBloomFilterFailed = errors.New("encoding bloom filter failed")
3435

3536
//msgp:ignore bloomFilterType
3637
type bloomFilterType byte
3738

3839
const (
39-
invalidBloomFilter bloomFilterType = iota
40+
invalidBloomFilter bloomFilterType = iota //nolint:deadcode,varcheck
4041
multiHashBloomFilter
4142
xorBloomFilter32
4243
xorBloomFilter8
@@ -53,18 +54,25 @@ type transactionsRange struct {
5354
}
5455

5556
type bloomFilter struct {
56-
encodingParams requestParams
57+
containedTxnsRange transactionsRange
5758

58-
filter bloom.GenericFilter
59+
encoded encodedBloomFilter
5960

60-
containedTxnsRange transactionsRange
61+
encodedLength int
62+
}
6163

62-
encoded *encodedBloomFilter
64+
// testableBloomFilter is used for bloom filters that were received from the network, decoded
65+
// and are ready to be tested against.
66+
type testableBloomFilter struct {
67+
encodingParams requestParams
6368

64-
filterType bloomFilterType
69+
filter bloom.GenericFilter
6570
}
6671

67-
func decodeBloomFilter(enc encodedBloomFilter) (outFilter bloomFilter, err error) {
72+
func decodeBloomFilter(enc encodedBloomFilter) (outFilter *testableBloomFilter, err error) {
73+
outFilter = &testableBloomFilter{
74+
encodingParams: enc.EncodingParams,
75+
}
6876
switch bloomFilterType(enc.BloomFilterType) {
6977
case multiHashBloomFilter:
7078
outFilter.filter, err = bloom.UnmarshalBinary(enc.BloomFilter)
@@ -75,52 +83,40 @@ func decodeBloomFilter(enc encodedBloomFilter) (outFilter bloomFilter, err error
7583
outFilter.filter = new(bloom.XorFilter8)
7684
err = outFilter.filter.UnmarshalBinary(enc.BloomFilter)
7785
default:
78-
return bloomFilter{}, errInvalidBloomFilterEncoding
86+
return nil, errInvalidBloomFilterEncoding
7987
}
8088

8189
if err != nil {
82-
return bloomFilter{}, err
90+
return nil, err
8391
}
84-
outFilter.filterType = bloomFilterType(enc.BloomFilterType)
8592
outFilter.encodingParams = enc.EncodingParams
86-
return outFilter, nil
93+
return
8794
}
8895

89-
func (bf *bloomFilter) encode() (out *encodedBloomFilter, err error) {
90-
if bf.encoded != nil {
91-
return bf.encoded, nil
92-
}
93-
out = new(encodedBloomFilter)
94-
out.BloomFilterType = byte(invalidBloomFilter)
95-
out.EncodingParams = bf.encodingParams
96-
if bf.filter != nil {
97-
out.BloomFilterType = byte(bf.filterType)
98-
out.BloomFilter, err = bf.filter.MarshalBinary()
99-
if err != nil || len(out.BloomFilter) == 0 {
100-
out = nil
101-
} else {
102-
bf.encoded = out
103-
// increase the counter for a successful bloom filter encoding
104-
txsyncEncodedBloomFiltersTotal.Inc(nil)
105-
}
96+
func (bf *bloomFilter) encode(filter bloom.GenericFilter, filterType bloomFilterType) (err error) {
97+
bf.encoded.BloomFilterType = byte(filterType)
98+
bf.encoded.BloomFilter, err = filter.MarshalBinary()
99+
bf.encodedLength = len(bf.encoded.BloomFilter)
100+
if err != nil || bf.encodedLength == 0 {
101+
return errEncodingBloomFilterFailed
106102
}
103+
// increase the counter for a successful bloom filter encoding
104+
txsyncEncodedBloomFiltersTotal.Inc(nil)
107105
return
108106
}
109107

110108
func (bf *bloomFilter) sameParams(other bloomFilter) bool {
111-
return (bf.encodingParams == other.encodingParams) && (bf.containedTxnsRange == other.containedTxnsRange)
109+
return (bf.encoded.EncodingParams == other.encoded.EncodingParams) &&
110+
(bf.containedTxnsRange == other.containedTxnsRange)
112111
}
113112

114-
func (bf *bloomFilter) test(txID transactions.Txid) bool {
115-
if bf.filter != nil {
116-
if bf.encodingParams.Modulator > 1 {
117-
if txidToUint64(txID)%uint64(bf.encodingParams.Modulator) != uint64(bf.encodingParams.Offset) {
118-
return false
119-
}
113+
func (bf *testableBloomFilter) test(txID transactions.Txid) bool {
114+
if bf.encodingParams.Modulator > 1 {
115+
if txidToUint64(txID)%uint64(bf.encodingParams.Modulator) != uint64(bf.encodingParams.Offset) {
116+
return false
120117
}
121-
return bf.filter.Test(txID[:])
122118
}
123-
return false
119+
return bf.filter.Test(txID[:])
124120
}
125121

126122
func filterFactoryBloom(numEntries int, s *syncState) (filter bloom.GenericFilter, filterType bloomFilterType) {
@@ -142,7 +138,7 @@ func filterFactoryXor32(numEntries int, s *syncState) (filter bloom.GenericFilte
142138
var filterFactory func(int, *syncState) (filter bloom.GenericFilter, filterType bloomFilterType) = filterFactoryXor32
143139

144140
func (s *syncState) makeBloomFilter(encodingParams requestParams, txnGroups []pooldata.SignedTxGroup, hintPrevBloomFilter *bloomFilter) (result bloomFilter) {
145-
result.encodingParams = encodingParams
141+
result.encoded.EncodingParams = encodingParams
146142
switch {
147143
case encodingParams.Modulator == 0:
148144
// we want none.
@@ -153,6 +149,8 @@ func (s *syncState) makeBloomFilter(encodingParams requestParams, txnGroups []po
153149
result.containedTxnsRange.firstCounter = txnGroups[0].GroupCounter
154150
result.containedTxnsRange.lastCounter = txnGroups[len(txnGroups)-1].GroupCounter
155151
result.containedTxnsRange.transactionsCount = uint64(len(txnGroups))
152+
} else {
153+
return
156154
}
157155

158156
if hintPrevBloomFilter != nil {
@@ -161,17 +159,19 @@ func (s *syncState) makeBloomFilter(encodingParams requestParams, txnGroups []po
161159
}
162160
}
163161

164-
result.filter, result.filterType = filterFactory(len(txnGroups), s)
162+
filter, filterType := filterFactory(len(txnGroups), s)
165163
for _, group := range txnGroups {
166-
result.filter.Set(group.GroupTransactionID[:])
164+
filter.Set(group.GroupTransactionID[:])
167165
}
168-
_, err := result.encode()
166+
err := result.encode(filter, filterType)
169167
if err != nil {
170168
// fall back to standard bloom filter
171-
result.filter, result.filterType = filterFactoryBloom(len(txnGroups), s)
169+
filter, filterType = filterFactoryBloom(len(txnGroups), s)
172170
for _, group := range txnGroups {
173-
result.filter.Set(group.GroupTransactionID[:])
171+
filter.Set(group.GroupTransactionID[:])
174172
}
173+
result.encode(filter, filterType) //nolint:errcheck
174+
// the error in the above case can be silently ignored.
175175
}
176176
default:
177177
// we want subset.
@@ -199,22 +199,28 @@ func (s *syncState) makeBloomFilter(encodingParams requestParams, txnGroups []po
199199
}
200200
}
201201

202-
result.filter, result.filterType = filterFactory(len(filteredTransactionsIDs), s)
202+
if len(filteredTransactionsIDs) == 0 {
203+
return
204+
}
205+
206+
filter, filterType := filterFactory(len(filteredTransactionsIDs), s)
203207

204208
for _, txid := range filteredTransactionsIDs {
205-
result.filter.Set(txid[:])
209+
filter.Set(txid[:])
206210
}
207-
_, err := result.encode()
211+
err := result.encode(filter, filterType)
208212
if err != nil {
209213
// fall back to standard bloom filter
210-
result.filter, result.filterType = filterFactoryBloom(len(txnGroups), s)
214+
filter, filterType = filterFactoryBloom(len(filteredTransactionsIDs), s)
211215
for _, txid := range filteredTransactionsIDs {
212-
result.filter.Set(txid[:])
216+
filter.Set(txid[:])
213217
}
218+
result.encode(filter, filterType) //nolint:errcheck
219+
// the error in the above case can be silently ignored.
214220
}
215221
}
216222

217-
return
223+
return result
218224
}
219225

220226
func txidToUint64(txID transactions.Txid) uint64 {

0 commit comments

Comments
 (0)