diff --git a/cmd-car-split.go b/cmd-car-split.go index ca3a31bb..adbf1017 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -49,7 +49,10 @@ var ( maxSectionSize = 2 << 20 // 2 MiB ) -const maxLinks = 432000 / 18 // 18 subsets +const ( + maxLinks = 432000 / 18 // 18 subsets + bufSize = ((16 << 20) / 128 * 127) +) type subsetInfo struct { fileName string @@ -121,7 +124,6 @@ func newCmd_SplitCar() *cli.Command { bufferedWriter *bufio.Writer currentSubsetInfo subsetInfo subsetLinks []datamodel.Link - writer io.Writer carFiles []carFile metadata *splitcarfetcher.Metadata ) @@ -168,31 +170,17 @@ func newCmd_SplitCar() *cli.Command { outputDir = "." } - cp := new(commp.Calc) - createNewFile := func() error { if currentFile != nil { - sl, err := writeSubsetNode(currentSubsetInfo, writer) + sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter) if err != nil { return fmt.Errorf("failed to write subset node: %w", err) } subsetLinks = append(subsetLinks, sl) - rawCommP, ps, err := cp.Digest() - if err != nil { - return fmt.Errorf("failed to calculate commp digest: %w", err) - } - - commCid, err := commcid.DataCommitmentV1ToCID(rawCommP) - if err != nil { - return fmt.Errorf("failed to calculate commitment to cid: %w", err) - } - cf := carFile{ name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), - commP: commCid, payloadCid: sl.(cidlink.Link).Cid, - paddedSize: ps, fileSize: currentFileSize, } carFiles = append(carFiles, cf) @@ -203,8 +191,6 @@ func newCmd_SplitCar() *cli.Command { Name: currentSubsetInfo.fileName, ContentSize: currentFileSize - hdrSize, HeaderSize: hdrSize, - CommP: commCid, - PaddedSize: ps, }) err = closeFile(bufferedWriter, currentFile) @@ -217,7 +203,6 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to replace root: %w", err) } - cp.Reset() } currentFileNum++ @@ -228,9 +213,8 @@ func newCmd_SplitCar() *cli.Command { } bufferedWriter = bufio.NewWriter(currentFile) - writer = io.MultiWriter(bufferedWriter, cp) - if err := car.WriteHeader(hdr, writer); err != nil { + if err := car.WriteHeader(hdr, bufferedWriter); err != nil { return fmt.Errorf("failed to write header: %w", err) } @@ -241,7 +225,7 @@ func newCmd_SplitCar() *cli.Command { } writeObject := func(data []byte) error { - _, err := writer.Write(data) + _, err := bufferedWriter.Write(data) if err != nil { return fmt.Errorf("failed to write object to car file: %s, error: %w", currentFile.Name(), err) } @@ -318,7 +302,7 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to run accumulator while accumulating objects: %w", err) } - sl, err := writeSubsetNode(currentSubsetInfo, writer) + sl, err := writeSubsetNode(currentSubsetInfo, bufferedWriter) if err != nil { return fmt.Errorf("failed to write subset node: %w", err) } @@ -339,26 +323,14 @@ func newCmd_SplitCar() *cli.Command { return fmt.Errorf("failed to construct epochNode: %w", err) } - _, err = writeNode(epochNode, writer) + _, err = writeNode(epochNode, bufferedWriter) if err != nil { return fmt.Errorf("failed to write epochNode: %w", err) } - rawCommP, ps, err := cp.Digest() - if err != nil { - return fmt.Errorf("failed to calculate commp digest: %w", err) - } - - commCid, err := commcid.DataCommitmentV1ToCID(rawCommP) - if err != nil { - return fmt.Errorf("failed to calculate commitment to cid: %w", err) - } - cf := carFile{ name: fmt.Sprintf("epoch-%d-%d.car", epoch, currentFileNum), - commP: commCid, payloadCid: sl.(cidlink.Link).Cid, - paddedSize: ps, fileSize: currentFileSize, } @@ -369,7 +341,6 @@ func newCmd_SplitCar() *cli.Command { Name: currentSubsetInfo.fileName, ContentSize: currentFileSize - hdrSize, HeaderSize: hdrSize, - CommP: commCid, }) err = closeFile(bufferedWriter, currentFile) @@ -394,17 +365,26 @@ func newCmd_SplitCar() *cli.Command { return err } defer w.Flush() - for _, c := range carFiles { + for idx, c := range carFiles { + commP, paddedPieceSize, err := calcCommP(c.name) + if err != nil { + return fmt.Errorf("failed to calculate commP: %w", err) + } + err = w.Write([]string{ c.name, - c.commP.String(), + commP.String(), c.payloadCid.String(), - strconv.FormatUint(c.paddedSize, 10), + strconv.FormatUint(paddedPieceSize, 10), strconv.FormatUint(c.fileSize, 10), }) if err != nil { return fmt.Errorf("failed to write metatadata csv: %w", err) } + + // it is assumed that metadata and car files are in the same order + metadata.CarPieces.CarPieces[idx].CommP = commP + metadata.CarPieces.CarPieces[idx].PaddedSize = paddedPieceSize } err = writeMetadata(metadata, epoch) @@ -738,3 +718,27 @@ func fetchFromOffset(url string, offset int64) ([]byte, error) { return io.ReadAll(resp.Body) } + +func calcCommP(fileName string) (cid.Cid, uint64, error) { + r, err := os.Open(fileName) + if err != nil { + return cid.Undef, 0, fmt.Errorf("failed to open file: %w", err) + } + cp := new(commp.Calc) + + streamBuf := bufio.NewReaderSize(io.TeeReader(r, cp), bufSize) + + _, err = io.Copy(io.Discard, streamBuf) + if err != nil { + return cid.Undef, 0, fmt.Errorf("failed to copy stream: %w", err) + } + + rawCommP, paddedSize, err := cp.Digest() + if err != nil { + return cid.Undef, 0, fmt.Errorf("failed to calculate CommP: %w", err) + } + + commCid, err := commcid.DataCommitmentV1ToCID(rawCommP) + + return commCid, paddedSize, err +}