Skip to content

Commit b3078b5

Browse files
committed
fix batch code comments
1 parent aa0af20 commit b3078b5

File tree

3 files changed

+23
-30
lines changed

3 files changed

+23
-30
lines changed

segment.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,21 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
180180
return nil, ErrClosed
181181
}
182182

183+
// if the left block size can not hold the chunk header, padding the block
183184
if seg.currentBlockSize+chunkHeaderSize >= blockSize {
184185
// padding if necessary
185186
if seg.currentBlockSize < blockSize {
186-
chunkBuffer.Write(make([]byte, blockSize-seg.currentBlockSize))
187+
p := make([]byte, blockSize-seg.currentBlockSize)
188+
chunkBuffer.B = append(chunkBuffer.B, p...)
187189
padding += blockSize - seg.currentBlockSize
190+
191+
// a new block
188192
seg.currentBlockNumber += 1
189193
seg.currentBlockSize = 0
190194
}
191-
192195
}
196+
197+
// return the start position of the chunk, for reading.
193198
position := &ChunkPosition{
194199
SegmentId: seg.id,
195200
BlockNumber: seg.currentBlockNumber,
@@ -240,9 +245,12 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
240245

241246
position.ChunkSize = blockCount*chunkHeaderSize + dataSize
242247
}
248+
249+
// the buffer length must be equal to chunkSize+padding length
243250
endLen := chunkBuffer.Len()
244251
if position.ChunkSize+padding != uint32(endLen-startLen) {
245-
panic(fmt.Sprintf("chunk size %d, len %d", position.ChunkSize, endLen-startLen-int(padding)))
252+
panic(fmt.Sprintf("wrong!!! the chunk size %d is not equal to the buffer len %d",
253+
position.ChunkSize+padding, endLen-startLen))
246254
}
247255

248256
// update segment status
@@ -261,6 +269,7 @@ func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err err
261269
return nil, ErrClosed
262270
}
263271

272+
// if any error occurs, restore the segment status
264273
originBlockNumber := seg.currentBlockNumber
265274
originBlockSize := seg.currentBlockSize
266275

@@ -276,6 +285,7 @@ func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err err
276285
bytebufferpool.Put(chunkBuffer)
277286
}()
278287

288+
// write all data to the chunk buffer
279289
var pos *ChunkPosition
280290
positions = make([]*ChunkPosition, len(data))
281291
for i := 0; i < len(positions); i++ {
@@ -285,6 +295,7 @@ func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err err
285295
}
286296
positions[i] = pos
287297
}
298+
// write the chunk buffer to the segment file
288299
if err = seg.writeChunkBuffer(chunkBuffer); err != nil {
289300
return
290301
}

wal.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ func (wal *WAL) ClearPendingWrites() {
310310
}
311311

312312
// PendingWrites add data to wal.pendingWrites and wait for batch write.
313-
// If the data in pendingWrites exceeds the size of one segment, it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites
313+
// If the data in pendingWrites exceeds the size of one segment,
314+
// it will return a 'ErrPendingSizeTooLarge' error and clear the pendingWrites.
314315
func (wal *WAL) PendingWrites(data []byte) error {
315316
wal.pendingWritesLock.Lock()
316317
defer wal.pendingWritesLock.Unlock()
@@ -326,8 +327,8 @@ func (wal *WAL) PendingWrites(data []byte) error {
326327
return nil
327328
}
328329

329-
// flushActiveSegment create a new segment file and replace the activeSegment.
330-
func (wal *WAL) flushActiveSegment() error {
330+
// rotateActiveSegment create a new segment file and replace the activeSegment.
331+
func (wal *WAL) rotateActiveSegment() error {
331332
if err := wal.activeSegment.Sync(); err != nil {
332333
return err
333334
}
@@ -354,17 +355,20 @@ func (wal *WAL) WriteAll() ([]*ChunkPosition, error) {
354355
wal.mu.Unlock()
355356
}()
356357

358+
// if the active segment file is full, sync it and create a new one.
357359
if wal.activeSegment.Size()+wal.pendingSize > wal.options.SegmentSize {
358-
if err := wal.flushActiveSegment(); err != nil {
360+
if err := wal.rotateActiveSegment(); err != nil {
359361
return nil, err
360362
}
361363
}
362364

365+
// write all data to the active segment file.
363366
positions, err := wal.activeSegment.writeAll(wal.pendingWrites)
364367
if err != nil {
365368
return nil, err
366369
}
367370

371+
// sync the active segment file to ensure the durability.
368372
if err := wal.activeSegment.Sync(); err != nil {
369373
return nil, err
370374
}
@@ -383,7 +387,7 @@ func (wal *WAL) Write(data []byte) (*ChunkPosition, error) {
383387
}
384388
// if the active segment file is full, sync it and create a new one.
385389
if wal.isFull(int64(len(data))) {
386-
if err := wal.flushActiveSegment(); err != nil {
390+
if err := wal.rotateActiveSegment(); err != nil {
387391
return nil, err
388392
}
389393
}

wal_test.go

-22
Original file line numberDiff line numberDiff line change
@@ -331,25 +331,3 @@ func TestWAL_RenameFileExt(t *testing.T) {
331331
assert.Nil(t, err)
332332
}
333333
}
334-
335-
func TestWAL_calSizeUpperBound(t *testing.T) {
336-
dir, _ := os.MkdirTemp("", "wal-test-TestWAL_calSizeUpperBound")
337-
opts := Options{
338-
DirPath: dir,
339-
SegmentFileExt: ".VLOG.1.temp",
340-
SegmentSize: 8 * 1024 * 1024,
341-
BlockCache: 32 * KB * 10,
342-
}
343-
wal, err := Open(opts)
344-
assert.Nil(t, err)
345-
defer destroyWAL(wal)
346-
347-
size := wal.maxDataWriteSize(int64(0))
348-
assert.Equal(t, int64(14), size)
349-
350-
size = wal.maxDataWriteSize(int64(32761))
351-
assert.Equal(t, int64(32775), size)
352-
353-
size = wal.maxDataWriteSize(int64(32769))
354-
assert.Equal(t, int64(32790), size)
355-
}

0 commit comments

Comments
 (0)