forked from Finschia/finschia-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
109 lines (96 loc) · 3.06 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package snapshots
import (
"bufio"
"compress/zlib"
"io"
protoio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
sdkerrors "github.com/Finschia/finschia-sdk/types/errors"
)
const (
// Do not change chunk size without new snapshot format (must be uniform across nodes)
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
// Do not change compression level without new snapshot format (must be uniform across nodes)
snapshotCompressionLevel = 7
)
// StreamWriter set up a stream pipeline to serialize snapshot nodes:
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
type StreamWriter struct {
chunkWriter *ChunkWriter
bufWriter *bufio.Writer
zWriter *zlib.Writer
protoWriter protoio.WriteCloser
}
// NewStreamWriter set up a stream pipeline to serialize snapshot DB records.
func NewStreamWriter(ch chan<- io.ReadCloser) *StreamWriter {
chunkWriter := NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, err := zlib.NewWriterLevel(bufWriter, snapshotCompressionLevel)
if err != nil {
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
return nil
}
protoWriter := protoio.NewDelimitedWriter(zWriter)
return &StreamWriter{
chunkWriter: chunkWriter,
bufWriter: bufWriter,
zWriter: zWriter,
protoWriter: protoWriter,
}
}
// WriteMsg implements protoio.Write interface
func (sw *StreamWriter) WriteMsg(msg proto.Message) error {
return sw.protoWriter.WriteMsg(msg)
}
// Close implements io.Closer interface
func (sw *StreamWriter) Close() error {
if err := sw.protoWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.zWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.bufWriter.Flush(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
return sw.chunkWriter.Close()
}
// CloseWithError pass error to chunkWriter
func (sw *StreamWriter) CloseWithError(err error) {
sw.chunkWriter.CloseWithError(err)
}
// StreamReader set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
type StreamReader struct {
chunkReader *ChunkReader
zReader io.ReadCloser
protoReader protoio.ReadCloser
}
// NewStreamReader set up a restore stream pipeline.
func NewStreamReader(chunks <-chan io.ReadCloser) (*StreamReader, error) {
chunkReader := NewChunkReader(chunks)
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return nil, sdkerrors.Wrap(err, "zlib failure")
}
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
return &StreamReader{
chunkReader: chunkReader,
zReader: zReader,
protoReader: protoReader,
}, nil
}
// ReadMsg implements protoio.Reader interface
func (sr *StreamReader) ReadMsg(msg proto.Message) error {
return sr.protoReader.ReadMsg(msg)
}
// Close implements io.Closer interface
func (sr *StreamReader) Close() error {
sr.protoReader.Close()
sr.zReader.Close()
return sr.chunkReader.Close()
}