Skip to content

Commit a2d2cad

Browse files
authored
change accumulation of wal files (#480)
change code after linters Co-authored-by: Фёдор Партанский <partanskiy.f@n-t.io>
1 parent 86be08d commit a2d2cad

File tree

6 files changed

+301
-174
lines changed

6 files changed

+301
-174
lines changed

pkg/wal/reader.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func NewLogRecordReader(logger api.Logger, fileName string) (*LogRecordReader, e
3636
}
3737

3838
var err error
39+
3940
r.logFile, err = os.Open(fileName)
4041
if err != nil {
4142
return nil, err
@@ -44,32 +45,42 @@ func NewLogRecordReader(logger api.Logger, fileName string) (*LogRecordReader, e
4445
_, err = r.logFile.Seek(0, io.SeekStart)
4546
if err != nil {
4647
_ = r.Close()
48+
4749
return nil, err
4850
}
4951

50-
//read the CRC-Anchor, the first record of every file
52+
// read the CRC-Anchor, the first record of every file
5153
recLen, crc, err := r.readHeader()
5254
if err != nil {
5355
_ = r.Close()
56+
5457
return nil, err
5558
}
59+
5660
padSize := getPadSize(int(recLen))
61+
5762
payload, err := r.readPayload(int(recLen) + padSize)
5863
if err != nil {
5964
_ = r.Close()
65+
6066
return nil, err
6167
}
62-
var record = &protos.LogRecord{}
68+
69+
record := &protos.LogRecord{}
70+
6371
err = proto.Unmarshal(payload[:recLen], record)
6472
if err != nil {
6573
_ = r.Close()
74+
6675
return nil, err
6776
}
6877

6978
if record.Type != protos.LogRecord_CRC_ANCHOR {
7079
_ = r.Close()
80+
7181
return nil, fmt.Errorf("failed reading CRC-Anchor from log file: %s", fileName)
7282
}
83+
7384
r.crc = crc
7485

7586
r.logger.Debugf("Initialized reader: CRC-Anchor: %08X, file: %s", r.crc, r.fileName)
@@ -100,22 +111,27 @@ func (r *LogRecordReader) Read() (*protos.LogRecord, error) {
100111
if err != nil {
101112
return nil, err
102113
}
114+
103115
padSize := getPadSize(int(recLen))
116+
104117
payload, err := r.readPayload(int(recLen) + padSize)
105118
if err != nil {
106119
return nil, err
107120
}
108-
var record = &protos.LogRecord{}
121+
122+
record := &protos.LogRecord{}
123+
109124
err = proto.Unmarshal(payload[:recLen], record)
110125
if err != nil {
111-
return nil, ErrWALUnmarshalPayload //fmt.Errorf("wal: failed to unmarshal payload: %w", err)
126+
return nil, ErrWALUnmarshalPayload // fmt.Errorf("wal: failed to unmarshal payload: %w", err)
112127
}
113128

114129
switch record.Type {
115130
case protos.LogRecord_ENTRY, protos.LogRecord_CONTROL:
116131
if !verifyCRC(r.crc, crc, payload) {
117132
return nil, ErrCRC
118133
}
134+
119135
fallthrough
120136
case protos.LogRecord_CRC_ANCHOR:
121137
r.crc = crc
@@ -130,9 +146,11 @@ func (r *LogRecordReader) Read() (*protos.LogRecord, error) {
130146
// If it fails, it fails like io.ReadFull().
131147
func (r *LogRecordReader) readHeader() (length, crc uint32, err error) {
132148
buff := make([]byte, recordHeaderSize)
149+
133150
n, err := io.ReadFull(r.logFile, buff)
134151
if err != nil {
135152
r.logger.Debugf("Failed to read header in full: expected=%d, actual=%d; error: %s", recordHeaderSize, n, err)
153+
136154
return 0, 0, err
137155
}
138156

@@ -147,9 +165,11 @@ func (r *LogRecordReader) readHeader() (length, crc uint32, err error) {
147165
// If it fails, it fails like io.ReadFull().
148166
func (r *LogRecordReader) readPayload(len int) (payload []byte, err error) {
149167
buff := make([]byte, len)
168+
150169
n, err := io.ReadFull(r.logFile, buff)
151170
if err != nil {
152171
r.logger.Debugf("Failed to read payload in full: expected=%d, actual=%d; error: %s", len, n, err)
172+
153173
return nil, err
154174
}
155175

@@ -158,5 +178,6 @@ func (r *LogRecordReader) readPayload(len int) (payload []byte, err error) {
158178

159179
func verifyCRC(prevCRC, expectedCRC uint32, data []byte) bool {
160180
dataCRC := crc32.Update(prevCRC, crcTable, data)
181+
161182
return dataCRC == expectedCRC
162183
}

pkg/wal/reader_test.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,18 @@ import (
2222
func TestLogRecordReader(t *testing.T) {
2323
testDir, err := ioutil.TempDir("", "unittest")
2424
assert.NoErrorf(t, err, "generate temporary test dir")
25+
2526
defer os.RemoveAll(testDir)
2627

2728
basicLog, err := zap.NewDevelopment()
2829
assert.NoError(t, err)
30+
2931
logger := basicLog.Sugar()
3032

3133
wal, err := Create(logger, testDir, nil)
3234
assert.NoError(t, err)
3335
assert.NotNil(t, wal)
36+
3437
if wal == nil {
3538
return
3639
}
@@ -42,24 +45,26 @@ func TestLogRecordReader(t *testing.T) {
4245
}
4346
err = wal.Append(rec1.Data, rec1.TruncateTo)
4447
assert.NoError(t, err)
45-
crc1 := wal.CRC()
4648

49+
crc1 := wal.CRC()
4750
rec2 := &smartbftprotos.LogRecord{
4851
Type: smartbftprotos.LogRecord_ENTRY,
4952
TruncateTo: false,
5053
Data: []byte{5, 6, 7, 8, 9, 10, 11, 12},
5154
}
55+
5256
err = wal.Append(rec2.Data, rec2.TruncateTo)
5357
assert.NoError(t, err)
54-
crc2 := wal.CRC()
5558

59+
crc2 := wal.CRC()
5660
rec3 := &smartbftprotos.LogRecord{
5761
Type: smartbftprotos.LogRecord_ENTRY,
5862
TruncateTo: false,
5963
Data: []byte{13, 14, 15, 16, 17, 18, 19, 20},
6064
}
6165
err = wal.Append(rec3.Data, rec3.TruncateTo)
6266
assert.NoError(t, err)
67+
6368
crc3 := wal.CRC()
6469
wal.Close()
6570

@@ -81,7 +86,8 @@ func TestLogRecordReader(t *testing.T) {
8186

8287
t.Run("bad anchor", func(t *testing.T) {
8388
testFile := filepath.Join(testDir, "test1.wal")
84-
copyFile(fileName, testFile)
89+
err = copyFile(fileName, testFile)
90+
assert.NoError(t, err)
8591
f, err := os.OpenFile(testFile, os.O_RDWR, walFilePermPrivateRW)
8692
assert.NoError(t, err)
8793
_, err = f.Seek(0, io.SeekStart)
@@ -98,8 +104,10 @@ func TestLogRecordReader(t *testing.T) {
98104

99105
t.Run("corrupt anchor crc", func(t *testing.T) {
100106
testFile := filepath.Join(testDir, "test2.wal")
101-
copyFile(fileName, testFile)
107+
err = copyFile(fileName, testFile)
108+
assert.NoError(t, err)
102109
f, err := os.OpenFile(fileName, os.O_RDWR, walFilePermPrivateRW)
110+
assert.NoError(t, err)
103111
h := make([]byte, 8)
104112
_, err = f.Read(h)
105113
assert.NoError(t, err)
@@ -130,8 +138,8 @@ func TestLogRecordReader(t *testing.T) {
130138

131139
t.Run("file with tail", func(t *testing.T) {
132140
testFile := filepath.Join(testDir, "test3.wal")
133-
copyFile(fileName, testFile)
134-
141+
err = copyFile(fileName, testFile)
142+
assert.NoError(t, err)
135143
f, err := os.OpenFile(testFile, os.O_RDWR, walFilePermPrivateRW)
136144
assert.NoError(t, err)
137145
_, err = f.Seek(0, io.SeekEnd)
@@ -159,8 +167,8 @@ func TestLogRecordReader(t *testing.T) {
159167

160168
t.Run("partial record", func(t *testing.T) {
161169
testFile := filepath.Join(testDir, "test4.wal")
162-
copyFile(fileName, testFile)
163-
170+
err = copyFile(fileName, testFile)
171+
assert.NoError(t, err)
164172
f, err := os.OpenFile(testFile, os.O_RDWR, walFilePermPrivateRW)
165173
assert.NoError(t, err)
166174
offset, err := f.Seek(-1, io.SeekEnd)
@@ -187,15 +195,16 @@ func TestLogRecordReader(t *testing.T) {
187195

188196
t.Run("corrupt record pad", func(t *testing.T) {
189197
testFile := filepath.Join(testDir, "test5.wal")
190-
copyFile(fileName, testFile)
198+
err = copyFile(fileName, testFile)
199+
assert.NoError(t, err)
191200
r, err := NewLogRecordReader(logger, fileName)
192201
assert.NoError(t, err)
193202
assert.NotNil(t, r)
194203
assertReadRecord(t, r, rec1, crc1)
195204
assertReadRecord(t, r, rec2, crc2)
196-
len, _, err := r.readHeader()
205+
length, _, err := r.readHeader()
197206
assert.NoError(t, err)
198-
assert.True(t, getPadSize(int(len)) > 0)
207+
assert.True(t, getPadSize(int(length)) > 0)
199208
_ = r.Close()
200209

201210
f, err := os.OpenFile(testFile, os.O_RDWR, walFilePermPrivateRW)
@@ -205,7 +214,7 @@ func TestLogRecordReader(t *testing.T) {
205214
b := []byte{0}
206215
_, err = f.Read(b)
207216
assert.NoError(t, err)
208-
b[0] = b[0] ^ 0x01
217+
b[0] ^= 0x01
209218
_, err = f.Seek(-1, io.SeekEnd)
210219
assert.NoError(t, err)
211220
_, err = f.Write(b)
@@ -227,7 +236,8 @@ func TestLogRecordReader(t *testing.T) {
227236

228237
t.Run("corrupt record", func(t *testing.T) {
229238
testFile := filepath.Join(testDir, "test6.wal")
230-
copyFile(fileName, testFile)
239+
err = copyFile(fileName, testFile)
240+
assert.NoError(t, err)
231241
r, err := NewLogRecordReader(logger, fileName)
232242
assert.NoError(t, err)
233243
assertReadRecord(t, r, rec1, crc1)

0 commit comments

Comments
 (0)