Skip to content

Commit e277b3e

Browse files
author
Iwasaki Yudai
committed
compactor: Make periodic compactor runs every hour
Closes #7868.
1 parent d1660b5 commit e277b3e

File tree

2 files changed

+36
-26
lines changed

2 files changed

+36
-26
lines changed

compactor/compactor.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ var (
3030
)
3131

3232
const (
33-
checkCompactionInterval = 5 * time.Minute
33+
checkCompactionInterval = 5 * time.Minute
34+
executeCompactionInterval = time.Hour
3435
)
3536

3637
type Compactable interface {
@@ -41,6 +42,8 @@ type RevGetter interface {
4142
Rev() int64
4243
}
4344

45+
// Periodic compacts the log by purging revisions older than
46+
// the configured retention time. Compaction happenes every hour.
4447
type Periodic struct {
4548
clock clockwork.Clock
4649
periodInHour int
@@ -85,19 +88,20 @@ func (t *Periodic) Run() {
8588
continue
8689
}
8790
}
88-
if clock.Now().Sub(last) < time.Duration(t.periodInHour)*time.Hour {
91+
92+
if clock.Now().Sub(last) < executeCompactionInterval {
8993
continue
9094
}
9195

92-
rev := t.getRev(t.periodInHour)
96+
rev, remaining := t.getRev(t.periodInHour)
9397
if rev < 0 {
9498
continue
9599
}
96100

97101
plog.Noticef("Starting auto-compaction at revision %d", rev)
98102
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
99103
if err == nil || err == mvcc.ErrCompacted {
100-
t.revs = make([]int64, 0)
104+
t.revs = remaining
101105
last = clock.Now()
102106
plog.Noticef("Finished auto-compaction at revision %d", rev)
103107
} else {
@@ -124,10 +128,13 @@ func (t *Periodic) Resume() {
124128
t.paused = false
125129
}
126130

127-
func (t *Periodic) getRev(h int) int64 {
131+
func (t *Periodic) getRev(h int) (int64, []int64) {
128132
i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval)
129133
if i < 0 {
130-
return -1
134+
return -1, t.revs
135+
}
136+
if i >= len(t.revs) {
137+
i = len(t.revs) - 1
131138
}
132-
return t.revs[i]
139+
return t.revs[i], t.revs[i+1:]
133140
}

compactor/compactor_test.go

+22-19
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@ import (
2626
)
2727

2828
func TestPeriodic(t *testing.T) {
29+
retentionHours := 2
30+
2931
fc := clockwork.NewFakeClock()
3032
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
3133
compactable := &fakeCompactable{testutil.NewRecorderStream()}
3234
tb := &Periodic{
3335
clock: fc,
34-
periodInHour: 1,
36+
periodInHour: retentionHours,
3537
rg: rg,
3638
c: compactable,
3739
}
@@ -40,24 +42,24 @@ func TestPeriodic(t *testing.T) {
4042
defer tb.Stop()
4143

4244
n := int(time.Hour / checkCompactionInterval)
43-
// collect 3 hours of revisions
44-
for i := 0; i < 3; i++ {
45+
// collect 5 hours of revisions
46+
for i := 0; i < 5; i++ {
4547
// advance one hour, one revision for each interval
4648
for j := 0; j < n; j++ {
47-
fc.Advance(checkCompactionInterval)
4849
rg.Wait(1)
50+
fc.Advance(checkCompactionInterval)
4951
}
50-
// ready to acknowledge hour "i"
51-
// block until compactor calls clock.After()
52-
fc.BlockUntil(1)
53-
// unblock the After()
54-
fc.Advance(checkCompactionInterval)
55-
a, err := compactable.Wait(1)
56-
if err != nil {
57-
t.Fatal(err)
58-
}
59-
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1}) {
60-
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: int64(i*n) + 1})
52+
53+
// compaction doesn't happen til 2 hours elapses
54+
if i+1 >= retentionHours {
55+
a, err := compactable.Wait(1)
56+
if err != nil {
57+
t.Fatal(err)
58+
}
59+
expectedRevision := int64(1 + (i+1)*n - retentionHours*n)
60+
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
61+
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
62+
}
6163
}
6264
}
6365
}
@@ -79,8 +81,8 @@ func TestPeriodicPause(t *testing.T) {
7981
// tb will collect 3 hours of revisions but not compact since paused
8082
n := int(time.Hour / checkCompactionInterval)
8183
for i := 0; i < 3*n; i++ {
82-
fc.Advance(checkCompactionInterval)
8384
rg.Wait(1)
85+
fc.Advance(checkCompactionInterval)
8486
}
8587
// tb ends up waiting for the clock
8688

@@ -93,14 +95,15 @@ func TestPeriodicPause(t *testing.T) {
9395
// tb resumes to being blocked on the clock
9496
tb.Resume()
9597

96-
// unblock clock, will kick off a compaction at hour 3
98+
// unblock clock, will kick off a compaction at hour 3:05
99+
rg.Wait(1)
97100
fc.Advance(checkCompactionInterval)
98101
a, err := compactable.Wait(1)
99102
if err != nil {
100103
t.Fatal(err)
101104
}
102-
// compact the revision from hour 2
103-
wreq := &pb.CompactionRequest{Revision: int64(2*n + 1)}
105+
// compact the revision from hour 2:05
106+
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
104107
if !reflect.DeepEqual(a[0].Params[0], wreq) {
105108
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
106109
}

0 commit comments

Comments
 (0)