Skip to content

Commit

Permalink
concurrent dump-plain
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Dec 23, 2022
1 parent 6d83c97 commit 1062836
Showing 1 changed file with 160 additions and 21 deletions.
181 changes: 160 additions & 21 deletions cmd/cronosd/cmd/changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"

"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/server"
"github.com/cosmos/iavl"
"github.com/golang/protobuf/jsonpb"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cast"
"github.com/spf13/cobra"
Expand All @@ -23,6 +26,7 @@ const (
flagStartVersion = "start-version"
flagEndVersion = "end-version"
flagOutput = "output"
flagConcurrency = "concurrency"
)

func ChangeSetGroupCmd() *cobra.Command {
Expand All @@ -35,6 +39,7 @@ func ChangeSetGroupCmd() *cobra.Command {
DumpSSTChangeSetCmd(),
IngestSSTCmd(),
ConvertPlainToSSTCmd(),
PrintPlainFileCmd(),
)
return cmd
}
Expand Down Expand Up @@ -67,47 +72,90 @@ func DumpFileChangeSetCmd() *cobra.Command {
if err != nil {
return err
}
concurrency, err := cmd.Flags().GetInt(flagConcurrency)
if err != nil {
return err
}

output, err := cmd.Flags().GetString(flagOutput)
if err != nil {
return err
}

var tmpDir string
var writer io.Writer
if output == "-" {
writer = os.Stdout

tmpDir = "/tmp"
} else {
fp, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
if err != nil {
return err
}
defer fp.Close()
bufWriter := bufio.NewWriter(fp)
defer bufWriter.Flush()
writer = fp

writer = bufWriter
tmpDir = filepath.Dir(output)
}

var versionHeader [16]byte
tree := iavl.NewImmutableTree(db, cacheSize, true)
return tree.TraverseStateChanges(int64(startVersion), int64(endVersion), func(version int64, changeSet *iavl.ChangeSet) error {
bz, err := proto.Marshal(changeSet)
tree, err := iavl.NewMutableTree(db, cacheSize, true)
if err != nil {
return err
}
if endVersion == 0 {
latestVersion, err := tree.LazyLoadVersion(0)
if err != nil {
return err
}
endVersion = int(latestVersion)
}

binary.BigEndian.PutUint64(versionHeader[:8], uint64(version))
binary.PutUvarint(versionHeader[8:16], uint64(len(bz)))
works := splitWorkLoad(concurrency, Range{Start: startVersion, End: endVersion})

chs := make([]chan *os.File, len(works))
for i := 0; i < len(works); i++ {
chs[i] = make(chan *os.File, 1)
go func(i int) {
defer close(chs[i])
work := works[i]
tmpFile, err := dumpRangeBlocksWorker(tmpDir, tree.ImmutableTree, int64(work.Start), int64(work.End))
if err != nil {
fmt.Fprintf(os.Stderr, "worker failed: start: %d, end: %d, err: %e", work.Start, work.End, err)
return
}
// seek to begining, prepare to read
if _, err := tmpFile.Seek(0, 0); err != nil {
fmt.Fprintf(os.Stderr, "seek failed: %e", err)
os.Remove(tmpFile.Name())
return
}
chs[i] <- tmpFile
}(i)
}

writer.Write(versionHeader[:])
writer.Write(bz)
return nil
})
for i, ch := range chs {
tmpFile, ok := <-ch
if !ok {
return fmt.Errorf("worker failed: %d", i)
}
defer func() {
tmpFile.Close()
os.Remove(tmpFile.Name())
}()

if _, err := io.Copy(writer, tmpFile); err != nil {
return err
}
}
return nil
},
}
cmd.Flags().Int(flagStartVersion, 1, "The start version")
cmd.Flags().Int(flagEndVersion, 0, "The end version, exclusive")
cmd.Flags().String(flagOutput, "-", "Output file, default to stdout")
cmd.Flags().Int(flagConcurrency, runtime.NumCPU(), "Number concurrent goroutines to parallelize the work")
cmd.Flags().Int(server.FlagIAVLCacheSize, 7812500, "size of the iavl tree cache")
return cmd
}

