Skip to content

Commit e1979df

Browse files
committed
SPY-194: Allow to optionally bound retry queue.
Motivation ---------- If the retry queue might grow very large this can lead to uncontrollable side effects (like heap growth), since this queue is not accessible from the outside of spymemcached. Modifications ------------- This change adds the system property "net.spy.retryQueueSize" which allows to optionally bound the retry queue to the given size. Note that to maximize backwards compatibility and to not introduce any regressions the actual queue has not been changed but rather the code which inserts into this queue is now checking for the limit. If the queue is full the operation is cancelled instead of sending it into the retry queue. Result ------ It is now possible to bound the retry queue. Change-Id: I86452ef8dcddcf2c4acfc065fd9db94510665ac3 Reviewed-on: http://review.couchbase.org/69938 Tested-by: Michael Nitschinger <michael@nitschinger.at> Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
1 parent e64b117 commit e1979df

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,11 @@ public class MemcachedConnection extends SpyThread {
9797
*/
9898
private static final int DEFAULT_WAKEUP_DELAY = 1000;
9999

100+
/**
101+
* By default, do not bound the retry queue.
102+
*/
103+
private static final int DEFAULT_RETRY_QUEUE_SIZE = -1;
104+
100105
/**
101106
* If an operation gets cloned more than this ceiling, cancel it for
102107
* safety reasons.
@@ -240,6 +245,11 @@ public class MemcachedConnection extends SpyThread {
240245
*/
241246
private final int wakeupDelay;
242247

248+
/**
249+
* Optionally bound the retry queue if set via system property.
250+
*/
251+
private final int retryQueueSize;
252+
243253
/**
244254
* Construct a {@link MemcachedConnection}.
245255
*
@@ -279,6 +289,10 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f,
279289
wakeupDelay = Integer.parseInt( System.getProperty("net.spy.wakeupDelay",
280290
Integer.toString(DEFAULT_WAKEUP_DELAY)));
281291

292+
retryQueueSize = Integer.parseInt(System.getProperty("net.spy.retryQueueSize",
293+
Integer.toString(DEFAULT_RETRY_QUEUE_SIZE)));
294+
getLogger().info("Setting retryQueueSize to " + retryQueueSize);
295+
282296
List<MemcachedNode> connections = createConnections(a);
283297
locator = f.createLocator(connections);
284298

@@ -878,7 +892,7 @@ private void readBufferAndLogMetrics(final Operation currentOp,
878892
assert op == currentOp : "Expected to pop " + currentOp + " got "
879893
+ op;
880894

881-
retryOps.add(currentOp);
895+
retryOperation(currentOp);
882896
metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
883897
}
884898
}
@@ -1484,9 +1498,18 @@ public boolean isShutDown() {
14841498
/**
14851499
* Add a operation to the retry queue.
14861500
*
1501+
* If the retry queue size is bounded and the size of the queue is reaching
1502+
* that boundary, the operation is cancelled rather than added to the
1503+
* retry queue.
1504+
*
14871505
* @param op the operation to retry.
14881506
*/
14891507
public void retryOperation(Operation op) {
1508+
if (retryQueueSize >= 0 && retryOps.size() >= retryQueueSize) {
1509+
if (!op.isCancelled()) {
1510+
op.cancel();
1511+
}
1512+
}
14901513
retryOps.add(op);
14911514
}
14921515

0 commit comments

Comments
 (0)