Skip to content

Commit 60dbad5

Browse files
author
Iwasaki Yudai
committed
compactor: Make periodic compactor runs every hour
Closes #7868.
1 parent 3b251b0 commit 60dbad5

File tree

2 files changed

+32
-30
lines changed

2 files changed

+32
-30
lines changed

compactor/compactor.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ var (
3030
)
3131

3232
const (
33-
checkCompactionInterval = 5 * time.Minute
33+
checkCompactionInterval = 5 * time.Minute
34+
executeCompactionInterval = time.Hour
3435
)
3536

3637
type Compactable interface {
@@ -41,6 +42,8 @@ type RevGetter interface {
4142
Rev() int64
4243
}
4344

45+
// Periodic compacts the log by purging revisions older than
46+
// the configured retention time. Compaction happens hourly.
4447
type Periodic struct {
4548
clock clockwork.Clock
4649
periodInHour int
@@ -85,19 +88,20 @@ func (t *Periodic) Run() {
8588
continue
8689
}
8790
}
88-
if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour {
91+
92+
if clock.Now().Sub(last) < executeCompactionInterval {
8993
continue
9094
}
9195

92-
rev := t.getRev(t.periodInHour)
96+
rev, remaining := t.getRev(t.periodInHour)
9397
if rev < 0 {
9498
continue
9599
}
96100

97101
plog.Noticef("Starting auto-compaction at revision %d", rev)
98102
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
99103
if err == nil || err == mvcc.ErrCompacted {
100-
t.revs = make([]int64, 0)
104+
t.revs = remaining
101105
last = clock.Now()
102106
plog.Noticef("Finished auto-compaction at revision %d", rev)
103107
} else {
@@ -124,10 +128,10 @@ func (t *Periodic) Resume() {
124128
t.paused = false
125129
}
126130

127-
func (t *Periodic) getRev(h int) int64 {
131+
func (t *Periodic) getRev(h int) (int64, []int64) {
128132
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
129133
if i < 0 {
130-
return -1
134+
return -1, t.revs
131135
}
132-
return t.revs[i]
136+
return t.revs[i], t.revs[i+1:]
133137
}

compactor/compactor_test.go

+21-23
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ import (
2626
)
2727

2828
func TestPeriodic(t *testing.T) {
29+
retentionHours := 2
30+
2931
fc := clockwork.NewFakeClock()
3032
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
3133
compactable := &fakeCompactable{testutil.NewRecorderStream()}
3234
tb := &Periodic{
3335
clock: fc,
34-
periodInHour: 1,
36+
periodInHour: retentionHours,
3537
rg: rg,
3638
c: compactable,
3739
}
@@ -40,31 +42,26 @@ func TestPeriodic(t *testing.T) {
4042
defer tb.Stop()
4143

4244
n := int(time.Hour / checkCompactionInterval)
43-
// collect 3 hours of revisions
44-
for i := 0; i < 3; i++ {
45-
// advance one (hour - checkCompactionInterval), one revision for each interval
46-
for j := 0; j < n-1; j++ {
47-
_, err := rg.Wait(1)
48-
if err != nil {
49-
t.Fatal(err)
50-
}
45+
// collect 5 hours of revisions
46+
for i := 0; i < 5; i++ {
47+
// advance one hour, one revision for each interval
48+
for j := 0; j < n; j++ {
49+
rg.Wait(1)
5150
fc.Advance(checkCompactionInterval)
5251
}
53-
_, err := rg.Wait(1)
54-
if err != nil {
55-
t.Fatal(err)
52+
53+
// compaction doesn't happen til 2 hours elapses
54+
if i+1 < retentionHours {
55+
continue
5656
}
57-
// ready to acknowledge hour "i"
58-
// block until compactor calls clock.After()
59-
fc.BlockUntil(1)
60-
// unblock the After()
61-
fc.Advance(checkCompactionInterval)
57+
6258
a, err := compactable.Wait(1)
6359
if err != nil {
6460
t.Fatal(err)
6561
}
66-
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) {
67-
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1})
62+
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
63+
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
64+
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
6865
}
6966
}
7067

@@ -92,8 +89,8 @@ func TestPeriodicPause(t *testing.T) {
9289
// tb will collect 3 hours of revisions but not compact since paused
9390
n := int(time.Hour / checkCompactionInterval)
9491
for i := 0; i < 3*n; i++ {
95-
fc.Advance(checkCompactionInterval)
9692
rg.Wait(1)
93+
fc.Advance(checkCompactionInterval)
9794
}
9895
// tb ends up waiting for the clock
9996

@@ -106,14 +103,15 @@ func TestPeriodicPause(t *testing.T) {
106103
// tb resumes to being blocked on the clock
107104
tb.Resume()
108105

109-
// unblock clock, will kick off a compaction at hour 3
106+
// unblock clock, will kick off a compaction at hour 3:05
107+
rg.Wait(1)
110108
fc.Advance(checkCompactionInterval)
111109
a, err := compactable.Wait(1)
112110
if err != nil {
113111
t.Fatal(err)
114112
}
115-
// compact the revision from hour 2
116-
wreq := &pb.CompactionRequest{Revision: int64(2*n + 1)}
113+
// compact the revision from hour 2:05
114+
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
117115
if !reflect.DeepEqual(a[0].Params[0], wreq) {
118116
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
119117
}

0 commit comments

Comments
 (0)