Skip to content

Commit ce08f50

Browse files
sideshowcoderMichael Nitschinger
authored andcommitted
Support Libmemcached ketama weighted
- Add support for alternative Ketama Node key format Libmemcached uses the format for `[hostname or ip][port unless default]-[repetition]` while spymemcached has been using `[hostname]/[ip]:[port]-[repetition]` the added `KetamaNodeKeyFormat` allows to choose either format while retaining the caching optimization. - Add support for weighted ketama Straight port of the weighting based on Libmemcached, configured via passing a map of node socketaddress to weight to the configuration. This code is only active if the weight is actually configured otherwise the old ketama code is used. - Split testLibKetamaCompatTwo into 2 pieces as the length breaks compiling on Eclipse and Java 8 "In java a methods can't have more than 65535 bytes." http://stackoverflow.com/questions/12257398/how-to-fix-the-code-of-method-is-exceeding-the-65535-bytes-limit Change-Id: I0263b9afc513f9a135d5d17318b3fe6bd4593437 Reviewed-on: http://review.couchbase.org/47624 Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com> Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
1 parent 53d14b3 commit ce08f50

File tree

7 files changed

+631
-288
lines changed

7 files changed

+631
-288
lines changed

src/main/java/net/spy/memcached/KetamaConnectionFactory.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222

2323
package net.spy.memcached;
2424

25+
import java.net.InetSocketAddress;
26+
import java.util.HashMap;
2527
import java.util.List;
28+
import java.util.Map;
2629

