Skip to content

Commit c9d91e6

Browse files
hannahhowarddirkmc
andauthored
Add block filter via BadBits (#825)
* feat(booster-bitswap): add block filter via BadBits * refactor(booster-bitswap): use bitswap blockfilter for filtering * feat(blockfilter): only update when list is modified * feat(blockFilter): add on disk caching * Update cmd/booster-bitswap/blockfilter/blockfilter.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * fix(blockfilter): minor PR fixups Co-authored-by: dirkmc <dirkmdev@gmail.com>
1 parent a58ea5a commit c9d91e6

File tree

4 files changed

+391
-9
lines changed

4 files changed

+391
-9
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package blockfilter
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"encoding/json"
8+
"fmt"
9+
"io"
10+
"net/http"
11+
"os"
12+
"path/filepath"
13+
"sync"
14+
"time"
15+
16+
"github.com/benbjohnson/clock"
17+
"github.com/ipfs/go-cid"
18+
logging "github.com/ipfs/go-log/v2"
19+
"github.com/multiformats/go-multibase"
20+
)
21+
22+
var log = logging.Logger("booster-bitswap")
23+
24+
// BadBitsDenyList is the URL for well known bad bits list
25+
const BadBitsDenyList string = "https://badbits.dwebops.pub/denylist.json"
26+
27+
// UpdateInterval is the default interval at which the public list is refected and updated
28+
const UpdateInterval = 5 * time.Minute
29+
30+
// DenyListFetcher is a function that fetches a deny list in the json style of the BadBits list
31+
// The first return value indicates whether any update has occurred since the last fetch time
32+
// The second return is a stream of data if an update has occurred
33+
// The third is any error
34+
type DenyListFetcher func(lastFetchTime time.Time) (bool, io.ReadCloser, error)
35+
36+
const expectedListGrowth = 128
37+
38+
// FetchBadBitsList is the default function used to get the BadBits list
39+
func FetchBadBitsList(ifModifiedSince time.Time) (bool, io.ReadCloser, error) {
40+
req, err := http.NewRequest("GET", BadBitsDenyList, nil)
41+
if err != nil {
42+
return false, nil, err
43+
}
44+
// set the modification sync header, assuming we are not given time zero
45+
if !ifModifiedSince.IsZero() {
46+
req.Header.Set("If-Modified-Since", ifModifiedSince.Format(http.TimeFormat))
47+
}
48+
response, err := http.DefaultClient.Do(req)
49+
if err != nil {
50+
return false, nil, err
51+
}
52+
if response.StatusCode == http.StatusNotModified {
53+
return false, nil, nil
54+
}
55+
if response.StatusCode < 200 && response.StatusCode > 299 {
56+
bodyText, _ := io.ReadAll(response.Body)
57+
return false, nil, fmt.Errorf("expected HTTP success code, got: %s, response body: %s", http.StatusText(response.StatusCode), string(bodyText))
58+
}
59+
return true, response.Body, nil
60+
}
61+
62+
// BlockFilter manages updating a deny list and checking for CID inclusion in that list
63+
type BlockFilter struct {
64+
cacheFile string
65+
lastUpdated time.Time
66+
denyListFetcher DenyListFetcher
67+
filteredHashesLk sync.RWMutex
68+
filteredHashes map[string]struct{}
69+
ctx context.Context
70+
cancel context.CancelFunc
71+
clock clock.Clock
72+
onTimerSet func()
73+
}
74+
75+
func newBlockFilter(cfgDir string, denyListFetcher DenyListFetcher, clock clock.Clock, onTimerSet func()) *BlockFilter {
76+
return &BlockFilter{
77+
cacheFile: filepath.Join(cfgDir, "denylist.json"),
78+
denyListFetcher: denyListFetcher,
79+
filteredHashes: make(map[string]struct{}),
80+
clock: clock,
81+
onTimerSet: onTimerSet,
82+
}
83+
}
84+
85+
// NewBlockFilter returns a block filter
86+
func NewBlockFilter(cfgDir string) *BlockFilter {
87+
return newBlockFilter(cfgDir, FetchBadBitsList, clock.New(), nil)
88+
}
89+
90+
// Start initializes asynchronous updates to the deny list filter
91+
// It blocks to confirm at least one synchronous update of the denylist
92+
func (bf *BlockFilter) Start(ctx context.Context) error {
93+
bf.ctx, bf.cancel = context.WithCancel(ctx)
94+
// open the cache file if it eixsts
95+
cache, err := os.Open(bf.cacheFile)
96+
var cachedCopy bool
97+
// if the file does not exist, synchronously fetch the list
98+
if err != nil {
99+
if !os.IsNotExist(err) {
100+
return fmt.Errorf("fetching badbits list: %w", err)
101+
}
102+
bf.updateDenyList()
103+
} else {
104+
defer cache.Close()
105+
// otherwise, read the file and fetch the list asynchronously
106+
cachedCopy = true
107+
bf.filteredHashes, err = bf.parseDenyList(cache, len(bf.filteredHashes)+expectedListGrowth)
108+
if err != nil {
109+
return err
110+
}
111+
}
112+
go bf.run(cachedCopy)
113+
return nil
114+
}
115+
116+
// Close shuts down asynchronous updating
117+
func (bf *BlockFilter) Close() {
118+
bf.cancel()
119+
}
120+
121+
// IsFiltered checks if a given CID is in the deny list, per the rules
122+
// of hashing cids (convert to base32, add "/" to path, then sha256 hash)
123+
func (bf *BlockFilter) IsFiltered(c cid.Cid) (bool, error) {
124+
// convert CIDv0 to CIDv1
125+
if c.Version() == 0 {
126+
c = cid.NewCidV1(cid.DagProtobuf, c.Hash())
127+
}
128+
// get base32 string
129+
cidStr, err := c.StringOfBase(multibase.Base32)
130+
if err != nil {
131+
return false, err
132+
}
133+
// add "/"
134+
cidStr += "/"
135+
// sha256 sum the bytes
136+
shaBytes := sha256.Sum256([]byte(cidStr))
137+
// encode to a hex string
138+
shaString := hex.EncodeToString(shaBytes[:])
139+
140+
// check for set inclusion
141+
bf.filteredHashesLk.RLock()
142+
_, has := bf.filteredHashes[shaString]
143+
bf.filteredHashesLk.RUnlock()
144+
return has, nil
145+
}
146+
147+
// fetch deny list fetches and parses a deny list to get a new set of filtered hashes
148+
// it uses streaming JSON decoding to avoid an intermediate copy of the entire response
149+
// lenSuggestion is used to avoid a large number of allocations as the list grows
150+
func (bf *BlockFilter) parseDenyList(denyListStream io.Reader, lenSuggestion int) (map[string]struct{}, error) {
151+
// first fetch the reading for the deny list
152+
type blockedCid struct {
153+
Anchor string `json:"anchor"`
154+
}
155+
// initialize a json decoder
156+
jsonDenyList := json.NewDecoder(denyListStream)
157+
158+
// read open bracket
159+
_, err := jsonDenyList.Token()
160+
if err != nil {
161+
return nil, fmt.Errorf("parsing denylist: %w", err)
162+
}
163+
164+
filteredHashes := make(map[string]struct{}, lenSuggestion)
165+
// while the array contains values
166+
for jsonDenyList.More() {
167+
var b blockedCid
168+
// decode an array value (Message)
169+
err = jsonDenyList.Decode(&b)
170+
if err != nil {
171+
return nil, fmt.Errorf("parsing denylist: %w", err)
172+
}
173+
// save it in the filtered hash set
174+
filteredHashes[b.Anchor] = struct{}{}
175+
}
176+
177+
// read closing bracket
178+
_, err = jsonDenyList.Token()
179+
if err != nil {
180+
return nil, fmt.Errorf("parsing denylist: %w", err)
181+
}
182+
183+
return filteredHashes, nil
184+
}
185+
186+
// updateDenyList replaces the current filtered hashes after successfully
187+
// fetching and parsing the latest deny list
188+
func (bf *BlockFilter) updateDenyList() {
189+
fetchTime := time.Now()
190+
updated, denyListStream, err := bf.denyListFetcher(bf.lastUpdated)
191+
if err != nil {
192+
log.Errorf("fetching deny list: %s", err)
193+
return
194+
}
195+
if !updated {
196+
return
197+
}
198+
defer denyListStream.Close()
199+
// open the cache file
200+
cache, err := os.OpenFile(bf.cacheFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
201+
if err != nil {
202+
log.Errorf("opening cache file: %s", err)
203+
}
204+
defer cache.Close()
205+
forkedStream := io.TeeReader(denyListStream, cache)
206+
bf.lastUpdated = fetchTime
207+
filteredHashes, err := bf.parseDenyList(forkedStream, len(bf.filteredHashes)+expectedListGrowth)
208+
if err != nil {
209+
log.Errorf("parsing deny list: %s", err)
210+
return
211+
}
212+
bf.filteredHashesLk.Lock()
213+
bf.filteredHashes = filteredHashes
214+
bf.filteredHashesLk.Unlock()
215+
}
216+
217+
// run periodically updates the deny list asynchronously
218+
func (bf *BlockFilter) run(cachedCopy bool) {
219+
// if there was a cached copy, immediately asynchornously fetch an update
220+
if cachedCopy {
221+
bf.updateDenyList()
222+
}
223+
updater := bf.clock.Ticker(UpdateInterval)
224+
// call the callback if set
225+
if bf.onTimerSet != nil {
226+
bf.onTimerSet()
227+
}
228+
for {
229+
select {
230+
case <-bf.ctx.Done():
231+
return
232+
case <-updater.C:
233+
// when timer expires, update deny list
234+
bf.updateDenyList()
235+
// call the callback if set
236+
if bf.onTimerSet != nil {
237+
bf.onTimerSet()
238+
}
239+
}
240+
}
241+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package blockfilter
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
"io/ioutil"
8+
"os"
9+
"path/filepath"
10+
"strings"
11+
"testing"
12+
"time"
13+
14+
"github.com/benbjohnson/clock"
15+
"github.com/ipfs/go-cid"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
func TestBlockFilter(t *testing.T) {
20+
blockedCid1, err := cid.Parse("QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u")
21+
require.NoError(t, err)
22+
blockedCid2, err := cid.Parse("QmTn7prGSqKUd7cqvAjnULrH7zxBEBWrnj9kE7kZSGtDuQ")
23+
require.NoError(t, err)
24+
timerSetChan := make(chan struct{}, 1)
25+
onTimerSet := func() {
26+
timerSetChan <- struct{}{}
27+
}
28+
ff := &fakeFetcher{}
29+
clock := clock.NewMock()
30+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
31+
defer cancel()
32+
cfgDir, err := os.MkdirTemp("", "blockFilter")
33+
require.NoError(t, err)
34+
bf := newBlockFilter(cfgDir, ff.fetchDenyList, clock, onTimerSet)
35+
err = bf.Start(ctx)
36+
require.NoError(t, err)
37+
cache, err := os.ReadFile(filepath.Join(cfgDir, "denylist.json"))
38+
require.NoError(t, err)
39+
require.Equal(t, `[
40+
{ "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}
41+
]
42+
`, string(cache))
43+
isFiltered, err := bf.IsFiltered(blockedCid1)
44+
require.NoError(t, err)
45+
require.True(t, isFiltered)
46+
isFiltered, err = bf.IsFiltered(blockedCid2)
47+
require.NoError(t, err)
48+
require.False(t, isFiltered)
49+
select {
50+
case <-ctx.Done():
51+
t.Fatal("should have updated list but didn't")
52+
case <-timerSetChan:
53+
}
54+
clock.Add(UpdateInterval)
55+
select {
56+
case <-ctx.Done():
57+
t.Fatal("should have updated list but didn't")
58+
case <-timerSetChan:
59+
}
60+
isFiltered, err = bf.IsFiltered(blockedCid1)
61+
require.NoError(t, err)
62+
require.True(t, isFiltered)
63+
isFiltered, err = bf.IsFiltered(blockedCid2)
64+
require.NoError(t, err)
65+
require.False(t, isFiltered)
66+
clock.Add(UpdateInterval)
67+
select {
68+
case <-ctx.Done():
69+
t.Fatal("should have updated list but didn't")
70+
case <-timerSetChan:
71+
}
72+
isFiltered, err = bf.IsFiltered(blockedCid1)
73+
require.NoError(t, err)
74+
require.True(t, isFiltered)
75+
isFiltered, err = bf.IsFiltered(blockedCid2)
76+
require.NoError(t, err)
77+
require.True(t, isFiltered)
78+
cache, err = os.ReadFile(filepath.Join(cfgDir, "denylist.json"))
79+
require.NoError(t, err)
80+
require.Equal(t, `[
81+
{ "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"},
82+
{ "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"}
83+
]
84+
`, string(cache))
85+
86+
// now restart a new instance, with a fetcher that always errors,
87+
// and verify disk cache works
88+
bf.Close()
89+
bf = newBlockFilter(cfgDir, func(time.Time) (bool, io.ReadCloser, error) {
90+
return false, nil, errors.New("something went wrong")
91+
}, clock, onTimerSet)
92+
err = bf.Start(ctx)
93+
require.NoError(t, err)
94+
isFiltered, err = bf.IsFiltered(blockedCid1)
95+
require.NoError(t, err)
96+
require.True(t, isFiltered)
97+
isFiltered, err = bf.IsFiltered(blockedCid2)
98+
require.NoError(t, err)
99+
require.True(t, isFiltered)
100+
}
101+
102+
type fakeFetcher struct {
103+
fetchCount int
104+
}
105+
106+
func (ff *fakeFetcher) fetchDenyList(fetchTime time.Time) (bool, io.ReadCloser, error) {
107+
denyList := `[
108+
{ "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"}
109+
]
110+
`
111+
updated := true
112+
if ff.fetchCount == 1 {
113+
updated = false
114+
}
115+
if ff.fetchCount > 1 {
116+
denyList = `[
117+
{ "anchor": "09770fe7ec3124653c1d8f6917e3cd72cbd58a3e24a734bc362f656844c4ee7d"},
118+
{ "anchor": "6a98dfc49e852da7eee32d7df49801cb3ae7a432aa73200cd652ba149272481a"}
119+
]
120+
`
121+
}
122+
ff.fetchCount++
123+
return updated, ioutil.NopCloser(strings.NewReader(denyList)), nil
124+
}

cmd/booster-bitswap/run.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/filecoin-project/boost/api"
1111
bclient "github.com/filecoin-project/boost/api/client"
1212
cliutil "github.com/filecoin-project/boost/cli/util"
13+
"github.com/filecoin-project/boost/cmd/booster-bitswap/blockfilter"
1314
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
1415
"github.com/filecoin-project/boost/tracing"
1516
"github.com/filecoin-project/go-jsonrpc"
@@ -89,7 +90,13 @@ var runCmd = &cli.Command{
8990
return fmt.Errorf("setting up libp2p host: %w", err)
9091
}
9192
// Start the server
92-
server := NewBitswapServer(remoteStore, host)
93+
94+
blockFilter := blockfilter.NewBlockFilter(repoDir)
95+
err = blockFilter.Start(ctx)
96+
if err != nil {
97+
return fmt.Errorf("starting block filter: %w", err)
98+
}
99+
server := NewBitswapServer(remoteStore, host, blockFilter)
93100

94101
addrs, err := bapi.NetAddrsListen(ctx)
95102
if err != nil {

0 commit comments

Comments
 (0)