@@ -26,12 +26,14 @@ import (
26
26
)
27
27
28
28
func TestPeriodic (t * testing.T ) {
29
+ retentionHours := 2
30
+
29
31
fc := clockwork .NewFakeClock ()
30
32
rg := & fakeRevGetter {testutil .NewRecorderStream (), 0 }
31
33
compactable := & fakeCompactable {testutil .NewRecorderStream ()}
32
34
tb := & Periodic {
33
35
clock : fc ,
34
- periodInHour : 1 ,
36
+ periodInHour : retentionHours ,
35
37
rg : rg ,
36
38
c : compactable ,
37
39
}
@@ -40,31 +42,26 @@ func TestPeriodic(t *testing.T) {
40
42
defer tb .Stop ()
41
43
42
44
n := int (time .Hour / checkCompactionInterval )
43
- // collect 3 hours of revisions
44
- for i := 0 ; i < 3 ; i ++ {
45
- // advance one (hour - checkCompactionInterval), one revision for each interval
46
- for j := 0 ; j < n - 1 ; j ++ {
47
- _ , err := rg .Wait (1 )
48
- if err != nil {
49
- t .Fatal (err )
50
- }
45
+ // collect 5 hours of revisions
46
+ for i := 0 ; i < 5 ; i ++ {
47
+ // advance one hour, one revision for each interval
48
+ for j := 0 ; j < n ; j ++ {
49
+ rg .Wait (1 )
51
50
fc .Advance (checkCompactionInterval )
52
51
}
53
- _ , err := rg .Wait (1 )
54
- if err != nil {
55
- t .Fatal (err )
52
+
53
+ // compaction doesn't happen til 2 hours elapses
54
+ if i + 1 < retentionHours {
55
+ continue
56
56
}
57
- // ready to acknowledge hour "i"
58
- // block until compactor calls clock.After()
59
- fc .BlockUntil (1 )
60
- // unblock the After()
61
- fc .Advance (checkCompactionInterval )
57
+
62
58
a , err := compactable .Wait (1 )
63
59
if err != nil {
64
60
t .Fatal (err )
65
61
}
66
- if ! reflect .DeepEqual (a [0 ].Params [0 ], & pb.CompactionRequest {Revision : int64 (i * n ) + 1 }) {
67
- t .Errorf ("compact request = %v, want %v" , a [0 ].Params [0 ], & pb.CompactionRequest {Revision : int64 (i * n ) + 1 })
62
+ expectedRevision := int64 (1 + (i + 1 )* n - retentionHours * n )
63
+ if ! reflect .DeepEqual (a [0 ].Params [0 ], & pb.CompactionRequest {Revision : expectedRevision }) {
64
+ t .Errorf ("compact request = %v, want %v" , a [0 ].Params [0 ], & pb.CompactionRequest {Revision : expectedRevision })
68
65
}
69
66
}
70
67
@@ -92,8 +89,8 @@ func TestPeriodicPause(t *testing.T) {
92
89
// tb will collect 3 hours of revisions but not compact since paused
93
90
n := int (time .Hour / checkCompactionInterval )
94
91
for i := 0 ; i < 3 * n ; i ++ {
95
- fc .Advance (checkCompactionInterval )
96
92
rg .Wait (1 )
93
+ fc .Advance (checkCompactionInterval )
97
94
}
98
95
// tb ends up waiting for the clock
99
96
@@ -106,14 +103,15 @@ func TestPeriodicPause(t *testing.T) {
106
103
// tb resumes to being blocked on the clock
107
104
tb .Resume ()
108
105
109
- // unblock clock, will kick off a compaction at hour 3
106
+ // unblock clock, will kick off a compaction at hour 3:05
107
+ rg .Wait (1 )
110
108
fc .Advance (checkCompactionInterval )
111
109
a , err := compactable .Wait (1 )
112
110
if err != nil {
113
111
t .Fatal (err )
114
112
}
115
- // compact the revision from hour 2
116
- wreq := & pb.CompactionRequest {Revision : int64 (2 * n + 1 )}
113
+ // compact the revision from hour 2:05
114
+ wreq := & pb.CompactionRequest {Revision : int64 (1 + 2 * n + 1 )}
117
115
if ! reflect .DeepEqual (a [0 ].Params [0 ], wreq ) {
118
116
t .Errorf ("compact request = %v, want %v" , a [0 ].Params [0 ], wreq .Revision )
119
117
}
0 commit comments