Expand Down Expand Up @@ -229,6 +277,80 @@ func ConvertPlainToSSTCmd() *cobra.Command {
return cmd
}

func PrintPlainFileCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "print-plain [plain-file]",
Short: "Pretty-print content of plain changeset file",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
var (
err error
reader io.Reader
)
if args[0] == "-" {
reader = os.Stdin
} else {
reader, err = os.Open(args[0])
if err != nil {
return err
}
}
marshaler := jsonpb.Marshaler{}
offset, err := readPlainFile(reader, func(version int64, changeSet *iavl.ChangeSet) error {
fmt.Printf("version: %d\n", version)
for _, pair := range changeSet.Pairs {
js, err := marshaler.MarshalToString(pair)
if err != nil {
return err
}
fmt.Println(js)
}
return nil
})
if err == io.ErrUnexpectedEOF {
// incomplete end of file, we'll output a warning and process the completed versions.
fmt.Fprintln(os.Stderr, "file incomplete, the completed versions are processed, the last completed file offset: %d\n", offset)
} else if err != nil {
return err
}
return nil
},
}
return cmd
}

func dumpRangeBlocksWorker(dir string, tree *iavl.ImmutableTree, startVersion, endVersion int64) (*os.File, error) {
fp, err := ioutil.TempFile(dir, "tmp-*")
if err != nil {
return nil, err
}
writer := bufio.NewWriter(fp)
defer writer.Flush()

if err := dumpRangeBlocks(writer, tree, startVersion, endVersion); err != nil {
os.Remove(fp.Name())
return nil, err
}
return fp, nil
}

func dumpRangeBlocks(writer io.Writer, tree *iavl.ImmutableTree, startVersion, endVersion int64) error {
var versionHeader [16]byte
return tree.TraverseStateChanges(int64(startVersion), int64(endVersion), func(version int64, changeSet *iavl.ChangeSet) error {
bz, err := proto.Marshal(changeSet)
if err != nil {
return err
}

binary.BigEndian.PutUint64(versionHeader[:8], uint64(version))
binary.BigEndian.PutUint64(versionHeader[8:16], uint64(len(bz)))

writer.Write(versionHeader[:])
writer.Write(bz)
return nil
})
}

func readPlainFile(input io.Reader, fn func(version int64, changeSet *iavl.ChangeSet) error) (int, error) {
var (
err error
Expand All @@ -243,16 +365,16 @@ func readPlainFile(input io.Reader, fn func(version int64, changeSet *iavl.Chang
}
version := binary.BigEndian.Uint64(versionHeader[:8])
size := int(binary.BigEndian.Uint64(versionHeader[8:16]))

var changeSet iavl.ChangeSet
if size > 0 {
buf := make([]byte, size)
if _, err = io.ReadFull(input, buf[:]); err != nil {
break
}

buf := make([]byte, size)
if _, err = io.ReadFull(input, buf[:]); err != nil {
break
}

if err = proto.Unmarshal(buf[:], &changeSet); err != nil {
return lastValidOffset, err
if err = proto.Unmarshal(buf[:], &changeSet); err != nil {
return lastValidOffset, err
}
}

if err = fn(int64(version), &changeSet); err != nil {
Expand Down Expand Up @@ -320,3 +442,20 @@ func newSSTFileWriter() *grocksdb.SSTFileWriter {
opts.SetCompressionOptionsZstdDictTrainer(true)
return grocksdb.NewSSTFileWriter(envOpts, opts)
}

type Range struct {
Start, End int
}

func splitWorkLoad(workers int, full Range) []Range {
var chunks []Range
chunkSize := (full.End - full.Start + workers - 1) / workers
for i := full.Start; i < full.End; i += chunkSize {
end := i + chunkSize
if end > full.End {
end = full.End
}
chunks = append(chunks, Range{Start: i, End: end})
}
return chunks
}

0 comments on commit 1062836

Please sign in to comment.