2730
/**
2831
* ConnectionFactory instance that sets up a ketama compatible connection.
@@ -40,16 +43,40 @@
4043
*/
4144
public class KetamaConnectionFactory extends DefaultConnectionFactory {
4245

46+
private final KetamaNodeKeyFormatter.Format ketamaNodeKeyFormat;
47+
private Map<InetSocketAddress, Integer> weights;
48+
4349
/**
4450
* Create a KetamaConnectionFactory with the given maximum operation
4551
* queue length, and the given read buffer size.
4652
*
4753
* @param opQueueMaxBlockTime the maximum time to block waiting for op
4854
* queue operations to complete, in milliseconds
4955
*/
50-
public KetamaConnectionFactory(int qLen, int bufSize,
56+
public KetamaConnectionFactory(int qLen, int bufSize,
5157
long opQueueMaxBlockTime) {
52-
super(qLen, bufSize, DefaultHashAlgorithm.KETAMA_HASH);
58+
this(qLen, bufSize, opQueueMaxBlockTime,
59+
DefaultHashAlgorithm.KETAMA_HASH,
60+
KetamaNodeKeyFormatter.Format.SPYMEMCACHED,
61+
new HashMap<InetSocketAddress, Integer>());
62+
}
63+
64+
/** Create a KetamaConnectionFactory with the maximum operation queue length,
65+
* the given read buffer size, the maximum time to block waiting operations,
66+
* a specific hash algorithm, a set ring key format, and a given set of
67+
* weights.
68+
*
69+
*/
70+
public KetamaConnectionFactory(
71+
int qLen,
72+
int bufSize,
73+
long opQueueMaxBlockTime,
74+
HashAlgorithm hash,
75+
KetamaNodeKeyFormatter.Format nodeKeyFormat,
76+
Map<InetSocketAddress, Integer> weights) {
77+
super(qLen, bufSize, hash);
78+
this.ketamaNodeKeyFormat = nodeKeyFormat;
79+
this.weights = weights;
5380
}
5481

5582
/**
@@ -67,6 +94,21 @@ public KetamaConnectionFactory() {
6794
*/
6895
@Override
6996
public NodeLocator createLocator(List<MemcachedNode> nodes) {
70-
return new KetamaNodeLocator(nodes, getHashAlg());
97+
return new KetamaNodeLocator(nodes, getHashAlg(),
98+
getKetamaNodeKeyFormat(), getWeights());
99+
}
100+
101+
/**
102+
* @return the ketamaNodeKeyFormat
103+
*/
104+
public KetamaNodeKeyFormatter.Format getKetamaNodeKeyFormat() {
105+
return ketamaNodeKeyFormat;
106+
}
107+
108+
/**
109+
* @return the ketama node weights
110+
*/
111+
public Map<InetSocketAddress, Integer> getWeights() {
112+
return weights;
71113
}
72114
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Copyright (C) 2009-2015 Couchbase, Inc.
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy
5+
* of this software and associated documentation files (the "Software"), to deal
6+
* in the Software without restriction, including without limitation the rights
7+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
* copies of the Software, and to permit persons to whom the Software is
9+
* furnished to do so, subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in
12+
* all copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
20+
* IN THE SOFTWARE.
21+
*/
22+
23+
package net.spy.memcached;
24+
25+
import java.net.InetSocketAddress;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
/**
30+
* Known key formats used in Ketama for assigning nodes around the ring
31+
*/
32+
33+
public class KetamaNodeKeyFormatter {
34+
35+
public enum Format {
36+
/**
37+
* SPYMEMCACHED uses the format traditionally used by spymemcached to map
38+
* nodes to names. The format is HOSTNAME/IP:PORT-ITERATION
39+
*
40+
* <p>
41+
* This default implementation uses the socket-address of the MemcachedNode
42+
* and concatenates it with a hyphen directly against the repetition number
43+
* for example a key for a particular server's first repetition may look like:
44+
* <p>
45+
*
46+
* <p>
47+
* <code>myhost/10.0.2.1-0</code>
48+
* </p>
49+
*
50+
* <p>
51+
* for the second repetition
52+
* </p>
53+
*
54+
* <p>
55+
* <code>myhost/10.0.2.1-1</code>
56+
* </p>
57+
*
58+
* <p>
59+
* for a server where reverse lookups are failing the returned keys may look
60+
* like
61+
* </p>
62+
*
63+
* <p>
64+
* <code>/10.0.2.1-0</code> and <code>/10.0.2.1-1</code>
65+
* </p>
66+
*/
67+
SPYMEMCACHED,
68+
69+
/**
70+
* LIBMEMCACHED uses the format traditionally used by libmemcached to map
71+
* nodes to names. The format is HOSTNAME:[PORT]-ITERATION the PORT is not
72+
* part of the node identifier if it is the default memecached port (11211)
73+
*/
74+
LIBMEMCACHED
75+
}
76+
77+
private final Format format;
78+
79+
public Format getFormat() {
80+
return format;
81+
}
82+
83+
// Carrried over from the DefaultKetamaNodeLocatorConfiguration:
84+
// Internal lookup map to try to carry forward the optimisation that was
85+
// previously in KetamaNodeLocator
86+
private Map<MemcachedNode, String> nodeKeys = new HashMap<MemcachedNode, String>();
87+
88+
public KetamaNodeKeyFormatter() {
89+
this(Format.SPYMEMCACHED);
90+
}
91+
92+
public KetamaNodeKeyFormatter(Format format) {
93+
this.format = format;
94+
}
95+
96+
/**
97+
* Returns a uniquely identifying key, suitable for hashing by the
98+
* KetamaNodeLocator algorithm.
99+
*
100+
* @param node The MemcachedNode to use to form the unique identifier
101+
* @param repetition The repetition number for the particular node in question
102+
* (0 is the first repetition)
103+
* @return The key that represents the specific repetition of the node
104+
*/
105+
public String getKeyForNode(MemcachedNode node, int repetition) {
106+
// Carrried over from the DefaultKetamaNodeLocatorConfiguration:
107+
// Internal Using the internal map retrieve the socket addresses
108+
// for given nodes.
109+
// I'm aware that this code is inherently thread-unsafe as
110+
// I'm using a HashMap implementation of the map, but the worst
111+
// case ( I believe) is we're slightly in-efficient when
112+
// a node has never been seen before concurrently on two different
113+
// threads, so it the socketaddress will be requested multiple times!
114+
// all other cases should be as fast as possible.
115+
String nodeKey = nodeKeys.get(node);
116+
if (nodeKey == null) {
117+
switch(this.format) {
118+
case LIBMEMCACHED:
119+
InetSocketAddress address = (InetSocketAddress)node.getSocketAddress();
120+
nodeKey = address.getHostName();
121+
if (address.getPort() != 11211) {
122+
nodeKey += ":" + address.getPort();
123+
}
124+
break;
125+
case SPYMEMCACHED:
126+
nodeKey = String.valueOf(node.getSocketAddress());
127+
if (nodeKey.startsWith("/")) {
128+
nodeKey = nodeKey.substring(1);
129+
}
130+
break;
131+
default:
132+
assert false;
133+
}
134+
nodeKeys.put(node, nodeKey);
135+
}
136+
return nodeKey + "-" + repetition;
137+
}
138+
}

src/main/java/net/spy/memcached/KetamaNodeLocator.java

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727
import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration;
2828
import net.spy.memcached.util.KetamaNodeLocatorConfiguration;
2929

30+
import java.net.InetSocketAddress;
3031
import java.util.ArrayList;
3132
import java.util.Collection;
33+
import java.util.HashMap;
3234
import java.util.Iterator;
3335
import java.util.List;
3436
import java.util.Map;
@@ -51,6 +53,8 @@ public final class KetamaNodeLocator extends SpyObject implements NodeLocator {
5153
private volatile Collection<MemcachedNode> allNodes;
5254

5355
private final HashAlgorithm hashAlg;
56+
private final Map<InetSocketAddress, Integer> weights;
57+
private final boolean isWeightedKetama;
5458
private final KetamaNodeLocatorConfiguration config;
5559

5660
/**
@@ -63,7 +67,26 @@ public final class KetamaNodeLocator extends SpyObject implements NodeLocator {
6367
* consistent hash continuum
6468
*/
6569
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
66-
this(nodes, alg, new DefaultKetamaNodeLocatorConfiguration());
70+
this(nodes, alg, KetamaNodeKeyFormatter.Format.SPYMEMCACHED, new HashMap<InetSocketAddress, Integer>());
71+
}
72+
73+
/**
74+
* Create a new KetamaNodeLocator with specific nodes, hash, node key format,
75+
* and weight
76+
*
77+
* @param nodes The List of nodes to use in the Ketama consistent hash
78+
* continuum
79+
* @param alg The hash algorithm to use when choosing a node in the Ketama
80+
* consistent hash continuum
81+
* @param nodeKeyFormat the format used to name the nodes in Ketama, either
82+
* SPYMEMCACHED or LIBMEMCACHED
83+
* @param weights node weights for ketama, a map from InetSocketAddress to
84+
* weight as Integer
85+
*/
86+
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
87+
KetamaNodeKeyFormatter.Format nodeKeyFormat,
88+
Map<InetSocketAddress, Integer> weights) {
89+
this(nodes, alg, weights, new DefaultKetamaNodeLocatorConfiguration(new KetamaNodeKeyFormatter(nodeKeyFormat)));
6790
}
6891

6992
/**
@@ -78,21 +101,44 @@ public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) {
78101
*/
79102
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
80103
KetamaNodeLocatorConfiguration conf) {
104+
this(nodes, alg, new HashMap<InetSocketAddress, Integer>(), conf);
105+
}
106+
107+
/**
108+
* Create a new KetamaNodeLocator with specific nodes, hash, node key format,
109+
* and weight
110+
*
111+
* @param nodes The List of nodes to use in the Ketama consistent hash
112+
* continuum
113+
* @param alg The hash algorithm to use when choosing a node in the Ketama
114+
* consistent hash continuum
115+
* @param weights node weights for ketama, a map from InetSocketAddress to
116+
* weight as Integer
117+
* @param configuration node locator configuration
118+
*/
119+
public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg,
120+
Map<InetSocketAddress, Integer> nodeWeights,
121+
KetamaNodeLocatorConfiguration configuration) {
81122
super();
82123
allNodes = nodes;
83124
hashAlg = alg;
84-
config = conf;
125+
config = configuration;
126+
weights = nodeWeights;
127+
isWeightedKetama = !weights.isEmpty();
85128
setKetamaNodes(nodes);
86129
}
87130

