16
16
* See the License for the specific language governing permissions and
17
17
* limitations under the License.
18
18
*/
19
+
19
20
package org .apache .hadoop .hbase .replication ;
20
21
22
+ import static org .apache .hadoop .hbase .replication .TestReplicationEndpoint .ReplicationEndpointForTest ;
23
+ import static org .hamcrest .CoreMatchers .is ;
24
+ import static org .hamcrest .MatcherAssert .assertThat ;
21
25
import static org .junit .Assert .assertEquals ;
22
26
import static org .junit .Assert .assertNotNull ;
23
27
import static org .junit .Assert .assertNull ;
28
+ import static org .junit .Assert .assertTrue ;
29
+ import static org .mockito .Matchers .anyBoolean ;
30
+ import static org .mockito .Matchers .anyString ;
31
+ import static org .mockito .Mockito .mock ;
32
+ import static org .mockito .Mockito .verify ;
33
+ import static org .mockito .Mockito .when ;
34
+ import static org .mockito .internal .verification .VerificationModeFactory .times ;
24
35
25
36
import java .io .IOException ;
37
+ import java .util .Collections ;
38
+ import java .util .List ;
39
+ import java .util .Map ;
40
+ import java .util .NavigableMap ;
41
+ import java .util .TreeMap ;
42
+ import java .util .UUID ;
26
43
import java .util .concurrent .ExecutorService ;
27
44
import java .util .concurrent .Executors ;
28
45
import java .util .concurrent .Future ;
29
- import java .util .concurrent .atomic .AtomicLong ;
30
46
31
47
import org .apache .commons .logging .Log ;
32
48
import org .apache .commons .logging .LogFactory ;
63
79
import org .junit .BeforeClass ;
64
80
import org .junit .Test ;
65
81
import org .junit .experimental .categories .Category ;
66
- import org .mockito .Mockito ;
67
82
import org .mockito .ArgumentCaptor ;
68
-
69
- import java .util .Collections ;
70
- import java .util .List ;
71
- import java .util .Map ;
72
- import java .util .NavigableMap ;
73
- import java .util .TreeMap ;
74
- import java .util .UUID ;
75
-
76
- import static org .apache .hadoop .hbase .replication .TestReplicationEndpoint .ReplicationEndpointForTest ;
77
- import static org .hamcrest .CoreMatchers .is ;
78
- import static org .hamcrest .MatcherAssert .assertThat ;
79
- import static org .junit .Assert .assertTrue ;
80
- import static org .mockito .Matchers .anyBoolean ;
81
- import static org .mockito .Matchers .anyString ;
82
- import static org .mockito .Mockito .mock ;
83
- import static org .mockito .Mockito .verify ;
84
- import static org .mockito .Mockito .when ;
85
- import static org .mockito .internal .verification .VerificationModeFactory .times ;
83
+ import org .mockito .Mockito ;
86
84
87
85
@ Category (MediumTests .class )
88
86
public class TestReplicationSource {
@@ -114,8 +112,12 @@ public static void setUpBeforeClass() throws Exception {
114
112
115
113
@ Before
116
114
public void setup () throws IOException {
117
- if (!FS .exists (logDir )) FS .mkdirs (logDir );
118
- if (!FS .exists (oldLogDir )) FS .mkdirs (oldLogDir );
115
+ if (!FS .exists (logDir )) {
116
+ FS .mkdirs (logDir );
117
+ }
118
+ if (!FS .exists (oldLogDir )) {
119
+ FS .mkdirs (oldLogDir );
120
+ }
119
121
120
122
ReplicationEndpointForTest .contructedCount .set (0 );
121
123
ReplicationEndpointForTest .startedCount .set (0 );
@@ -126,8 +128,12 @@ public void setup() throws IOException {
126
128
127
129
@ After
128
130
public void tearDown () throws IOException {
129
- if (FS .exists (oldLogDir )) FS .delete (oldLogDir , true );
130
- if (FS .exists (logDir )) FS .delete (logDir , true );
131
+ if (FS .exists (oldLogDir )) {
132
+ FS .delete (oldLogDir , true );
133
+ }
134
+ if (FS .exists (logDir )) {
135
+ FS .delete (logDir , true );
136
+ }
131
137
}
132
138
133
139
@ AfterClass
@@ -224,8 +230,9 @@ public boolean evaluate() throws Exception {
224
230
225
231
}
226
232
227
- private void appendEntries (WALProvider .Writer writer , int numEntries , boolean closeAfterAppends ) throws IOException {
228
- for (int i = 0 ; i < numEntries ; i ++) {
233
+ private void appendEntries (WALProvider .Writer writer , int numEntries , boolean closeAfterAppends )
234
+ throws IOException {
235
+ for (int i = 0 ; i < numEntries ; i ++) {
229
236
byte [] b = Bytes .toBytes (Integer .toString (i ));
230
237
KeyValue kv = new KeyValue (b ,b ,b );
231
238
WALEdit edit = new WALEdit ();
@@ -255,7 +262,7 @@ private long getPosition(WALFactory wals, Path log2, int numEntries) throws IOEx
255
262
return reader .getPosition ();
256
263
}
257
264
258
- private static class Mocks {
265
+ private static final class Mocks {
259
266
private final ReplicationSourceManager manager = mock (ReplicationSourceManager .class );
260
267
private final ReplicationQueues queues = mock (ReplicationQueues .class );
261
268
private final ReplicationPeers peers = mock (ReplicationPeers .class );
@@ -268,7 +275,8 @@ private Mocks() {
268
275
when (context .getReplicationPeer ()).thenReturn (peer );
269
276
}
270
277
271
- ReplicationSource createReplicationSourceWithMocks (ReplicationEndpoint endpoint ) throws IOException {
278
+ ReplicationSource createReplicationSourceWithMocks (ReplicationEndpoint endpoint )
279
+ throws IOException {
272
280
final ReplicationSource source = new ReplicationSource ();
273
281
endpoint .init (context );
274
282
source .init (conf , FS , manager , queues , peers , mock (Stoppable .class ),
@@ -317,7 +325,8 @@ public WALEntryFilter getWALEntryfilter() {
317
325
ArgumentCaptor <Path > pathCaptor = ArgumentCaptor .forClass (Path .class );
318
326
ArgumentCaptor <Long > positionCaptor = ArgumentCaptor .forClass (Long .class );
319
327
verify (mocks .manager , times (1 ))
320
- .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (), anyBoolean (), anyBoolean ());
328
+ .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (),
329
+ anyBoolean (), anyBoolean ());
321
330
assertTrue (endpoint .lastEntries .size () == 5 );
322
331
assertThat (pathCaptor .getValue (), is (log2 ));
323
332
assertThat (positionCaptor .getValue (), is (pos ));
@@ -352,7 +361,8 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfEmptyWALsRolled() throws Exc
352
361
ArgumentCaptor <Long > positionCaptor = ArgumentCaptor .forClass (Long .class );
353
362
354
363
verify (mocks .manager , times (1 ))
355
- .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (), anyBoolean (), anyBoolean ());
364
+ .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (),
365
+ anyBoolean (), anyBoolean ());
356
366
assertThat (pathCaptor .getValue (), is (log2 ));
357
367
assertThat (positionCaptor .getValue (), is (startPos ));
358
368
}
@@ -362,7 +372,8 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exc
362
372
Mocks mocks = new Mocks ();
363
373
// set table cfs to filter all cells out
364
374
final TableName replicatedTable = TableName .valueOf ("replicated_table" );
365
- final Map <TableName , List <String >> cfs = Collections .singletonMap (replicatedTable , Collections .<String >emptyList ());
375
+ final Map <TableName , List <String >> cfs =
376
+ Collections .singletonMap (replicatedTable , Collections .<String >emptyList ());
366
377
when (mocks .peer .getTableCFs ()).thenReturn (cfs );
367
378
368
379
WALFactory wals = new WALFactory (TEST_UTIL .getConfiguration (), null , "test" );
@@ -391,17 +402,17 @@ public void testSetLogPositionAndRemoveOldWALsEvenIfNoCfsReplicated() throws Exc
391
402
ArgumentCaptor <Path > pathCaptor = ArgumentCaptor .forClass (Path .class );
392
403
ArgumentCaptor <Long > positionCaptor = ArgumentCaptor .forClass (Long .class );
393
404
394
- // all old wals should be removed by updating wal position, even if no cfs replicated doesn't exist
405
+ // all old wals should be removed by updating wal position, even if all cells are filtered out.
395
406
verify (mocks .manager , times (1 ))
396
- .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (), anyBoolean (), anyBoolean ());
407
+ .logPositionAndCleanOldLogs (pathCaptor .capture (), anyString (), positionCaptor .capture (),
408
+ anyBoolean (), anyBoolean ());
397
409
assertThat (pathCaptor .getValue (), is (log2 ));
398
410
assertThat (positionCaptor .getValue (), is (pos ));
399
411
}
400
412
401
413
/**
402
414
* Tests that recovered queues are preserved on a regionserver shutdown.
403
415
* See HBASE-18192
404
- * @throws Exception
405
416
*/
406
417
@ Test
407
418
public void testServerShutdownRecoveredQueue () throws Exception {
0 commit comments