Skip to content

Commit a982d0a

Browse files
daschlMichael Nitschinger
authored andcommitted
SPY-172: Wakeup the selector if idle.
Motivation ---------- Waking the selector up from time to time and providing implementations a chance to run certain checks helps to improve robustness in certain situations. Modifications ------------- The wakeup time is configurable through a system property, but is low impact even if set to a smaller value. If the added queue is empty (which means the selector has been woken up but no op has been added) a custom method is called where implementations can run custom code like idle polls. Result ------ Better handling in idle situations. Change-Id: I43ea722b8a4fc28be4f997674ea85f73f2c66a50 Reviewed-on: http://review.couchbase.org/37725 Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com> Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
1 parent 1a92696 commit a982d0a

File tree

2 files changed

+112
-3
lines changed

2 files changed

+112
-3
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public class MemcachedConnection extends SpyThread {
9292
*/
9393
private static final int EXCESSIVE_EMPTY = 0x1000000;
9494

95+
/**
96+
* The default wakeup delay if not overriden by a system property.
97+
*/
98+
private static final int DEFAULT_WAKEUP_DELAY = 1000;
99+
95100
/**
96101
* If an operation gets cloned more than this ceiling, cancel it for
97102
* safety reasons.
@@ -230,6 +235,11 @@ public class MemcachedConnection extends SpyThread {
230235
*/
231236
protected final MetricType metricType;
232237

238+
/**
239+
* The selector wakeup delay, defaults to 1000ms.
240+
*/
241+
private final int wakeupDelay;
242+
233243
/**
234244
* Construct a {@link MemcachedConnection}.
235245
*
@@ -266,6 +276,9 @@ public MemcachedConnection(final int bufSize, final ConnectionFactory f,
266276
verifyAliveOnConnect = false;
267277
}
268278

279+
wakeupDelay = Integer.parseInt( System.getProperty("net.spy.wakeupDelay",
280+
Integer.toString(DEFAULT_WAKEUP_DELAY)));
281+
269282
List<MemcachedNode> connections = createConnections(a);
270283
locator = f.createLocator(connections);
271284

@@ -396,7 +409,7 @@ public void handleIO() throws IOException {
396409
handleInputQueue();
397410
getLogger().debug("Done dealing with queue.");
398411

399-
long delay = 0;
412+
long delay = 1000;
400413
if (!reconnectQueue.isEmpty()) {
401414
long now = System.currentTimeMillis();
402415
long then = reconnectQueue.firstKey();
@@ -405,9 +418,12 @@ public void handleIO() throws IOException {
405418
getLogger().debug("Selecting with delay of %sms", delay);
406419
assert selectorsMakeSense() : "Selectors don't make sense.";
407420
int selected = selector.select(delay);
408-
//Set<SelectionKey> selectedKeys = selector.selectedKeys();
409421

410-
if (selector.selectedKeys().isEmpty() && !shutDown) {
422+
if (shutDown) {
423+
return;
424+
} else if (selected == 0 && addedQueue.isEmpty()) {
425+
handleWokenUpSelector();
426+
} else if (selector.selectedKeys().isEmpty()) {
411427
handleEmptySelects();
412428
} else {
413429
getLogger().debug("Selected %d, selected %d keys", selected,
@@ -425,6 +441,23 @@ public void handleIO() throws IOException {
425441
handleOperationalTasks();
426442
}
427443

444+
/**
445+
* Helper method which gets called if the selector is woken up because of the
446+
* timeout setting, if has been interrupted or if happens during regular
447+
* write operation phases.
448+
*
449+
* <p>This method can be overriden by child implementations to handle custom
450+
* behavior on a manually woken selector, like sending pings through the
451+
* channels to make sure they are alive.</p>
452+
*
453+
* <p>Note that there is no guarantee that this method is at all or in the
454+
* regular interval called, so all overriding implementations need to take
455+
* that into account. Also, it needs to take into account that it may be
456+
* called very often under heavy workloads, so it should not perform extensive
457+
* tasks in the same thread.</p>
458+
*/
459+
protected void handleWokenUpSelector() { }
460+
428461
/**
429462
* Helper method for {@link #handleIO()} to encapsulate everything that
430463
* needs to be checked on a regular basis that has nothing to do directly
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright (C) 2006-2009 Dustin Sallings
3+
* Copyright (C) 2009-2014 Couchbase, Inc.
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to deal
7+
* in the Software without restriction, including without limitation the rights
8+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
* copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20+
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
21+
* IN THE SOFTWARE.
22+
*/
23+
package net.spy.memcached;
24+
25+
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
26+
import org.junit.Test;
27+
28+
import java.io.IOException;
29+
import java.net.InetSocketAddress;
30+
import java.util.Arrays;
31+
import java.util.Collection;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static org.junit.Assert.assertTrue;
38+
39+
/**
40+
* Verifies the functionality of the {@link MemcachedConnection} that the
41+
* selector gets woken up automatically if idle.
42+
*/
43+
public class WokenUpOnIdleTest {
44+
45+
@Test
46+
public void shouldWakeUpOnIdle() throws Exception {
47+
CountDownLatch latch = new CountDownLatch(3);
48+
MemcachedConnection connection = new InstrumentedConnection(
49+
latch,
50+
1024,
51+
new BinaryConnectionFactory(),
52+
Arrays.asList(new InetSocketAddress(11210)),
53+
Collections.<ConnectionObserver>emptyList(),
54+
FailureMode.Redistribute,
55+
new BinaryOperationFactory()
56+
);
57+
58+
assertTrue(latch.await(5, TimeUnit.SECONDS));
59+
}
60+
61+
static class InstrumentedConnection extends MemcachedConnection {
62+
final CountDownLatch latch;
63+
InstrumentedConnection(CountDownLatch latch, int bufSize, ConnectionFactory f,
64+
List<InetSocketAddress> a, Collection<ConnectionObserver> obs,
65+
FailureMode fm, OperationFactory opfactory) throws IOException {
66+
super(bufSize, f, a, obs, fm, opfactory);
67+
this.latch = latch;
68+
}
69+
70+
@Override
71+
protected void handleWokenUpSelector() {
72+
latch.countDown();
73+
}
74+
}
75+
76+
}

0 commit comments

Comments
 (0)