Skip to content

Commit 1801854

Browse files
committed
compactor: simplify interval logic on periodic compactor
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
1 parent 501c80f commit 1801854

File tree

2 files changed

+132
-97
lines changed

2 files changed

+132
-97
lines changed

compactor/periodic.go

+65-63
Original file line numberDiff line numberDiff line change
@@ -61,81 +61,85 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact
6161
return t
6262
}
6363

64+
/*
65+
Compaction period 1-hour:
66+
1. compute compaction period, which is 1-hour
67+
2. record revisions for every 1/10 of 1-hour (6-minute)
68+
3. keep recording revisions with no compaction for first 1-hour
69+
4. do compact with revs[0]
70+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
71+
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
72+
73+
Compaction period 24-hour:
74+
1. compute compaction period, which is 1-hour
75+
2. record revisions for every 1/10 of 1-hour (6-minute)
76+
3. keep recording revisions with no compaction for first 24-hour
77+
4. do compact with revs[0]
78+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
79+
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
80+
81+
Compaction period 59-min:
82+
1. compute compaction period, which is 59-min
83+
2. record revisions for every 1/10 of 59-min (5.9-min)
84+
3. keep recording revisions with no compaction for first 59-min
85+
4. do compact with revs[0]
86+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
87+
- failure? update revs, and retry after 1/10 of 59-min (5.9-min)
88+
89+
Compaction period 5-sec:
90+
1. compute compaction period, which is 5-sec
91+
2. record revisions for every 1/10 of 5-sec (0.5-sec)
92+
3. keep recording revisions with no compaction for first 5-sec
93+
4. do compact with revs[0]
94+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
95+
- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
96+
*/
97+
98+
// Run runs periodic compactor.
6499
func (t *Periodic) Run() {
65-
fetchInterval := t.getFetchInterval()
100+
compactInterval := t.getCompactInterval()
66101
retryInterval := t.getRetryInterval()
67-
retentions := int(t.period/fetchInterval) + 1 // number of revs to keep for t.period
68-
notify := make(chan struct{}, 1)
102+
retentions := t.getRetentions()
69103

70-
// periodically updates t.revs and notify to the other goroutine
71104
go func() {
105+
lastSuccess := t.clock.Now()
106+
baseInterval := t.period
72107
for {
73-
rev := t.rg.Rev()
74-
t.mu.Lock()
75-
t.revs = append(t.revs, rev)
108+
t.revs = append(t.revs, t.rg.Rev())
76109
if len(t.revs) > retentions {
77110
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
78111
}
79-
t.mu.Unlock()
80-
81-
select {
82-
case notify <- struct{}{}:
83-
default:
84-
// compaction can take time more than interval
85-
}
86112

87113
select {
88114
case <-t.ctx.Done():
89115
return
90-
case <-t.clock.After(fetchInterval):
91-
}
92-
}
93-
}()
94-
95-
// run compaction triggered by the other goroutine thorough the notify channel
96-
// or internal periodic retry
97-
go func() {
98-
var lastCompactedRev int64
99-
for {
100-
select {
101-
case <-t.ctx.Done():
102-
return
103-
case <-notify:
104-
// from the other goroutine
105116
case <-t.clock.After(retryInterval):
106-
// for retry
107-
// when t.rev is not updated, this event will be ignored later,
108-
// so we don't need to think about race with <-notify.
117+
t.mu.Lock()
118+
p := t.paused
119+
t.mu.Unlock()
120+
if p {
121+
continue
122+
}
109123
}
110124

111-
t.mu.Lock()
112-
p := t.paused
113-
rev := t.revs[0]
114-
len := len(t.revs)
115-
t.mu.Unlock()
116-
if p {
125+
if t.clock.Now().Sub(lastSuccess) < baseInterval {
117126
continue
118127
}
119128

120-
// it's too early to start working
121-
if len != retentions {
122-
continue
123-
}
124-
125-
// if t.revs is not updated, we can ignore the event.
126-
// it's not the first time to try comapction in this interval.
127-
if rev == lastCompactedRev {
128-
continue
129+
// wait up to initial given period
130+
if baseInterval == t.period {
131+
baseInterval = compactInterval
129132
}
133+
rev := t.revs[0]
130134

131135
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
132136
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
133137
if err == nil || err == mvcc.ErrCompacted {
138+
lastSuccess = t.clock.Now()
134139
plog.Noticef("Finished auto-compaction at revision %d", rev)
135-
lastCompactedRev = rev
136140
} else {
137141
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
138-
plog.Noticef("Retry after %s", retryInterval)
142+
plog.Noticef("Retry after %v", retryInterval)
139143
}
140144
}
141145
}()
@@ -145,43 +149,41 @@ func (t *Periodic) Run() {
145149
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
146150
// if given compaction period x is >1-hour, compact every hour.
147151
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
148-
func (t *Periodic) getFetchInterval() time.Duration {
152+
func (t *Periodic) getCompactInterval() time.Duration {
149153
itv := t.period
150154
if itv > time.Hour {
151155
itv = time.Hour
152156
}
153157
return itv
154158
}
155159

160+
func (t *Periodic) getRetentions() int {
161+
return int(t.period/t.getRetryInterval()) + 1
162+
}
163+
156164
const retryDivisor = 10
157165

158166
func (t *Periodic) getRetryInterval() time.Duration {
159-
itv := t.period / retryDivisor
160-
// we don't want to too aggressive retries
161-
// and also jump between 6-minute through 60-minute
162-
if itv < (6 * time.Minute) { // t.period is less than hour
163-
// if t.period is less than 6-minute,
164-
// retry interval is t.period.
165-
// if we divide byretryDivisor, it's too aggressive
166-
if t.period < 6*time.Minute {
167-
itv = t.period
168-
} else {
169-
itv = 6 * time.Minute
170-
}
167+
itv := t.period
168+
if itv > time.Hour {
169+
itv = time.Hour
171170
}
172-
return itv
171+
return itv / retryDivisor
173172
}
174173

174+
// Stop stops periodic compactor.
175175
func (t *Periodic) Stop() {
176176
t.cancel()
177177
}
178178

179+
// Pause pauses periodic compactor.
179180
func (t *Periodic) Pause() {
180181
t.mu.Lock()
181182
defer t.mu.Unlock()
182183
t.paused = true
183184
}
184185

186+
// Resume resumes periodic compactor.
185187
func (t *Periodic) Resume() {
186188
t.mu.Lock()
187189
defer t.mu.Unlock()

compactor/periodic_test.go

+67-34
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,48 @@ func TestPeriodicHourly(t *testing.T) {
3636

3737
tb.Run()
3838
defer tb.Stop()
39-
// simulate 5 hours
4039

41-
for i := 0; i < 5; i++ {
40+
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
41+
42+
// compaction doesn't happen til 2 hours elapse
43+
for i := 0; i < initialIntervals-1; i++ {
4244
rg.Wait(1)
43-
fc.Advance(time.Hour)
44-
// compaction doesn't happen til 2 hours elapses.
45-
if i < retentionHours {
46-
continue
45+
fc.Advance(tb.getRetryInterval())
46+
}
47+
48+
// very first compaction
49+
a, err := compactable.Wait(1)
50+
if err != nil {
51+
t.Fatal(err)
52+
}
53+
expectedRevision := int64(1)
54+
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
55+
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
56+
}
57+
58+
// simulate 3 hours
59+
// now compactor kicks in, every hour
60+
for i := 0; i < 3; i++ {
61+
// advance one hour, one revision for each interval
62+
for j := 0; j < intervalsPerPeriod; j++ {
63+
rg.Wait(1)
64+
fc.Advance(tb.getRetryInterval())
4765
}
48-
// after 2 hours, compaction happens at every interval.
49-
// at i = 3, t.revs = [1(2h-ago,T=0h), 2(1h-ago,T=1h), 3(now,T=2h)] (len=3) (rev starts from 1)
50-
a, err := compactable.Wait(1)
66+
67+
a, err = compactable.Wait(1)
5168
if err != nil {
5269
t.Fatal(err)
5370
}
54-
expectedRevision := int64(i - 1)
71+
72+
expectedRevision = int64((i + 1) * 10)
5573
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
5674
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
5775
}
5876
}
5977
}
6078

6179
func TestPeriodicMinutes(t *testing.T) {
62-
retentionMinutes := 23
80+
retentionMinutes := 5
6381
retentionDuration := time.Duration(retentionMinutes) * time.Minute
6482

6583
fc := clockwork.NewFakeClock()
@@ -70,25 +88,41 @@ func TestPeriodicMinutes(t *testing.T) {
7088
tb.Run()
7189
defer tb.Stop()
7290

73-
// simulate 115 (23 * 5) minutes
74-
for i := 0; i < 5; i++ {
91+
initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
92+
93+
// compaction doesn't happen til 5 hours elapses
94+
for i := 0; i < initialIntervals-1; i++ {
7595
rg.Wait(1)
76-
fc.Advance(retentionDuration)
96+
fc.Advance(tb.getRetryInterval())
97+
}
7798

78-
// notting happens at T=0
79-
if i == 0 {
80-
continue
99+
// very first compaction
100+
a, err := compactable.Wait(1)
101+
if err != nil {
102+
t.Fatal(err)
103+
}
104+
expectedRevision := int64(1)
105+
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
106+
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
107+
}
108+
109+
// compaction happens at every interval
110+
for i := 0; i < 5; i++ {
111+
// advance 5-minute, one revision for each interval
112+
for j := 0; j < intervalsPerPeriod; j++ {
113+
rg.Wait(1)
114+
fc.Advance(tb.getRetryInterval())
81115
}
82-
// from T=23m (i=1), compaction happens at every interval
116+
83117
a, err := compactable.Wait(1)
84118
if err != nil {
85119
t.Fatal(err)
86120
}
87-
expectedRevision := int64(i)
121+
122+
expectedRevision = int64((i + 1) * 10)
88123
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
89124
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
90125
}
91-
92126
}
93127
}
94128

@@ -102,18 +136,14 @@ func TestPeriodicPause(t *testing.T) {
102136
tb.Run()
103137
tb.Pause()
104138

139+
n := tb.getRetentions()
140+
105141
// tb will collect 3 hours of revisions but not compact since paused
106-
// T=0
107-
rg.Wait(1) // t.revs = [1]
108-
fc.Advance(time.Hour)
109-
// T=1h
110-
rg.Wait(1) // t.revs = [1, 2]
111-
fc.Advance(time.Hour)
112-
// T=2h
113-
rg.Wait(1) // t.revs = [2, 3]
114-
fc.Advance(time.Hour)
115-
// T=3h
116-
rg.Wait(1) // t.revs = [3, 4]
142+
for i := 0; i < n*3; i++ {
143+
rg.Wait(1)
144+
fc.Advance(tb.getRetryInterval())
145+
}
146+
// t.revs = [21 22 23 24 25 26 27 28 29 30]
117147

118148
select {
119149
case a := <-compactable.Chan():
@@ -123,16 +153,19 @@ func TestPeriodicPause(t *testing.T) {
123153

124154
// tb resumes to being blocked on the clock
125155
tb.Resume()
156+
rg.Wait(1)
126157

127158
// unblock clock, will kick off a compaction at T=3h6m by retry
128-
fc.Advance(time.Minute * 6)
159+
fc.Advance(tb.getRetryInterval())
160+
129161
// T=3h6m
130162
a, err := compactable.Wait(1)
131163
if err != nil {
132164
t.Fatal(err)
133165
}
134-
// compact the revision from T=3h
135-
wreq := &pb.CompactionRequest{Revision: int64(3)}
166+
167+
// compact the revision from hour 2:06
168+
wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
136169
if !reflect.DeepEqual(a[0].Params[0], wreq) {
137170
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
138171
}

0 commit comments

Comments
 (0)