Skip to content

Commit 0b4b66f

Browse files
committed
Fix #62 by using a Future to time out the request.
1 parent 8271420 commit 0b4b66f

File tree

2 files changed

+109
-24
lines changed

2 files changed

+109
-24
lines changed

src/main/java/net/juniper/netconf/NetconfSession.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,14 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
4245
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.TimeoutException;
47+
import java.util.concurrent.atomic.AtomicReference;
48+
49+
import static java.util.Optional.ofNullable;
4350

4451
/**
4552
* A {@code NetconfSession} is obtained by first building a
@@ -59,6 +66,7 @@
5966
public class NetconfSession {
6067

6168
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(NetconfSession.class);
69+
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
6270

6371
private final Channel netconfChannel;
6472
private String serverCapability;
@@ -127,30 +135,49 @@ private void sendHello(String hello) throws IOException {
127135
}
128136

129137
@VisibleForTesting
130-
String getRpcReply(String rpc) throws IOException {
138+
String getRpcReply(final String rpc) throws IOException {
131139
// write the rpc to the device
132140
sendRpcRequest(rpc);
133141

134-
final char[] buffer = new char[BUFFER_SIZE];
135-
final StringBuilder rpcReply = new StringBuilder();
136-
final long startTime = System.nanoTime();
137-
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
138-
boolean timeoutNotExceeded = true;
139-
int promptPosition;
140-
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0 &&
141-
(timeoutNotExceeded = (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) < commandTimeout))) {
142-
int charsRead = in.read(buffer, 0, buffer.length);
143-
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
144-
rpcReply.append(buffer, 0, charsRead);
145-
}
146-
147-
if (!timeoutNotExceeded)
142+
final AtomicReference<Thread> threadReference = new AtomicReference<>();
143+
try {
144+
return singleThreadExecutor.submit(() -> {
145+
try {
146+
147+
threadReference.set(Thread.currentThread());
148+
final char[] buffer = new char[BUFFER_SIZE];
149+
final StringBuilder rpcReply = new StringBuilder();
150+
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
151+
int promptPosition;
152+
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0) {
153+
int charsRead = in.read(buffer, 0, buffer.length);
154+
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
155+
rpcReply.append(buffer, 0, charsRead);
156+
}
157+
158+
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
159+
rpcReply.setLength(promptPosition);
160+
return rpcReply.toString();
161+
162+
} catch (final Exception e) {
163+
log.warn("Error reading from input stream", e);
164+
throw e;
165+
}
166+
})
167+
.get(commandTimeout, TimeUnit.MILLISECONDS);
168+
} catch (final InterruptedException e) {
169+
Thread.currentThread().interrupt();
170+
throw new NetconfException("Thread interrupted whilst waiting for RPC reply", e);
171+
} catch (final ExecutionException e) {
172+
if(e.getCause() instanceof NetconfException) {
173+
throw (NetconfException) e.getCause();
174+
}
175+
throw new NetconfException("Unexpected exception whilst waiting for RPC reply", e);
176+
} catch (final TimeoutException e) {
177+
// Make sure the thread isn't still running
178+
ofNullable(threadReference.get()).ifPresent(Thread::interrupt);
148179
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
149-
// fixing the rpc reply by removing device prompt
150-
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
151-
rpcReply.setLength(promptPosition);
152-
153-
return rpcReply.toString();
180+
}
154181
}
155182

156183
private BufferedReader getRpcReplyRunning(String rpc) throws IOException {

src/test/java/net/juniper/netconf/NetconfSessionTest.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.jupiter.api.AfterEach;
88
import org.junit.jupiter.api.BeforeEach;
99
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.Timeout;
1011
import org.mockito.Mock;
1112
import org.mockito.MockitoAnnotations;
1213
import org.slf4j.Logger;
@@ -25,17 +26,16 @@
2526
import java.net.SocketTimeoutException;
2627
import java.nio.charset.StandardCharsets;
2728
import java.nio.file.Files;
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.util.concurrent.TimeUnit;
2832

2933
import static org.assertj.core.api.Assertions.assertThat;
3034
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3135
import static org.junit.jupiter.api.Assertions.assertThrows;
32-
import static org.mockito.ArgumentMatchers.any;
33-
import static org.mockito.ArgumentMatchers.anyInt;
3436
import static org.mockito.Mockito.anyString;
35-
import static org.mockito.Mockito.doAnswer;
3637
import static org.mockito.Mockito.doCallRealMethod;
3738
import static org.mockito.Mockito.eq;
38-
import static org.mockito.Mockito.mock;
3939
import static org.mockito.Mockito.when;
4040

4141
public class NetconfSessionTest {
@@ -489,4 +489,62 @@ private String createHelloMessage() {
489489
+ " <session-id>27700</session-id>\n"
490490
+ "</hello>";
491491
}
492+
493+
494+
@Test
495+
@Timeout(value = 2, unit = TimeUnit.SECONDS)
496+
void ifTheDeviceDoesNotRespondAnExceptionWillBeThrown() {
497+
final Duration commandTimeoutDuration = Duration.ofSeconds(1);
498+
499+
final Instant startTime = Instant.now();
500+
assertThatThrownBy(() -> createNetconfSession((int) commandTimeoutDuration.toMillis()))
501+
.isInstanceOf(SocketTimeoutException.class)
502+
.hasMessageStartingWith("Command timeout limit was exceeded");
503+
504+
final Duration executeRpcDuration = Duration.between(startTime, Instant.now());
505+
// This test should take over 1 second to time out, but hopefully less than 2 unless you have a very slow machine!
506+
assertThat(executeRpcDuration)
507+
.isGreaterThanOrEqualTo(commandTimeoutDuration)
508+
.isLessThanOrEqualTo(Duration.ofSeconds(2));
509+
}
510+
511+
@Test
512+
@Timeout(value = 2, unit = TimeUnit.SECONDS)
513+
void ifTheDeviceDoesNotRespondTheSessionCanStillBeUsed() throws Exception {
514+
final Duration commandTimeoutDuration = Duration.ofSeconds(1);
515+
516+
new Thread(() -> {
517+
try {
518+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
519+
outPipe.write(DEVICE_PROMPT_BYTE);
520+
outPipe.flush();
521+
// 1.5 seconds - enough for the first request to timeout, short enough for the second to complete
522+
Thread.sleep(1500);
523+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
524+
outPipe.write(DEVICE_PROMPT_BYTE);
525+
outPipe.flush();
526+
outPipe.close();
527+
} catch (final Exception e) {
528+
log.error("Error in background thread", e);
529+
}
530+
}).start();
531+
final NetconfSession netconfSession = createNetconfSession((int) commandTimeoutDuration.toMillis());
532+
// We've now received a "FAKE_RPC_REPLY"
533+
534+
// Now send a request, but we're not expecting a reply in a second
535+
final Instant startTime = Instant.now();
536+
assertThatThrownBy(() -> netconfSession.getRpcReply("<some-command/>"))
537+
.isInstanceOf(SocketTimeoutException.class)
538+
.hasMessageStartingWith("Command timeout limit was exceeded");
539+
final Duration executeRpcDuration = Duration.between(startTime, Instant.now());
540+
541+
// This test should take over 1 second to time out, but hopefully less than 2 unless you have a very slow machine!
542+
assertThat(executeRpcDuration)
543+
.isGreaterThanOrEqualTo(commandTimeoutDuration)
544+
.isLessThanOrEqualTo(Duration.ofSeconds(2));
545+
546+
// Try again - we should get a reply
547+
final String rpcReply = netconfSession.getRpcReply("<some-command/>");
548+
assertThat(rpcReply).isEqualTo(FAKE_RPC_REPLY);
549+
}
492550
}

0 commit comments

Comments
 (0)