Skip to content

Commit 056fe16

Browse files
committed
Add versioning to config, and support for data.car.from_pieces.piece_to_uri
Closes rpcpool#80, rpcpool#79
1 parent b30e35f commit 056fe16

File tree

3 files changed

+129
-68
lines changed

3 files changed

+129
-68
lines changed

cmd-check-deals.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func newCmd_check_deals() *cli.Command {
121121
epoch := *config.Epoch
122122
isLassieMode := config.IsFilecoinMode()
123123
isCarMode := !isLassieMode
124-
if isCarMode && config.IsSplitCarMode() {
124+
if isCarMode && config.IsCarFromPieces() {
125125
klog.Infof("Checking pieces for epoch %d from %q", epoch, config.ConfigFilepath())
126126

127127
metadata, err := splitcarfetcher.MetadataFromYaml(string(config.Data.Car.FromPieces.Metadata.URI))

config.go

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,15 @@ import (
1212
"github.com/libp2p/go-libp2p/core/peer"
1313
)
1414

15+
const ConfigVersion = 1
16+
1517
type URI string
1618

19+
// String() returns the URI as a string.
20+
func (u URI) String() string {
21+
return string(u)
22+
}
23+
1724
// IsZero returns true if the URI is empty.
1825
func (u URI) IsZero() bool {
1926
return u == ""
@@ -93,10 +100,15 @@ func hashFileSha256(filePath string) (string, error) {
93100
return fmt.Sprintf("%x", h.Sum(nil)), nil
94101
}
95102

103+
type PieceURLInfo struct {
104+
URI URI `json:"uri" yaml:"uri"` // URL to the piece.
105+
}
106+
96107
type Config struct {
97108
originalFilepath string
98109
hashOfConfigFile string
99110
Epoch *uint64 `json:"epoch" yaml:"epoch"`
111+
Version *uint64 `json:"version" yaml:"version"`
100112
Data struct {
101113
Car *struct {
102114
URI URI `json:"uri" yaml:"uri"`
@@ -107,6 +119,7 @@ type Config struct {
107119
Deals struct {
108120
URI URI `json:"uri" yaml:"uri"` // Local path to the deals file.
109121
} `json:"deals" yaml:"deals"`
122+
PieceToURI map[cid.Cid]PieceURLInfo `json:"piece_to_uri" yaml:"piece_to_uri"` // Map of piece CID to URL.
110123
} `json:"from_pieces" yaml:"from_pieces"`
111124
} `json:"car" yaml:"car"`
112125
Filecoin *struct {
@@ -173,8 +186,12 @@ func (c *Config) IsFilecoinMode() bool {
173186
return c.Data.Filecoin != nil && c.Data.Filecoin.Enable
174187
}
175188

176-
func (c *Config) IsSplitCarMode() bool {
177-
return c.Data.Car != nil && c.Data.Car.FromPieces != nil && !c.Data.Car.FromPieces.Metadata.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsZero()
189+
func (c *Config) IsCarFromPieces() bool {
190+
if c.Data.Car == nil || c.Data.Car.FromPieces == nil {
191+
return false
192+
}
193+
fromPieces := c.Data.Car.FromPieces
194+
return !fromPieces.Metadata.URI.IsZero() && (!fromPieces.Deals.URI.IsZero() || len(fromPieces.PieceToURI) > 0)
178195
}
179196

180197
type ConfigSlice []*Config
@@ -223,6 +240,12 @@ func (c *Config) Validate() error {
223240
if c.Epoch == nil {
224241
return fmt.Errorf("epoch must be set")
225242
}
243+
if c.Version == nil {
244+
return fmt.Errorf("version must be set")
245+
}
246+
if *c.Version != ConfigVersion {
247+
return fmt.Errorf("version must be %d", ConfigVersion)
248+
}
226249
// Distinguish between CAR-mode and Filecoin-mode.
227250
// In CAR-mode, the data is fetched from a CAR file (local or remote).
228251
// In Filecoin-mode, the data is fetched from Filecoin directly (by CID via Lassie).
@@ -254,12 +277,28 @@ func (c *Config) Validate() error {
254277
}
255278
}
256279
{
257-
if c.Data.Car.FromPieces.Deals.URI.IsZero() {
258-
return fmt.Errorf("data.car.from_pieces.deals.uri must be set")
280+
if c.Data.Car.FromPieces.Deals.URI.IsZero() && len(c.Data.Car.FromPieces.PieceToURI) == 0 {
281+
return fmt.Errorf("data.car.from_pieces.deals.uri or data.car.from_pieces.piece_to_uri must be set")
259282
}
260-
if !c.Data.Car.FromPieces.Deals.URI.IsLocal() {
283+
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && len(c.Data.Car.FromPieces.PieceToURI) > 0 {
284+
return fmt.Errorf("data.car.from_pieces.deals.uri and data.car.from_pieces.piece_to_uri cannot both be set")
285+
}
286+
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsLocal() {
261287
return fmt.Errorf("data.car.from_pieces.deals.uri must be a local file")
262288
}
289+
if len(c.Data.Car.FromPieces.PieceToURI) > 0 {
290+
for pieceCID, uri := range c.Data.Car.FromPieces.PieceToURI {
291+
if !pieceCID.Defined() {
292+
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s] must be a valid CID", pieceCID)
293+
}
294+
if uri.URI.IsZero() {
295+
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s].uri must be set", pieceCID)
296+
}
297+
if !uri.URI.IsRemoteWeb() {
298+
return fmt.Errorf("data.car.from_pieces.piece_to_uri[%s].uri must be a remote web URI", pieceCID)
299+
}
300+
}
301+
}
263302
}
264303
}
265304
// CidToOffsetAndSize and CidToOffset cannot be both set or both unset.
@@ -340,7 +379,7 @@ func (c *Config) Validate() error {
340379
if !c.Data.Car.FromPieces.Metadata.URI.IsValid() {
341380
return fmt.Errorf("data.car.from_pieces.metadata.uri is invalid")
342381
}
343-
if !c.Data.Car.FromPieces.Deals.URI.IsValid() {
382+
if !c.Data.Car.FromPieces.Deals.URI.IsZero() && !c.Data.Car.FromPieces.Deals.URI.IsValid() {
344383
return fmt.Errorf("data.car.from_pieces.deals.uri is invalid")
345384
}
346385
} else {

epoch.go

Lines changed: 83 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -283,76 +283,95 @@ func NewEpochFromConfig(
283283
var localCarReader *carv2.Reader
284284
var remoteCarReader ReaderAtCloser
285285
var err error
286-
if config.IsSplitCarMode() {
286+
if config.IsCarFromPieces() {
287287

288288
metadata, err := splitcarfetcher.MetadataFromYaml(string(config.Data.Car.FromPieces.Metadata.URI))
289289
if err != nil {
290290
return nil, fmt.Errorf("failed to read pieces metadata: %w", err)
291291
}
292292

293-
dealRegistry, err := splitcarfetcher.DealsFromCSV(string(config.Data.Car.FromPieces.Deals.URI))
294-
if err != nil {
295-
return nil, fmt.Errorf("failed to read deals: %w", err)
296-
}
293+
isFromDeals := !config.Data.Car.FromPieces.Deals.URI.IsZero()
297294

298-
lotusAPIAddress := "https://api.node.glif.io"
299-
cl := jsonrpc.NewClient(lotusAPIAddress)
300-
dm := splitcarfetcher.NewMinerInfo(
301-
cl,
302-
5*time.Minute,
303-
5*time.Second,
304-
)
295+
if isFromDeals {
296+
dealRegistry, err := splitcarfetcher.DealsFromCSV(string(config.Data.Car.FromPieces.Deals.URI))
297+
if err != nil {
298+
return nil, fmt.Errorf("failed to read deals: %w", err)
299+
}
305300

306-
scr, err := splitcarfetcher.NewSplitCarReader(metadata.CarPieces,
307-
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
308-
minerID, ok := dealRegistry.GetMinerByPieceCID(piece.CommP)
309-
if !ok {
310-
return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP)
311-
}
312-
klog.Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
313-
minerInfo, err := dm.GetProviderInfo(c.Context, minerID)
314-
if err != nil {
315-
return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err)
316-
}
317-
if len(minerInfo.Multiaddrs) == 0 {
318-
return nil, fmt.Errorf("miner %s has no multiaddrs", minerID)
319-
}
320-
spew.Dump(minerInfo)
321-
// extract the IP address from the multiaddr:
322-
split := multiaddr.Split(minerInfo.Multiaddrs[0])
323-
if len(split) < 2 {
324-
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
325-
}
326-
component0 := split[0].(*multiaddr.Component)
327-
component1 := split[1].(*multiaddr.Component)
328-
329-
var ip string
330-
var port string
331-
332-
if component0.Protocol().Code == multiaddr.P_IP4 {
333-
ip = component0.Value()
334-
port = component1.Value()
335-
} else if component1.Protocol().Code == multiaddr.P_IP4 {
336-
ip = component1.Value()
337-
port = component0.Value()
338-
} else {
339-
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
340-
}
341-
// reset the port to 80:
342-
// TODO: use the appropriate port (80, better if 443 with TLS)
343-
port = "80"
344-
minerIP := fmt.Sprintf("%s:%s", ip, port)
345-
klog.Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
346-
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
347-
return splitcarfetcher.NewRemoteFileSplitCarReader(
348-
piece.CommP.String(),
349-
formattedURL,
350-
)
351-
})
352-
if err != nil {
353-
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
301+
lotusAPIAddress := "https://api.node.glif.io"
302+
cl := jsonrpc.NewClient(lotusAPIAddress)
303+
dm := splitcarfetcher.NewMinerInfo(
304+
cl,
305+
5*time.Minute,
306+
5*time.Second,
307+
)
308+
309+
scr, err := splitcarfetcher.NewSplitCarReader(
310+
metadata.CarPieces,
311+
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
312+
minerID, ok := dealRegistry.GetMinerByPieceCID(piece.CommP)
313+
if !ok {
314+
return nil, fmt.Errorf("failed to find miner for piece CID %s", piece.CommP)
315+
}
316+
klog.Infof("piece CID %s is stored on miner %s", piece.CommP, minerID)
317+
minerInfo, err := dm.GetProviderInfo(c.Context, minerID)
318+
if err != nil {
319+
return nil, fmt.Errorf("failed to get miner info for miner %s, for piece %s: %w", minerID, piece.CommP, err)
320+
}
321+
if len(minerInfo.Multiaddrs) == 0 {
322+
return nil, fmt.Errorf("miner %s has no multiaddrs", minerID)
323+
}
324+
spew.Dump(minerInfo)
325+
// extract the IP address from the multiaddr:
326+
split := multiaddr.Split(minerInfo.Multiaddrs[0])
327+
if len(split) < 2 {
328+
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
329+
}
330+
component0 := split[0].(*multiaddr.Component)
331+
component1 := split[1].(*multiaddr.Component)
332+
333+
var ip string
334+
// TODO: use the appropriate port (80, better if 443 with TLS)
335+
port := "80"
336+
337+
if component0.Protocol().Code == multiaddr.P_IP4 {
338+
ip = component0.Value()
339+
} else if component1.Protocol().Code == multiaddr.P_IP4 {
340+
ip = component1.Value()
341+
} else {
342+
return nil, fmt.Errorf("invalid multiaddr: %s", minerInfo.Multiaddrs[0])
343+
}
344+
minerIP := fmt.Sprintf("%s:%s", ip, port)
345+
klog.Infof("piece CID %s is stored on miner %s (%s)", piece.CommP, minerID, minerIP)
346+
formattedURL := fmt.Sprintf("http://%s/piece/%s", minerIP, piece.CommP.String())
347+
return splitcarfetcher.NewRemoteFileSplitCarReader(
348+
piece.CommP.String(),
349+
formattedURL,
350+
)
351+
})
352+
if err != nil {
353+
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
354+
}
355+
remoteCarReader = scr
356+
} else {
357+
// is from pieceToURL mapping:
358+
scrFromURLs, err := splitcarfetcher.NewSplitCarReader(
359+
metadata.CarPieces,
360+
func(piece carlet.CarFile) (splitcarfetcher.ReaderAtCloserSize, error) {
361+
pieceURL, ok := config.Data.Car.FromPieces.PieceToURI[piece.CommP]
362+
if !ok {
363+
return nil, fmt.Errorf("failed to find URL for piece CID %s", piece.CommP)
364+
}
365+
return splitcarfetcher.NewRemoteFileSplitCarReader(
366+
piece.CommP.String(),
367+
pieceURL.URI.String(),
368+
)
369+
})
370+
if err != nil {
371+
return nil, fmt.Errorf("failed to open CAR file from pieces: %w", err)
372+
}
373+
remoteCarReader = scrFromURLs
354374
}
355-
remoteCarReader = scr
356375
} else {
357376
localCarReader, remoteCarReader, err = openCarStorage(c.Context, string(config.Data.Car.URI))
358377
if err != nil {
@@ -397,6 +416,9 @@ func NewEpochFromConfig(
397416
headerSize := uint64(buf.Len())
398417
ep.carHeaderSize = headerSize
399418
}
419+
if remoteCarReader == nil && localCarReader == nil {
420+
return nil, fmt.Errorf("no CAR reader available")
421+
}
400422
}
401423
{
402424
sigExistsFile, err := openIndexStorage(

0 commit comments

Comments
 (0)