Skip to content
8 changes: 7 additions & 1 deletion cmd/boostd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,13 @@ func migrateMarketsConfig(cctx *cli.Context, mktsRepo lotus_repo.LockedRepo, boo
// Clear the DAG store root dir config, because the DAG store is no longer configurable in Boost
// (it is always at <repo path>/dagstore
rcfg.DAGStore.RootDir = ""
rcfg.IndexProvider = mktsCfg.IndexProvider
rcfg.IndexProvider = config.IndexProviderConfig{
Enable: mktsCfg.IndexProvider.Enable,
EntriesCacheCapacity: mktsCfg.IndexProvider.EntriesCacheCapacity,
EntriesChunkSize: mktsCfg.IndexProvider.EntriesChunkSize,
TopicName: mktsCfg.IndexProvider.TopicName,
PurgeCacheOnStart: mktsCfg.IndexProvider.PurgeCacheOnStart,
}
rcfg.IndexProvider.Enable = true // Enable index provider in Boost by default

if fromMonolith {
Expand Down
8 changes: 7 additions & 1 deletion node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,20 @@ func DefaultBoost() *Boost {
MaxConcurrencyStorageCalls: 100,
GCInterval: lotus_config.Duration(1 * time.Minute),
},
IndexProvider: lotus_config.IndexProviderConfig{
IndexProvider: IndexProviderConfig{
Enable: true,
EntriesCacheCapacity: 1024,
EntriesChunkSize: 16384,
// The default empty TopicName means it is inferred from network name, in the following
// format: "/indexer/ingest/<network-name>"
TopicName: "",
PurgeCacheOnStart: false,

HttpPublisher: IndexProviderHttpPublisherConfig{
Enabled: false,
PublicHostname: "",
Port: 3104,
},
},
}
return cfg
Expand Down
76 changes: 75 additions & 1 deletion node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 45 additions & 1 deletion node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Boost struct {
LotusDealmaking lotus_config.DealmakingConfig
LotusFees FeeConfig
DAGStore lotus_config.DAGStoreConfig
IndexProvider lotus_config.IndexProviderConfig
IndexProvider IndexProviderConfig
}

func (b *Boost) GetDealmakingConfig() lotus_config.DealmakingConfig {
Expand Down Expand Up @@ -279,6 +279,50 @@ type ContractDealsConfig struct {
From string
}

type IndexProviderConfig struct {
// Enable set whether to enable indexing announcement to the network and expose endpoints that
// allow indexer nodes to process announcements. Enabled by default.
Enable bool

// EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement
// entries. Defaults to 1024 if not specified. The cache is evicted using LRU policy. The
// maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize and
// the length of multihashes being advertised. For example, advertising 128-bit long multihashes
// with the default EntriesCacheCapacity, and EntriesChunkSize means the cache size can grow to
// 256MiB when full.
EntriesCacheCapacity int

// EntriesChunkSize sets the maximum number of multihashes to include in a single entries chunk.
// Defaults to 16384 if not specified. Note that chunks are chained together for indexing
// advertisements that include more multihashes than the configured EntriesChunkSize.
EntriesChunkSize int

// TopicName sets the topic name on which the changes to the advertised content are announced.
// If not explicitly specified, the topic name is automatically inferred from the network name
// in following format: '/indexer/ingest/<network-name>'
// Defaults to empty, which implies the topic name is inferred from network name.
TopicName string

// PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine
// starts. By default, the cache is rehydrated from previously cached entries stored in
// datastore if any is present.
PurgeCacheOnStart bool

HttpPublisher IndexProviderHttpPublisherConfig
}

type IndexProviderHttpPublisherConfig struct {
// If not enabled, requests are served over graphsync instead.
Enabled bool
// Set the public hostname / IP for the index provider listener.
// eg "82.129.73.111"
// This is usually the same as the for the boost node.
PublicHostname string
// Set the port on which to listen for index provider requests over HTTP.
// Note that this port must be open on the firewall.
Port int
}

type FeeConfig struct {
// The maximum fee to pay when sending the PublishStorageDeals message
MaxPublishDealsFee types.FIL
Expand Down
62 changes: 40 additions & 22 deletions node/modules/storageminer_idxprov.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"fmt"
"github.com/filecoin-project/boost/build"
"github.com/filecoin-project/boost/indexprovider"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/filecoin-project/boost/util"
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/transport/graphsync"
datatransferv2 "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/lotus/node/config"
lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -77,30 +78,47 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo
"pid", marketHost.ID(),
"topic", topicName,
"retAddrs", marketHost.Addrs())
// If announcements to the network are enabled, then set options for datatransfer publisher.

// If announcements to the network are enabled, then set options for the publisher.
var e *engine.Engine
if cfg.Enable {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(topicName)
if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err)
}
// Advertisements can be served over the data transfer protocol
// (on graphsync) or over HTTP
if cfg.HttpPublisher.Enabled {
announceAddr, err := util.ToHttpMultiaddr(cfg.HttpPublisher.PublicHostname, cfg.HttpPublisher.Port)
if err != nil {
return nil, fmt.Errorf("parsing HTTP Publisher hostname '%s' / port %d: %w",
cfg.HttpPublisher.PublicHostname, cfg.HttpPublisher.Port, err)
}
opts = append(opts,
engine.WithPublisherKind(engine.HttpPublisher),
engine.WithHttpPublisherListenAddr(fmt.Sprintf("0.0.0.0:%d", cfg.HttpPublisher.Port)),
engine.WithHttpPublisherAnnounceAddr(announceAddr.String()),
)
llog = llog.With("publisher", "http")
} else {
// Join the indexer topic using the market's pubsub instance. Otherwise, the provider
// engine would create its own instance of pubsub down the line in go-legs, which has
// no validators by default.
t, err := ps.Join(topicName)
if err != nil {
llog.Errorw("Failed to join indexer topic", "err", err)
return nil, xerrors.Errorf("joining indexer topic %s: %w", topicName, err)
}

// Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr)
opts = append(opts,
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDataTransfer(dtV1ToIndexerDT(dt, func() ipld.LinkSystem {
return *e.LinkSystem()
})),
engine.WithExtraGossipData(ma.Bytes()),
engine.WithTopic(t),
)
llog = llog.With("extraGossipData", ma, "publisher", "data-transfer")
// Get the miner ID and set as extra gossip data.
// The extra data is required by the lotus-specific index-provider gossip message validators.
ma := address.Address(maddr)
opts = append(opts,
engine.WithTopic(t),
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDataTransfer(dtV1ToIndexerDT(dt, func() ipld.LinkSystem {
return *e.LinkSystem()
})),
engine.WithExtraGossipData(ma.Bytes()),
)
llog = llog.With("extraGossipData", ma, "publisher", "data-transfer")
}
} else {
opts = append(opts, engine.WithPublisherKind(engine.NoPublisher))
llog = llog.With("publisher", "none")
Expand Down
22 changes: 22 additions & 0 deletions util/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package util

import (
"fmt"
ma "github.com/multiformats/go-multiaddr"
"net"
"strings"
)

func ToHttpMultiaddr(hostname string, port int) (ma.Multiaddr, error) {
var saddr string
if n := net.ParseIP(hostname); n != nil {
ipVersion := "ip4"
if strings.Contains(hostname, ":") {
ipVersion = "ip6"
}
saddr = fmt.Sprintf("/%s/%s/tcp/%d/http", ipVersion, hostname, port)
} else {
saddr = fmt.Sprintf("/dns/%s/tcp/%d/http", hostname, port)
}
return ma.NewMultiaddr(saddr)
}
34 changes: 34 additions & 0 deletions util/addr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package util

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestToHttpMultiaddr(t *testing.T) {
tcs := []struct {
hostname string
port int
expected string
}{{
hostname: "192.168.1.1",
port: 1234,
expected: "/ip4/192.168.1.1/tcp/1234/http",
}, {
hostname: "2001:db8::68",
port: 1234,
expected: "/ip6/2001:db8::68/tcp/1234/http",
}, {
hostname: "example.com",
port: 1234,
expected: "/dns/example.com/tcp/1234/http",
}}

for _, tc := range tcs {
t.Run("", func(t *testing.T) {
ma, err := ToHttpMultiaddr(tc.hostname, tc.port)
require.NoError(t, err)
require.Equal(t, tc.expected, ma.String())
})
}
}