Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metadata #132

Merged
merged 10 commits into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 138 additions & 27 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/csv"
"fmt"
"io"
Expand All @@ -12,6 +14,7 @@ import (
"path/filepath"
"strconv"

"github.com/anjor/carlet"
commcid "github.com/filecoin-project/go-fil-commcid"
commp "github.com/filecoin-project/go-fil-commp-hashhash"
"github.com/filecoin-project/go-leb128"
Expand All @@ -28,11 +31,20 @@ import (
"github.com/rpcpool/yellowstone-faithful/carreader"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
splitcarfetcher "github.com/rpcpool/yellowstone-faithful/split-car-fetcher"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
"k8s.io/klog/v2"
)

var CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau")
var (
CBOR_SHA256_DUMMY_CID = cid.MustParse("bafyreics5uul5lbtxslcigtoa5fkba7qgwu7cyb7ih7z6fzsh4lgfgraau")
hdr = &car.CarHeader{
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
hdrSize, _ = car.HeaderSize(hdr)
)

const maxLinks = 432000 / 18 // 18 subsets

Expand All @@ -48,7 +60,7 @@ type carFile struct {
commP cid.Cid
payloadCid cid.Cid
paddedSize uint64
fileSize int64
fileSize uint64
}
anjor marked this conversation as resolved.
Show resolved Hide resolved

func newCmd_SplitCar() *cli.Command {
Expand Down Expand Up @@ -99,7 +111,35 @@ func newCmd_SplitCar() *cli.Command {
defer file.Close()
}

rd, err := carreader.New(file)
var (
currentFileSize uint64
currentFileNum int
currentFile *os.File
bufferedWriter *bufio.Writer
currentSubsetInfo subsetInfo
subsetLinks []datamodel.Link
writer io.Writer
carFiles []carFile
metadata *splitcarfetcher.Metadata
)

metadata = &splitcarfetcher.Metadata{}
headerBuf := new(bytes.Buffer)
teeReader := io.TeeReader(file, headerBuf)

streamBuf := bufio.NewReaderSize(teeReader, 1<<20)

actualHeader, headerSize, err := readHeader(streamBuf)
if err != nil {
return fmt.Errorf("failed to read header: %w", err)
}

encodedHeader := base64.StdEncoding.EncodeToString(actualHeader)

metadata.CarPieces = &carlet.CarPiecesAndMetadata{OriginalCarHeader: encodedHeader, OriginalCarHeaderSize: uint64(headerSize)}

combinedReader := io.MultiReader(headerBuf, file)
rd, err := carreader.New(io.NopCloser(combinedReader))
if err != nil {
return fmt.Errorf("failed to open CAR: %w", err)
}
Expand All @@ -117,7 +157,7 @@ func newCmd_SplitCar() *cli.Command {
}

epoch := c.Int("epoch")
maxFileSize := c.Int64("size")
maxFileSize := uint64(c.Int64("size"))
outputDir := c.String("output-dir")
meta := c.String("metadata")

Expand All @@ -127,17 +167,6 @@ func newCmd_SplitCar() *cli.Command {

cp := new(commp.Calc)

var (
currentFileSize int64
currentFileNum int
currentFile *os.File
bufferedWriter *bufio.Writer
currentSubsetInfo subsetInfo
subsetLinks []datamodel.Link
writer io.Writer
carFiles []carFile
)

createNewFile := func() error {

if currentFile != nil {
Expand All @@ -157,9 +186,25 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}
carFiles = append(carFiles, cf)

metadata.CarPieces.CarPieces = append(
metadata.CarPieces.CarPieces,
carlet.CarFile{
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
PaddedSize: ps,
})

err = closeFile(bufferedWriter, currentFile)
if err != nil {
return fmt.Errorf("failed to close file: %w", err)
Expand All @@ -183,17 +228,12 @@ func newCmd_SplitCar() *cli.Command {
bufferedWriter = bufio.NewWriter(currentFile)
writer = io.MultiWriter(bufferedWriter, cp)

// Write the header
hdr := car.CarHeader{
Roots: []cid.Cid{CBOR_SHA256_DUMMY_CID}, // placeholder
Version: 1,
}
if err := car.WriteHeader(&hdr, writer); err != nil {
if err := car.WriteHeader(hdr, writer); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

// Set the currentFileSize to the size of the header
currentFileSize = int64(len(nulRootCarHeader))
currentFileSize = uint64(len(nulRootCarHeader))
currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1}
return nil
}
Expand All @@ -203,7 +243,7 @@ func newCmd_SplitCar() *cli.Command {
if err != nil {
return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err)
}
currentFileSize += int64(len(data))
currentFileSize += uint64(len(data))
return nil
}

Expand Down Expand Up @@ -238,7 +278,8 @@ func newCmd_SplitCar() *cli.Command {
dagSize += owm.RawSectionSize()
}

if currentFile == nil || currentFileSize+int64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks {
if currentFile == nil || currentFileSize+uint64(dagSize) > maxFileSize || len(currentSubsetInfo.blockLinks) > maxLinks {
klog.Infof("Creating new file, currentFileSize: %d, dagSize: %d, maxFileSize: %d, maxLinks: %d, currentSubsetInfo.blockLinks: %d", currentFileSize, dagSize, maxFileSize, maxLinks, len(currentSubsetInfo.blockLinks))
err := createNewFile()
if err != nil {
return fmt.Errorf("failed to create a new file: %w", err)
Expand Down Expand Up @@ -311,8 +352,23 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), commP: commCid, payloadCid: sl.(cidlink.Link).Cid, paddedSize: ps, fileSize: currentFileSize}
cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}

carFiles = append(carFiles, cf)
metadata.CarPieces.CarPieces = append(
metadata.CarPieces.CarPieces,
carlet.CarFile{
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
})

err = closeFile(bufferedWriter, currentFile)
if err != nil {
Expand Down Expand Up @@ -342,8 +398,16 @@ func newCmd_SplitCar() *cli.Command {
c.commP.String(),
c.payloadCid.String(),
strconv.FormatUint(c.paddedSize, 10),
strconv.FormatInt(c.fileSize, 10),
strconv.FormatUint(c.fileSize, 10),
})
if err != nil {
return fmt.Errorf("failed to write metatadata csv: %w", err)
}
}

err = writeMetadata(metadata, epoch)
if err != nil {
return fmt.Errorf("failed to write metatadata yaml: %w", err)
}

return nil
Expand Down Expand Up @@ -419,3 +483,50 @@ func writeNode(node datamodel.Node, w io.Writer) (cid.Cid, error) {
}
return cd, nil
}

func writeMetadata(metadata *splitcarfetcher.Metadata, epoch int) error {
metadataFileName := fmt.Sprintf("epoch-%d-metadata.yaml", epoch)

// Open file in append mode
metadataFile, err := os.OpenFile(metadataFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open metadata file: %w", err)
}
defer metadataFile.Close()

encoder := yaml.NewEncoder(metadataFile)
err = encoder.Encode(metadata)
if err != nil {
return fmt.Errorf("failed to encode metadata: %w", err)
}

return nil
}

func readHeader(streamBuf *bufio.Reader) ([]byte, int64, error) {
maybeHeaderLen, err := streamBuf.Peek(varintSize)
if err != nil {
return nil, 0, fmt.Errorf("failed to read header: %s", err)
}

hdrLen, viLen := binary.Uvarint(maybeHeaderLen)
if hdrLen <= 0 || viLen < 0 {
return nil, 0, fmt.Errorf("unexpected header len = %d, varint len = %d", hdrLen, viLen)
}

actualViLen, err := io.CopyN(io.Discard, streamBuf, int64(viLen))
if err != nil {
return nil, 0, fmt.Errorf("failed to discard header varint: %s", err)
}
streamLen := actualViLen

headerBuf := new(bytes.Buffer)

actualHdrLen, err := io.CopyN(headerBuf, streamBuf, int64(hdrLen))
if err != nil {
return nil, 0, fmt.Errorf("failed to read header: %s", err)
}
streamLen += actualHdrLen

return headerBuf.Bytes(), streamLen, nil
}
Loading