18
18
package org .apache .hadoop .hbase .io .hfile ;
19
19
20
20
import static org .junit .Assert .assertEquals ;
21
+ import static org .junit .Assert .assertFalse ;
22
+ import static org .junit .Assert .assertTrue ;
23
+ import static org .mockito .ArgumentMatchers .any ;
24
+ import static org .mockito .ArgumentMatchers .anyInt ;
25
+ import static org .mockito .ArgumentMatchers .anyString ;
26
+ import static org .mockito .Mockito .mock ;
27
+ import static org .mockito .Mockito .when ;
21
28
22
- import com .thimbleware .jmemcached .CacheElement ;
23
- import com .thimbleware .jmemcached .CacheImpl ;
24
- import com .thimbleware .jmemcached .Key ;
25
- import com .thimbleware .jmemcached .LocalCacheElement ;
26
- import com .thimbleware .jmemcached .MemCacheDaemon ;
27
- import com .thimbleware .jmemcached .storage .hash .ConcurrentLinkedHashMap ;
28
- import com .thimbleware .jmemcached .storage .hash .ConcurrentLinkedHashMap .EvictionPolicy ;
29
+ import java .io .IOException ;
29
30
import java .net .InetSocketAddress ;
31
+ import java .util .List ;
32
+ import java .util .concurrent .ConcurrentHashMap ;
33
+ import java .util .concurrent .ConcurrentMap ;
34
+ import java .util .concurrent .CountDownLatch ;
35
+ import java .util .concurrent .ForkJoinPool ;
36
+ import java .util .concurrent .ThreadLocalRandom ;
37
+ import net .spy .memcached .CachedData ;
38
+ import net .spy .memcached .ConnectionFactory ;
39
+ import net .spy .memcached .FailureMode ;
40
+ import net .spy .memcached .MemcachedClient ;
41
+ import net .spy .memcached .internal .OperationFuture ;
42
+ import net .spy .memcached .ops .Operation ;
43
+ import net .spy .memcached .ops .OperationState ;
44
+ import net .spy .memcached .ops .OperationStatus ;
45
+ import net .spy .memcached .transcoders .Transcoder ;
30
46
import org .apache .hadoop .conf .Configuration ;
31
47
import org .apache .hadoop .hbase .HBaseClassTestRule ;
32
- import org .apache .hadoop .hbase .HBaseTestingUtility ;
33
48
import org .apache .hadoop .hbase .HConstants ;
34
49
import org .apache .hadoop .hbase .Waiter ;
35
50
import org .apache .hadoop .hbase .io .hfile .CacheTestUtils .HFileBlockPair ;
36
51
import org .apache .hadoop .hbase .testclassification .IOTests ;
37
52
import org .apache .hadoop .hbase .testclassification .SmallTests ;
38
- import org .junit .AfterClass ;
39
53
import org .junit .Before ;
40
- import org .junit .BeforeClass ;
41
54
import org .junit .ClassRule ;
42
55
import org .junit .Test ;
43
56
import org .junit .experimental .categories .Category ;
@@ -49,76 +62,94 @@ public class TestMemcachedBlockCache {
49
62
public static final HBaseClassTestRule CLASS_RULE =
50
63
HBaseClassTestRule .forClass (TestMemcachedBlockCache .class );
51
64
52
- static MemCacheDaemon <? extends CacheElement > MEMCACHED ;
53
- static MemcachedBlockCache CACHE ;
65
+ private MemcachedBlockCache cache ;
54
66
55
- @ Before
56
- public void before () throws Exception {
57
- MEMCACHED .getCache ().flush_all ();
58
- assertEquals ("Memcache is not empty" , MEMCACHED .getCache ().getCurrentItems (), 0 );
59
- }
67
+ private ConcurrentMap <String , CachedData > backingMap ;
60
68
61
- @ BeforeClass
62
- public static void setup () throws Exception {
63
- int port = HBaseTestingUtility .randomFreePort ();
64
- MEMCACHED = createDaemon (port );
69
+ @ Before
70
+ public void setup () throws Exception {
71
+ int port = ThreadLocalRandom .current ().nextInt (1024 , 65536 );
65
72
Configuration conf = new Configuration ();
66
73
conf .set ("hbase.cache.memcached.servers" , "localhost:" + port );
67
- CACHE = new MemcachedBlockCache ( conf );
68
- }
74
+ backingMap = new ConcurrentHashMap <>( );
75
+ cache = new MemcachedBlockCache ( conf ) {
69
76
70
- @ AfterClass
71
- public static void tearDown () throws Exception {
72
- if (MEMCACHED != null ) {
73
- MEMCACHED .stop ();
74
- }
77
+ private <T > OperationFuture <T > createFuture (String key , long opTimeout , T result ) {
78
+ OperationFuture <T > future =
79
+ new OperationFuture <>(key , new CountDownLatch (0 ), opTimeout , ForkJoinPool .commonPool ());
80
+ Operation op = mock (Operation .class );
81
+ when (op .getState ()).thenReturn (OperationState .COMPLETE );
82
+ future .setOperation (op );
83
+ future .set (result , new OperationStatus (true , "" ));
84
+
85
+ return future ;
86
+ }
87
+
88
+ @ Override
89
+ protected MemcachedClient createMemcachedClient (ConnectionFactory factory ,
90
+ List <InetSocketAddress > serverAddresses ) throws IOException {
91
+ assertEquals (FailureMode .Redistribute , factory .getFailureMode ());
92
+ assertTrue (factory .isDaemon ());
93
+ assertFalse (factory .useNagleAlgorithm ());
94
+ assertEquals (MAX_SIZE , factory .getReadBufSize ());
95
+ assertEquals (1 , serverAddresses .size ());
96
+ assertEquals ("localhost" , serverAddresses .get (0 ).getHostName ());
97
+ assertEquals (port , serverAddresses .get (0 ).getPort ());
98
+ MemcachedClient client = mock (MemcachedClient .class );
99
+ when (client .set (anyString (), anyInt (), any (), any ())).then (inv -> {
100
+ String key = inv .getArgument (0 );
101
+ HFileBlock block = inv .getArgument (2 );
102
+ Transcoder <HFileBlock > tc = inv .getArgument (3 );
103
+ CachedData cd = tc .encode (block );
104
+ backingMap .put (key , cd );
105
+ return createFuture (key , factory .getOperationTimeout (), true );
106
+ });
107
+ when (client .delete (anyString ())).then (inv -> {
108
+ String key = inv .getArgument (0 );
109
+ backingMap .remove (key );
110
+ return createFuture (key , factory .getOperationTimeout (), true );
111
+ });
112
+ when (client .get (anyString (), any ())).then (inv -> {
113
+ String key = inv .getArgument (0 );
114
+ Transcoder <HFileBlock > tc = inv .getArgument (1 );
115
+ CachedData cd = backingMap .get (key );
116
+ return tc .decode (cd );
117
+ });
118
+ return client ;
119
+ }
120
+ };
75
121
}
76
122
77
123
@ Test
78
124
public void testCache () throws Exception {
79
- final int NUM_BLOCKS = 10 ;
125
+ final int numBlocks = 10 ;
80
126
HFileBlockPair [] blocks =
81
- CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , NUM_BLOCKS );
82
- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
83
- CACHE .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
127
+ CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , numBlocks );
128
+ for (int i = 0 ; i < numBlocks ; i ++) {
129
+ cache .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
130
+ }
131
+ Waiter .waitFor (new Configuration (), 10000 , () -> backingMap .size () == numBlocks );
132
+ for (int i = 0 ; i < numBlocks ; i ++) {
133
+ HFileBlock actual = (HFileBlock ) cache .getBlock (blocks [i ].getBlockName (), false , false , true );
134
+ HFileBlock expected = blocks [i ].getBlock ();
135
+ assertEquals (expected .getBlockType (), actual .getBlockType ());
136
+ assertEquals (expected .getSerializedLength (), actual .getSerializedLength ());
84
137
}
85
- Waiter .waitFor (new Configuration (), 10000 ,
86
- () -> MEMCACHED .getCache ().getCurrentItems () == NUM_BLOCKS );
87
138
}
88
139
89
140
@ Test
90
141
public void testEviction () throws Exception {
91
- final int NUM_BLOCKS = 10 ;
142
+ final int numBlocks = 10 ;
92
143
HFileBlockPair [] blocks =
93
- CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , NUM_BLOCKS );
94
- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
95
- CACHE .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
96
- }
97
- Waiter .waitFor (new Configuration (), 10000 ,
98
- () -> MEMCACHED .getCache ().getCurrentItems () == NUM_BLOCKS );
99
- for (int i = 0 ; i < NUM_BLOCKS ; i ++) {
100
- CACHE .evictBlock (blocks [i ].getBlockName ());
144
+ CacheTestUtils .generateHFileBlocks (HConstants .DEFAULT_BLOCKSIZE , numBlocks );
145
+ for (int i = 0 ; i < numBlocks ; i ++) {
146
+ cache .cacheBlock (blocks [i ].getBlockName (), blocks [i ].getBlock ());
101
147
}
102
- Waiter .waitFor (new Configuration (), 10000 , () -> MEMCACHED .getCache ().getCurrentItems () == 0 );
103
- }
104
-
105
- private static MemCacheDaemon <? extends CacheElement > createDaemon (int port ) {
106
- InetSocketAddress addr = new InetSocketAddress ("localhost" , port );
107
- MemCacheDaemon <LocalCacheElement > daemon = new MemCacheDaemon <LocalCacheElement >();
108
- ConcurrentLinkedHashMap <Key , LocalCacheElement > cacheStorage =
109
- ConcurrentLinkedHashMap .create (EvictionPolicy .LRU , 1000 , 1024 * 1024 );
110
- daemon .setCache (new CacheImpl (cacheStorage ));
111
- daemon .setAddr (addr );
112
- daemon .setVerbose (true );
113
- daemon .start ();
114
- while (!daemon .isRunning ()) {
115
- try {
116
- Thread .sleep (100 );
117
- } catch (InterruptedException e ) {
118
- Thread .currentThread ().interrupt ();
119
- }
148
+ Waiter .waitFor (new Configuration (), 10000 , () -> backingMap .size () == numBlocks );
149
+ for (int i = 0 ; i < numBlocks ; i ++) {
150
+ cache .evictBlock (blocks [i ].getBlockName ());
120
151
}
121
- return daemon ;
152
+ Waiter . waitFor ( new Configuration (), 10000 , () -> backingMap . size () == 0 ) ;
122
153
}
123
154
124
155
}
0 commit comments