Skip to content

Commit 6424016

Browse files
committed
Fix #62 by using a Future to time out the request.
1 parent 8271420 commit 6424016

File tree

2 files changed

+115
-24
lines changed

2 files changed

+115
-24
lines changed

src/main/java/net/juniper/netconf/NetconfSession.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,14 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.ExecutorService;
44+
import java.util.concurrent.Executors;
4245
import java.util.concurrent.TimeUnit;
46+
import java.util.concurrent.TimeoutException;
47+
import java.util.concurrent.atomic.AtomicReference;
48+
49+
import static java.util.Optional.ofNullable;
4350

4451
/**
4552
* A {@code NetconfSession} is obtained by first building a
@@ -59,6 +66,7 @@
5966
public class NetconfSession {
6067

6168
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(NetconfSession.class);
69+
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
6270

6371
private final Channel netconfChannel;
6472
private String serverCapability;
@@ -127,30 +135,49 @@ private void sendHello(String hello) throws IOException {
127135
}
128136

129137
@VisibleForTesting
130-
String getRpcReply(String rpc) throws IOException {
138+
String getRpcReply(final String rpc) throws IOException {
131139
// write the rpc to the device
132140
sendRpcRequest(rpc);
133141

134-
final char[] buffer = new char[BUFFER_SIZE];
135-
final StringBuilder rpcReply = new StringBuilder();
136-
final long startTime = System.nanoTime();
137-
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
138-
boolean timeoutNotExceeded = true;
139-
int promptPosition;
140-
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0 &&
141-
(timeoutNotExceeded = (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime) < commandTimeout))) {
142-
int charsRead = in.read(buffer, 0, buffer.length);
143-
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
144-
rpcReply.append(buffer, 0, charsRead);
145-
}
146-
147-
if (!timeoutNotExceeded)
142+
final AtomicReference<Thread> threadReference = new AtomicReference<>();
143+
try {
144+
return singleThreadExecutor.submit(() -> {
145+
try {
146+
147+
threadReference.set(Thread.currentThread());
148+
final char[] buffer = new char[BUFFER_SIZE];
149+
final StringBuilder rpcReply = new StringBuilder();
150+
final Reader in = new InputStreamReader(stdInStreamFromDevice, Charsets.UTF_8);
151+
int promptPosition;
152+
while ((promptPosition = rpcReply.indexOf(NetconfConstants.DEVICE_PROMPT)) < 0) {
153+
int charsRead = in.read(buffer, 0, buffer.length);
154+
if (charsRead < 0) throw new NetconfException("Input Stream has been closed during reading.");
155+
rpcReply.append(buffer, 0, charsRead);
156+
}
157+
158+
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
159+
rpcReply.setLength(promptPosition);
160+
return rpcReply.toString();
161+
162+
} catch (final Exception e) {
163+
log.warn("Error reading from input stream", e);
164+
throw e;
165+
}
166+
})
167+
.get(commandTimeout, TimeUnit.MILLISECONDS);
168+
} catch (final InterruptedException e) {
169+
Thread.currentThread().interrupt();
170+
throw new NetconfException("Thread interrupted whilst waiting for RPC reply", e);
171+
} catch (final ExecutionException e) {
172+
if(e.getCause() instanceof NetconfException) {
173+
throw (NetconfException) e.getCause();
174+
}
175+
throw new NetconfException("Unexpected exception whilst waiting for RPC reply", e);
176+
} catch (final TimeoutException e) {
177+
// Make sure the thread isn't still running
178+
ofNullable(threadReference.get()).ifPresent(Thread::interrupt);
148179
throw new SocketTimeoutException("Command timeout limit was exceeded: " + commandTimeout);
149-
// fixing the rpc reply by removing device prompt
150-
log.debug("Received Netconf RPC-Reply\n{}", rpcReply);
151-
rpcReply.setLength(promptPosition);
152-
153-
return rpcReply.toString();
180+
}
154181
}
155182

156183
private BufferedReader getRpcReplyRunning(String rpc) throws IOException {

src/test/java/net/juniper/netconf/NetconfSessionTest.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.junit.jupiter.api.AfterEach;
88
import org.junit.jupiter.api.BeforeEach;
99
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.Timeout;
1011
import org.mockito.Mock;
1112
import org.mockito.MockitoAnnotations;
1213
import org.slf4j.Logger;
@@ -25,17 +26,17 @@
2526
import java.net.SocketTimeoutException;
2627
import java.nio.charset.StandardCharsets;
2728
import java.nio.file.Files;
29+
import java.time.Duration;
30+
import java.time.Instant;
31+
import java.util.concurrent.Semaphore;
32+
import java.util.concurrent.TimeUnit;
2833

2934
import static org.assertj.core.api.Assertions.assertThat;
3035
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3136
import static org.junit.jupiter.api.Assertions.assertThrows;
32-
import static org.mockito.ArgumentMatchers.any;
33-
import static org.mockito.ArgumentMatchers.anyInt;
3437
import static org.mockito.Mockito.anyString;
35-
import static org.mockito.Mockito.doAnswer;
3638
import static org.mockito.Mockito.doCallRealMethod;
3739
import static org.mockito.Mockito.eq;
38-
import static org.mockito.Mockito.mock;
3940
import static org.mockito.Mockito.when;
4041

4142
public class NetconfSessionTest {
@@ -489,4 +490,67 @@ private String createHelloMessage() {
489490
+ " <session-id>27700</session-id>\n"
490491
+ "</hello>";
491492
}
493+
494+
495+
@Test
496+
@Timeout(value = 2, unit = TimeUnit.SECONDS)
497+
void ifTheDeviceDoesNotRespondAnExceptionWillBeThrown() {
498+
final Duration commandTimeoutDuration = Duration.ofSeconds(1);
499+
500+
final Instant startTime = Instant.now();
501+
assertThatThrownBy(() -> createNetconfSession((int) commandTimeoutDuration.toMillis()))
502+
.isInstanceOf(SocketTimeoutException.class)
503+
.hasMessageStartingWith("Command timeout limit was exceeded");
504+
505+
final Duration executeRpcDuration = Duration.between(startTime, Instant.now());
506+
// This should have taken about 1 second to time out
507+
assertThat(executeRpcDuration)
508+
.isGreaterThanOrEqualTo(commandTimeoutDuration);
509+
}
510+
511+
@Test
512+
@Timeout(value = 2, unit = TimeUnit.SECONDS)
513+
void ifTheDeviceDoesNotRespondTheSessionCanStillBeUsed() throws Exception {
514+
515+
final Semaphore semaphore = new Semaphore(0);
516+
517+
final Duration commandTimeoutDuration = Duration.ofSeconds(1);
518+
519+
new Thread(() -> {
520+
try {
521+
// This is the "hello" from the device, in response to the "Hello" to the initial client ""hello"
522+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
523+
outPipe.write(DEVICE_PROMPT_BYTE);
524+
outPipe.flush();
525+
526+
// Don't send any response until it's required
527+
semaphore.acquire();
528+
// Now send a second response
529+
outPipe.write(FAKE_RPC_REPLY.getBytes(StandardCharsets.UTF_8));
530+
outPipe.write(DEVICE_PROMPT_BYTE);
531+
outPipe.flush();
532+
outPipe.close();
533+
} catch (final Exception e) {
534+
log.error("Error in background thread", e);
535+
}
536+
}).start();
537+
final NetconfSession netconfSession = createNetconfSession((int) commandTimeoutDuration.toMillis());
538+
// We've now received a "FAKE_RPC_REPLY"
539+
540+
// Now send a request, but we're expecting a timeout as the device won't send it yet
541+
final Instant startTime = Instant.now();
542+
assertThatThrownBy(() -> netconfSession.getRpcReply("<some-command/>"))
543+
.isInstanceOf(SocketTimeoutException.class)
544+
.hasMessageStartingWith("Command timeout limit was exceeded");
545+
final Duration executeRpcDuration = Duration.between(startTime, Instant.now());
546+
547+
// This should have taken about 1 second to time out
548+
assertThat(executeRpcDuration)
549+
.isGreaterThanOrEqualTo(commandTimeoutDuration);
550+
551+
// Try again - we should get a reply
552+
semaphore.release(); // Ensure the device sends a response
553+
final String rpcReply = netconfSession.getRpcReply("<some-command/>");
554+
assertThat(rpcReply).isEqualTo(FAKE_RPC_REPLY);
555+
}
492556
}

0 commit comments

Comments
 (0)