88131
private KetamaNodeLocator(TreeMap<Long, MemcachedNode> smn,
89132
Collection<MemcachedNode> an, HashAlgorithm alg,
133+
Map<InetSocketAddress, Integer> nodeWeights,
90134
KetamaNodeLocatorConfiguration conf) {
91135
super();
92136
ketamaNodes = smn;
93137
allNodes = an;
94138
hashAlg = alg;
95139
config = conf;
140+
weights = nodeWeights;
141+
isWeightedKetama = !weights.isEmpty();
96142
}
97143

98144
public Collection<MemcachedNode> getAll() {
@@ -147,7 +193,7 @@ public NodeLocator getReadonlyCopy() {
147193
an.add(new MemcachedNodeROImpl(n));
148194
}
149195

150-
return new KetamaNodeLocator(smn, an, hashAlg, config);
196+
return new KetamaNodeLocator(smn, an, hashAlg, weights, config);
151197
}
152198

153199
@Override
@@ -171,30 +217,62 @@ protected TreeMap<Long, MemcachedNode> getKetamaNodes() {
171217
*/
172218
protected void setKetamaNodes(List<MemcachedNode> nodes) {
173219
TreeMap<Long, MemcachedNode> newNodeMap =
174-
new TreeMap<Long, MemcachedNode>();
220+
new TreeMap<Long, MemcachedNode>();
175221
int numReps = config.getNodeRepetitions();
222+
int nodeCount = nodes.size();
223+
int totalWeight = 0;
224+
225+
if (isWeightedKetama) {
226+
for (MemcachedNode node : nodes) {
227+
totalWeight += weights.get(node.getSocketAddress());
228+
}
229+
}
230+
176231
for (MemcachedNode node : nodes) {
177-
// Ketama does some special work with md5 where it reuses chunks.
178-
if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
179-
for (int i = 0; i < numReps / 4; i++) {
180-
byte[] digest =
181-
DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, i));
182-
for (int h = 0; h < 4; h++) {
183-
Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
184-
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
185-
| ((long) (digest[1 + h * 4] & 0xFF) << 8)
186-
| (digest[h * 4] & 0xFF);
187-
newNodeMap.put(k, node);
188-
getLogger().debug("Adding node %s in position %d", node, k);
232+
if (isWeightedKetama) {
233+
234+
int thisWeight = weights.get(node.getSocketAddress());
235+
float percent = (float)thisWeight / (float)totalWeight;
236+
int pointerPerServer = (int)((Math.floor((float)(percent * (float)config.getNodeRepetitions() / 4 * (float)nodeCount + 0.0000000001))) * 4);
237+
for (int i = 0; i < pointerPerServer / 4; i++) {
238+
for(long position : ketamaNodePositionsAtIteration(node, i)) {
239+
newNodeMap.put(position, node);
240+
getLogger().debug("Adding node %s with weight %s in position %d", node, thisWeight, position);
241+
}
189242
}
190-
}
191243
} else {
192-
for (int i = 0; i < numReps; i++) {
193-
newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
194-
}
244+
// Ketama does some special work with md5 where it reuses chunks.
245+
// Check to be backwards compatible, the hash algorithm does not
246+
// matter for Ketama, just the placement should always be done using
247+
// MD5
248+
if (hashAlg == DefaultHashAlgorithm.KETAMA_HASH) {
249+
for (int i = 0; i < numReps / 4; i++) {
250+
for(long position : ketamaNodePositionsAtIteration(node, i)) {
251+
newNodeMap.put(position, node);
252+
getLogger().debug("Adding node %s in position %d", node, position);
253+
}
254+
}
255+
} else {
256+
for (int i = 0; i < numReps; i++) {
257+
newNodeMap.put(hashAlg.hash(config.getKeyForNode(node, i)), node);
258+
}
259+
}
195260
}
196261
}
197262
assert newNodeMap.size() == numReps * nodes.size();
198263
ketamaNodes = newNodeMap;
199264
}
265+
266+
private List<Long> ketamaNodePositionsAtIteration(MemcachedNode node, int iteration) {
267+
List<Long> positions = new ArrayList<Long>();
268+
byte[] digest = DefaultHashAlgorithm.computeMd5(config.getKeyForNode(node, iteration));
269+
for (int h = 0; h < 4; h++) {
270+
Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
271+
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
272+
| ((long) (digest[1 + h * 4] & 0xFF) << 8)
273+
| (digest[h * 4] & 0xFF);
274+
positions.add(k);
275+
}
276+
return positions;
277+
}
200278
}

0 commit comments

Comments
 (0)