Skip to content

Commit

Permalink
fix commp mismatch (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor authored Nov 10, 2024
1 parent 6227dd8 commit 09451bb
Showing 1 changed file with 45 additions and 41 deletions.
86 changes: 45 additions & 41 deletions cmd-car-split.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ var (
maxSectionSize = 2 << 20 // 2 MiB
)

const maxLinks = 432000 / 18 // 18 subsets
const (
// maxLinks is 432000 / 18 = 24000.
maxLinks = 432000 / 18 // 18 subsets
// bufSize is the buffer size handed to bufio.NewReaderSize in calcCommP:
// 16 MiB scaled by 127/128, i.e. 16646144 bytes.
// NOTE(review): the 127/128 factor presumably mirrors the Fr32 padding ratio
// used by the commP calculation — confirm.
bufSize = ((16 << 20) / 128 * 127)
)

type subsetInfo struct {
fileName string
Expand Down Expand Up @@ -121,7 +124,6 @@ func newCmd_SplitCar() *cli.Command {
bufferedWriter *bufio.Writer
currentSubsetInfo subsetInfo
subsetLinks []datamodel.Link
writer io.Writer
carFiles []carFile
metadata *splitcarfetcher.Metadata
)
Expand Down Expand Up @@ -168,31 +170,17 @@ func newCmd_SplitCar() *cli.Command {
outputDir = "."
}

cp := new(commp.Calc)

createNewFile := func() error {
if currentFile != nil {
sl, err := writeSubsetNode(currentSubsetInfo, writer)
sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter)
if err != nil {
return fmt.Errorf("failed to write subset node: %w", err)
}
subsetLinks = append(subsetLinks, sl)

rawCommP, ps, err := cp.Digest()
if err != nil {
return fmt.Errorf("failed to calculate commp digest: %w", err)
}

commCid, err := commcid.DataCommitmentV1ToCID(rawCommP)
if err != nil {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}
carFiles = append(carFiles, cf)
Expand All @@ -203,8 +191,6 @@ func newCmd_SplitCar() *cli.Command {
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
PaddedSize: ps,
})

err = closeFile(bufferedWriter, currentFile)
Expand All @@ -217,7 +203,6 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to replace root: %w", err)
}

cp.Reset()
}

currentFileNum++
Expand All @@ -228,9 +213,8 @@ func newCmd_SplitCar() *cli.Command {
}

bufferedWriter = bufio.NewWriter(currentFile)
writer = io.MultiWriter(bufferedWriter, cp)

if err := car.WriteHeader(hdr, writer); err != nil {
if err := car.WriteHeader(hdr, bufferedWriter); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

Expand All @@ -241,7 +225,7 @@ func newCmd_SplitCar() *cli.Command {
}

writeObject := func(data []byte) error {
_, err := writer.Write(data)
_, err := bufferedWriter.Write(data)
if err != nil {
return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err)
}
Expand Down Expand Up @@ -318,7 +302,7 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to run accumulator while accumulating objects: %w", err)
}

sl, err := writeSubsetNode(currentSubsetInfo, writer)
sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter)
if err != nil {
return fmt.Errorf("failed to write subset node: %w", err)
}
Expand All @@ -339,26 +323,14 @@ func newCmd_SplitCar() *cli.Command {
return fmt.Errorf("failed to construct epochNode: %w", err)
}

_, err = writeNode(epochNode, writer)
_, err = writeNode(epochNode, bufferedWriter)
if err != nil {
return fmt.Errorf("failed to write epochNode: %w", err)
}

rawCommP, ps, err := cp.Digest()
if err != nil {
return fmt.Errorf("failed to calculate commp digest: %w", err)
}

commCid, err := commcid.DataCommitmentV1ToCID(rawCommP)
if err != nil {
return fmt.Errorf("failed to calculate commitment to cid: %w", err)
}

cf := carFile{
name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum),
commP: commCid,
payloadCid: sl.(cidlink.Link).Cid,
paddedSize: ps,
fileSize: currentFileSize,
}

Expand All @@ -369,7 +341,6 @@ func newCmd_SplitCar() *cli.Command {
Name: currentSubsetInfo.fileName,
ContentSize: currentFileSize - hdrSize,
HeaderSize: hdrSize,
CommP: commCid,
})

err = closeFile(bufferedWriter, currentFile)
Expand All @@ -394,17 +365,26 @@ func newCmd_SplitCar() *cli.Command {
return err
}
defer w.Flush()
for _, c := range carFiles {
for idx, c := range carFiles {
commP, paddedPieceSize, err := calcCommP(c.name)
if err != nil {
return fmt.Errorf("failed to calculate commP: %w", err)
}

err = w.Write([]string{
c.name,
c.commP.String(),
commP.String(),
c.payloadCid.String(),
strconv.FormatUint(c.paddedSize, 10),
strconv.FormatUint(paddedPieceSize, 10),
strconv.FormatUint(c.fileSize, 10),
})
if err != nil {
return fmt.Errorf("failed to write metatadata csv: %w", err)
}

// it is assumed that metadata and car files are in the same order
metadata.CarPieces.CarPieces[idx].CommP = commP
metadata.CarPieces.CarPieces[idx].PaddedSize = paddedPieceSize
}

err = writeMetadata(metadata, epoch)
Expand Down Expand Up @@ -738,3 +718,27 @@ func fetchFromOffset(url string, offset int64) ([]byte, error) {

return io.ReadAll(resp.Body)
}

// calcCommP reads the file at fileName from start to end, feeding every byte
// into a commp.Calc, and returns the resulting piece commitment as a CID
// together with the padded piece size reported by the calculator.
//
// On any failure it returns cid.Undef, a zero size and a wrapped error.
func calcCommP(fileName string) (cid.Cid, uint64, error) {
	r, err := os.Open(fileName)
	if err != nil {
		return cid.Undef, 0, fmt.Errorf("failed to open file: %w", err)
	}
	// The file is only read, so a Close error carries no data-loss risk; the
	// important part is releasing the descriptor on every return path, since
	// this function is invoked once per generated CAR file.
	defer r.Close()

	cp := new(commp.Calc)

	// Everything read from the file is mirrored into cp by the TeeReader; the
	// bytes themselves are discarded because only the digest is needed.
	streamBuf := bufio.NewReaderSize(io.TeeReader(r, cp), bufSize)

	if _, err = io.Copy(io.Discard, streamBuf); err != nil {
		return cid.Undef, 0, fmt.Errorf("failed to copy stream: %w", err)
	}

	rawCommP, paddedSize, err := cp.Digest()
	if err != nil {
		return cid.Undef, 0, fmt.Errorf("failed to calculate CommP: %w", err)
	}

	commCid, err := commcid.DataCommitmentV1ToCID(rawCommP)
	if err != nil {
		return cid.Undef, 0, fmt.Errorf("failed to convert CommP to cid: %w", err)
	}

	return commCid, paddedSize, nil
}

0 comments on commit 09451bb

Please sign in to comment.