Skip to content

Commit da9f14e

Browse files
committed
compactor: simplify interval logic on periodic compactor
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
1 parent 501c80f commit da9f14e

File tree

2 files changed

+105
-95
lines changed

2 files changed

+105
-95
lines changed

compactor/periodic.go

+65-63
Original file line numberDiff line numberDiff line change
@@ -61,81 +61,85 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact
6161
return t
6262
}
6363

64+
/*
65+
Compaction period 1-hour:
66+
1. compute compaction period, which is 1-hour
67+
2. record revisions for every 1/10 of 1-hour (6-minute)
68+
3. keep recording revisions with no compaction for first 1-hour
69+
4. do compact with revs[0]
70+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
71+
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
72+
73+
Compaction period 24-hour:
74+
1. compute compaction period, which is 1-hour
75+
2. record revisions for every 1/10 of 1-hour (6-minute)
76+
3. keep recording revisions with no compaction for first 24-hour
77+
4. do compact with revs[0]
78+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
79+
- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
80+
81+
Compaction period 59-min:
82+
1. compute compaction period, which is 59-min
83+
2. record revisions for every 1/10 of 59-min (5.9-min)
84+
3. keep recording revisions with no compaction for first 59-min
85+
4. do compact with revs[0]
86+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
87+
- failure? update revs, and retry after 1/10 of 59-min (5.9-min)
88+
89+
Compaction period 5-sec:
90+
1. compute compaction period, which is 5-sec
91+
2. record revisions for every 1/10 of 5-sec (0.5-sec)
92+
3. keep recording revisions with no compaction for first 5-sec
93+
4. do compact with revs[0]
94+
- success? contiue on for-loop and move sliding window; revs = revs[1:]
95+
- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
96+
*/
97+
98+
// Run runs periodic compactor.
6499
func (t *Periodic) Run() {
65-
fetchInterval := t.getFetchInterval()
100+
compactInterval := t.getCompactInterval()
66101
retryInterval := t.getRetryInterval()
67-
retentions := int(t.period/fetchInterval) + 1 // number of revs to keep for t.period
68-
notify := make(chan struct{}, 1)
102+
retentions := t.getRetentions()
69103

70-
// periodically updates t.revs and notify to the other goroutine
71104
go func() {
105+
lastSuccess := t.clock.Now()
106+
baseInterval := t.period
72107
for {
73-
rev := t.rg.Rev()
74-
t.mu.Lock()
75-
t.revs = append(t.revs, rev)
108+
t.revs = append(t.revs, t.rg.Rev())
76109
if len(t.revs) > retentions {
77110
t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
78111
}
79-
t.mu.Unlock()
80-
81-
select {
82-
case notify <- struct{}{}:
83-
default:
84-
// compaction can take time more than interval
85-
}
86112

87113
select {
88114
case <-t.ctx.Done():
89115
return
90-
case <-t.clock.After(fetchInterval):
91-
}
92-
}
93-
}()
94-
95-
// run compaction triggered by the other goroutine thorough the notify channel
96-
// or internal periodic retry
97-
go func() {
98-
var lastCompactedRev int64
99-
for {
100-
select {
101-
case <-t.ctx.Done():
102-
return
103-
case <-notify:
104-
// from the other goroutine
105116
case <-t.clock.After(retryInterval):
106-
// for retry
107-
// when t.rev is not updated, this event will be ignored later,
108-
// so we don't need to think about race with <-notify.
117+
t.mu.Lock()
118+
p := t.paused
119+
t.mu.Unlock()
120+
if p {
121+
continue
122+
}
109123
}
110124

111-
t.mu.Lock()
112-
p := t.paused
113-
rev := t.revs[0]
114-
len := len(t.revs)
115-
t.mu.Unlock()
116-
if p {
125+
if t.clock.Now().Sub(lastSuccess) < baseInterval {
117126
continue
118127
}
119128

120-
// it's too early to start working
121-
if len != retentions {
122-
continue
123-
}
124-
125-
// if t.revs is not updated, we can ignore the event.
126-
// it's not the first time to try comapction in this interval.
127-
if rev == lastCompactedRev {
128-
continue
129+
// wait up to initial given period
130+
if baseInterval == t.period {
131+
baseInterval = compactInterval
129132
}
133+
rev := t.revs[0]
130134

131135
plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
132136
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
133137
if err == nil || err == mvcc.ErrCompacted {
138+
lastSuccess = t.clock.Now()
134139
plog.Noticef("Finished auto-compaction at revision %d", rev)
135-
lastCompactedRev = rev
136140
} else {
137141
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
138-
plog.Noticef("Retry after %s", retryInterval)
142+
plog.Noticef("Retry after %v", retryInterval)
139143
}
140144
}
141145
}()
@@ -145,43 +149,41 @@ func (t *Periodic) Run() {
145149
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
146150
// if given compaction period x is >1-hour, compact every hour.
147151
// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
148-
func (t *Periodic) getFetchInterval() time.Duration {
152+
func (t *Periodic) getCompactInterval() time.Duration {
149153
itv := t.period
150154
if itv > time.Hour {
151155
itv = time.Hour
152156
}
153157
return itv
154158
}
155159

160+
func (t *Periodic) getRetentions() int {
161+
return int(t.period / t.getRetryInterval())
162+
}
163+
156164
const retryDivisor = 10
157165

158166
func (t *Periodic) getRetryInterval() time.Duration {
159-
itv := t.period / retryDivisor
160-
// we don't want to too aggressive retries
161-
// and also jump between 6-minute through 60-minute
162-
if itv < (6 * time.Minute) { // t.period is less than hour
163-
// if t.period is less than 6-minute,
164-
// retry interval is t.period.
165-
// if we divide byretryDivisor, it's too aggressive
166-
if t.period < 6*time.Minute {
167-
itv = t.period
168-
} else {
169-
itv = 6 * time.Minute
170-
}
167+
itv := t.period
168+
if itv > time.Hour {
169+
itv = time.Hour
171170
}
172-
return itv
171+
return itv / retryDivisor
173172
}
174173

174+
// Stop stops periodic compactor.
175175
func (t *Periodic) Stop() {
176176
t.cancel()
177177
}
178178

179+
// Pause pauses periodic compactor.
179180
func (t *Periodic) Pause() {
180181
t.mu.Lock()
181182
defer t.mu.Unlock()
182183
t.paused = true
183184
}
184185

186+
// Resume resumes periodic compactor.
185187
func (t *Periodic) Resume() {
186188
t.mu.Lock()
187189
defer t.mu.Unlock()

compactor/periodic_test.go

+40-32
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,30 @@ func TestPeriodicHourly(t *testing.T) {
3636

3737
tb.Run()
3838
defer tb.Stop()
39-
// simulate 5 hours
4039

41-
for i := 0; i < 5; i++ {
40+
n1 := tb.getRetentions()
41+
42+
// compaction doesn't happen til 2 hours elapses
43+
for i := 0; i < n1; i++ {
4244
rg.Wait(1)
43-
fc.Advance(time.Hour)
44-
// compaction doesn't happen til 2 hours elapses.
45-
if i < retentionHours {
46-
continue
47-
}
48-
// after 2 hours, compaction happens at every interval.
49-
// at i = 3, t.revs = [1(2h-ago,T=0h), 2(1h-ago,T=1h), 3(now,T=2h)] (len=3) (rev starts from 1)
45+
fc.Advance(tb.getRetryInterval())
46+
}
47+
48+
// simulate 5 hours
49+
// now compactor kicks in, every hour
50+
for i := 0; i < 5; i++ {
5051
a, err := compactable.Wait(1)
5152
if err != nil {
5253
t.Fatal(err)
5354
}
54-
expectedRevision := int64(i - 1)
55+
56+
expectedRevision := int64(i + 1)
5557
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
5658
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
5759
}
60+
61+
rg.Wait(1)
62+
fc.Advance(tb.getCompactInterval())
5863
}
5964
}
6065

@@ -70,25 +75,29 @@ func TestPeriodicMinutes(t *testing.T) {
7075
tb.Run()
7176
defer tb.Stop()
7277

73-
// simulate 115 (23 * 5) minutes
74-
for i := 0; i < 5; i++ {
78+
n1 := tb.getRetentions()
79+
80+
// compaction doesn't happen til 2 hours elapses
81+
for i := 0; i < n1; i++ {
7582
rg.Wait(1)
76-
fc.Advance(retentionDuration)
83+
fc.Advance(tb.getRetryInterval())
84+
}
7785

78-
// notting happens at T=0
79-
if i == 0 {
80-
continue
81-
}
82-
// from T=23m (i=1), compaction happens at every interval
86+
// simulate 115 (23 * 5) minutes
87+
// compaction happens at every interval
88+
for i := 0; i < 5; i++ {
8389
a, err := compactable.Wait(1)
8490
if err != nil {
8591
t.Fatal(err)
8692
}
87-
expectedRevision := int64(i)
93+
94+
expectedRevision := int64(i + 1)
8895
if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
8996
t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
9097
}
9198

99+
rg.Wait(1)
100+
fc.Advance(tb.getCompactInterval())
92101
}
93102
}
94103

@@ -102,18 +111,14 @@ func TestPeriodicPause(t *testing.T) {
102111
tb.Run()
103112
tb.Pause()
104113

114+
n1 := tb.getRetentions()
115+
105116
// tb will collect 3 hours of revisions but not compact since paused
106-
// T=0
107-
rg.Wait(1) // t.revs = [1]
108-
fc.Advance(time.Hour)
109-
// T=1h
110-
rg.Wait(1) // t.revs = [1, 2]
111-
fc.Advance(time.Hour)
112-
// T=2h
113-
rg.Wait(1) // t.revs = [2, 3]
114-
fc.Advance(time.Hour)
115-
// T=3h
116-
rg.Wait(1) // t.revs = [3, 4]
117+
for i := 0; i < n1*3; i++ {
118+
rg.Wait(1)
119+
fc.Advance(tb.getRetryInterval())
120+
}
121+
// t.revs = [21 22 23 24 25 26 27 28 29 30]
117122

118123
select {
119124
case a := <-compactable.Chan():
@@ -123,16 +128,19 @@ func TestPeriodicPause(t *testing.T) {
123128

124129
// tb resumes to being blocked on the clock
125130
tb.Resume()
131+
rg.Wait(1)
126132

127133
// unblock clock, will kick off a compaction at T=3h6m by retry
128-
fc.Advance(time.Minute * 6)
134+
fc.Advance(tb.getRetryInterval())
135+
129136
// T=3h6m
130137
a, err := compactable.Wait(1)
131138
if err != nil {
132139
t.Fatal(err)
133140
}
141+
134142
// compact the revision from T=3h
135-
wreq := &pb.CompactionRequest{Revision: int64(3)}
143+
wreq := &pb.CompactionRequest{Revision: int64(22)}
136144
if !reflect.DeepEqual(a[0].Params[0], wreq) {
137145
t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
138146
}

0 commit comments

Comments
 (0)