From b1f9604408246a8cb83dbba2e0a20ab3548afcc7 Mon Sep 17 00:00:00 2001 From: Steve Hu Date: Sat, 8 Apr 2023 17:13:22 -0400 Subject: [PATCH] fixes #1701 simple connection pool ports from the 1.6.x (#1702) Co-authored-by: miklish --- .../client/simplepool/SimpleConnection.java | 61 +++ .../simplepool/SimpleConnectionHolder.java | 399 ++++++++++++++++++ .../simplepool/SimpleConnectionMaker.java | 33 ++ .../simplepool/SimpleConnectionPool.java | 57 +++ .../simplepool/SimpleURIConnectionPool.java | 334 +++++++++++++++ .../simplepool/mock/TestConnectionMaker.java | 103 +++++ .../client/simplepool/mock/TestRunner.java | 225 ++++++++++ .../mockexample/MockKeepAliveConnection.java | 63 +++ .../MockRandomlyClosingConnection.java | 64 +++ .../MockTimeoutLeakedConnection.java | 75 ++++ .../mockexample/TestKeepAliveConnection.java | 46 ++ .../mockexample/TestPoolSizeOverflow.java | 46 ++ .../TestRandomlyClosingConnection.java | 46 ++ .../TestTimeoutLeakedConnection.java | 46 ++ .../undertow/SimpleClientConnection.java | 58 +++ .../undertow/SimpleClientConnectionMaker.java | 198 +++++++++ .../consul/ConsulRecoveryManager.java | 9 +- .../networknt/consul/ConsulThreadMonitor.java | 22 +- .../consul/client/ConsulClientImpl.java | 22 +- 19 files changed, 1891 insertions(+), 16 deletions(-) create mode 100644 client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockKeepAliveConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockRandomlyClosingConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockTimeoutLeakedConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestKeepAliveConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestPoolSizeOverflow.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestRandomlyClosingConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestTimeoutLeakedConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java create mode 100644 client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java new file mode 100644 index 0000000000..5167b64fa7 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ + +package com.networknt.client.simplepool; + +/*** + * SimpleConnection is an interface that contains all the required functions and properties of + * a connection that are needed by the SimpleConnectionHolder, SimpleURIConnectionPool, and + * SimpleConnectionPool classes. + * + * Concrete HTTP network connections (like Undertow's ClientConnection class) should be wrapped in + * classes that implement the SimpleConnection interface. + * + */ +public interface SimpleConnection { + /** + * Tells whether or not the connection is open + * @return returns true iff the raw connection is open + */ + public boolean isOpen(); + + /** + * Returns the raw connection object. This must always be non-null + * @return returns the raw connection object + */ + public Object getRawConnection(); + + /** + * Tells whether the connection supports HTTP/2 connection multiplexing + * @return returns true iff the connection supports HTTP/2 connection multiplexing + */ + public boolean isMultiplexingSupported(); + + /** + * Returns the client side address of the connection + * @return the client side address of the connection + */ + public String getLocalAddress(); + + /** + * Safely closes the connection + */ + public void safeClose(); +} diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java new file mode 100644 index 0000000000..22191ae582 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java @@ -0,0 +1,399 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.Set; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; + +/*** + * A SimpleConnectionHolder is a simplified interface for a connection, that also keeps track of the connection's state. + * (In fact--in this document--the state of a connection and the state of its holder are used interchangeably) + * + * Connection States + * + * NOT_BORROWED_VALID - not borrowed and valid (i.e.: borrowed and not expired) + * BORROWED_VALID - borrowed and valid (i.e.: borrowed not expired) + * NOT_BORROWED_EXPIRED - not borrowed and expired + * BORROWED_EXPIRED - borrowed and expired + * CLOSED - closed + * + * BORROWABLE + * A connection is BORROWABLE if it is VALID (i.e.: it is not EXPIRED), and its borrows are below its MAX_BORROW + * + * State diagram for a connection + * + * | + * \/ + * [ NOT_BORROWED_VALID ] --(borrow)--> [ BORROWED_VALID ] + * | <-(restore)-- | + * | | + * (expire) (expire) + * | | + * \/ \/ + * [ NOT_BORROWED_EXPIRED ] <-(restore)-- [ BORROWED_EXPIRED ] + * | + * (close) (*) + * | + * \/ + * [ CLOSED ] + * + * (*) A connection can be closed explicitly by the connection pool, or it can be closed at any time by the OS + * If it is closed unexpectedly by the OS, then the state can jump directly to CLOSED regardless of what state + * it is currently in + * + * Connection Tokens + * Tokens are a mechanism used to track whether or not any threads are still using a connection object. When + * users need to borrow a connection, they are given a connection token. This token contains a reference to the + * connection as well as other metadata. + * + * When users are done with the connection, they must return the connection token to the connection pool. + * + * The correct use requires some discipline on the part of the connection pool user. Connection leaks can + * occur if a borrowed token is not returned. + * + * TODO: add a setting that sets the max time after expiry to give connections that still have unrestored tokens + * + * Time-freezing + * Calculates the state of the connection based on its internal properties at a specific point in time. + * + * Users must provide a fixed 'now' value for the current time. + * This freezes a single time value for all time-dependent properties. + * This is important when calculating an aggregate state based on the values of 2 or more time-dependent states. + * + * Not doing so (i.e.: not freezing the time) may allow inconsistent states to be reached. + */ +public final class SimpleConnectionHolder { + private static final Logger logger = LoggerFactory.getLogger(SimpleConnectionHolder.class); + + // how long a connection can be eligible to be borrowed + private final long EXPIRE_TIME; + + // the maximum number of borrowed tokens a connection can have at a time + private final int MAX_BORROWS; + + // the time this connection was created + private final long startTime; + + // the URI this connection is connected to + private final URI uri; + + /** + if true, this connection should be treated as CLOSED + note: CLOSED may be true before a connection is actually closed since there may be a delay + between setting close = false, and the network connection actually being fully closed + */ + private volatile boolean closed = false; + + /** + If the connection is HTTP/1.1, it can only be borrowed by 1 process at a time + If the connection is HTTP/2, it can be borrowed by an unlimited number of processes at a time + */ + private final SimpleConnectionMaker connectionMaker; + private final SimpleConnection connection; + + /** a Set containing all borrowed connection tokens */ + private final Set borrowedTokens = ConcurrentHashMap.newKeySet(); + + /*** + * Connections and ConnectionHolders are paired 1-1. For every connection there is a single ConnectionHolder and + * vice versa. + * + * This is why connections are created at the same time a ConnectionHolder is created (see SimpleConnectionHolder + * constructor). + * + * The connection holder acts as a simplified interface to the connection, and keeps track of how many + * processes are using it at any given time. The maximum number of processes using it at the same time + * is determined by the connections type: HTTP/1.1 (1 process at a time) or HTTP/2 (multiple processes at a time). + * + * @param expireTime how long a connection is eligible to be borrowed + * @param connectionCreateTimeout how long it can take a connection be created before an exception thrown + * @param isHttp2 if true, tries to upgrade to HTTP/2. if false, will try to open an HTTP/1.1 connection + * @param uri the URI the connection will try to connect to + * @param allCreatedConnections this Set will be passed to the callback thread that creates the connection. + * The connectionMaker will always add every successfully created connection + * to this Set. + * @param connectionMaker a class that SimpleConnectionHolder uses to create new SimpleConnection objects + */ + public SimpleConnectionHolder( + long expireTime, + long connectionCreateTimeout, + boolean isHttp2, + URI uri, + Set allCreatedConnections, + SimpleConnectionMaker connectionMaker) + { + this.connectionMaker = connectionMaker; + + this.uri = uri; + EXPIRE_TIME = expireTime; + + // for logging + long now = System.currentTimeMillis(); + + // create initial connection to uri + connection = connectionMaker.makeConnection(connectionCreateTimeout, isHttp2, uri, allCreatedConnections); + + // throw exception if connection creation failed + if(!connection.isOpen()) { + logger.debug("{} closed connection", logLabel(connection, now)); + throw new RuntimeException("[" + port(connection) + "] Error creating connection to " + uri.toString()); + + // start life-timer and determine connection type + } else { + startTime = System.currentTimeMillis(); + + // HTTP/1.1 connections have a MAX_BORROW of 1, while HTTP/2 connections can have > 1 MAX_BORROWS + MAX_BORROWS = connection().isMultiplexingSupported() ? Integer.MAX_VALUE : 1; + + logger.debug("{} New connection : {}", logLabel(connection, now), MAX_BORROWS > 1 ? "HTTP/2" : "HTTP/1.1"); + } + } + + private volatile boolean firstUse = true; + /** + * State Transition - Borrow + * + * @param connectionCreateTimeout the amount of time to wait for a connection to be created before throwing an exception + * @param now the time at which to evaluate whether there are borrowable connections or not + * @return returns a ConnectionToken representing this borrow of the connection + * @throws RuntimeException if connection closed or attempt to borrow after pool is full + */ + public synchronized ConnectionToken borrow(long connectionCreateTimeout, long now) throws RuntimeException { + /*** + * Connections can only be borrowed when the connection is in a BORROWABLE state. + * + * This will throw an IllegalStateException if borrow is called when the connection is not borrowable. + * This means that users need to check the state of the connection (i.e.: the state of the ConnectionHolder) + * before using it, e.g.: + * + * ConnectionToken connectionToken = null; + * long now = System.currentTimeMillis(); + * + * if(connectionHolder.borrowable(now)) + * connectionToken = connectionHolder.borrow(connectionCreateTimeout, now); + * + * Also note the use of a single consistent value for the current time ('now'). This ensures + * that the state returned in the 'if' statement will still be true in the 'borrow' statement + * (as long as the connection does not close between the 'if' and 'borrow'). + * + */ + ConnectionToken connectionToken; + + if(borrowable(now)) { + if (firstUse) { + firstUse = false; + connectionToken = new ConnectionToken(connection); + } else { + SimpleConnection reusedConnection = connectionMaker.reuseConnection(connectionCreateTimeout, connection); + connectionToken = new ConnectionToken(reusedConnection); + } + + // add connectionToken to the Set of borrowed tokens + borrowedTokens.add(connectionToken); + + logger.debug("{} borrow - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); + + return connectionToken; + } + else { + if(closed()) + throw new RuntimeException("Connection was unexpectedly closed"); + else + throw new IllegalStateException("Attempt made to borrow connection outside of BORROWABLE state"); + } + } + + /** + * State Transition - Restore + * + * NOTE: A connection that unexpectedly closes may be removed from connection pool tracking before all of its + * ConnectionTokens have been restored. + * + * @param connectionToken + */ + public synchronized void restore(ConnectionToken connectionToken) { + borrowedTokens.remove(connectionToken); + + long now = System.currentTimeMillis(); + logger.debug("{} restore - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); + } + + /** + * State Transition - Close + * + * @param now the time at which to evaluate whether this connection is closable or not + * @return true if the connection was closed and false otherwise + */ + public synchronized boolean safeClose(long now) { + logger.debug("{} close - closing connection with {} borrows...", logLabel(connection, now), borrowedTokens.size()); + + /** + Connection may still be open even if closed == true + However, for consistency, we treat the connection as closed as soon as closed == true, + even if IoUtils.safeClose(connection) has not completed closing the connection yet + */ + if(closed()) + return true; + + /** + Ensures that a connection is never closed unless the connection is in the NOT_BORROWED_EXPIRED state + This is vital to ensure that connections are never closed until after all processes that + borrowed them are no longer using them + */ + boolean notBorrowedExpired = !borrowed() && expired(now); + if(notBorrowedExpired != true) + throw new IllegalStateException(); + + closed = true; + connection.safeClose(); + return closed; + } + + /** + * State Property - isClosed + * + * @return true if the connection is closed and false otherwise + */ + public synchronized boolean closed() { + if(closed) + return closed; + + if(!connection.isOpen()) + closed = true; + + return closed; + } + + /** + * State Property - isExpired + * + * @param now the time at which to evaluate whether this connection has expired or not + * @return true if the connection has expired and false otherwise + */ + public synchronized boolean expired(long now) { + return now - startTime >= EXPIRE_TIME; + } + + /** + * State Property - isBorrowed + * + * @return true if the connection is currently borrowed and false otherwise + */ + public synchronized boolean borrowed() { + return borrowedTokens.size() > 0; + } + + /** + * State Property - isAtMaxBorrows + * + * @return true if the connection is at its maximum number of borrows, and false otherwise + */ + public synchronized boolean maxBorrowed() { + return borrowedTokens.size() >= MAX_BORROWS; + } + + /** + * State Property - isBorrowable + * + * @param now the time at which to evaluate the borrowability of this connection + * @return true if the connection is borrowable and false otherwise + */ + public synchronized boolean borrowable(long now) { + return connection.isOpen() && !expired(now) && !maxBorrowed(); + } + + /** + * Returns the SimpleConnection that SimpleConnectionHolder holds + * + * @return the SimpleConnection that SimpleConnectionHolder holds + */ + public SimpleConnection connection() { return connection; } + + public class ConnectionToken { + private final SimpleConnection connection; + private final SimpleConnectionHolder holder; + private final URI uri; + + ConnectionToken(SimpleConnection connection) { + this.connection = connection; + this.holder = SimpleConnectionHolder.this; + this.uri = SimpleConnectionHolder.this.uri; + } + + SimpleConnectionHolder holder() { return holder; } + SimpleConnection connection() { return connection; } + public Object getRawConnection() { return connection.getRawConnection(); } + public URI uri() { return uri; } + } + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + */ + private String logLabel(SimpleConnection connection, long now) { + return "[" + port(connection) + ": " + state(now) + "]"; + } + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + */ + private static String port(SimpleConnection connection) { + if(connection == null) return "NULL"; + String url = connection.getLocalAddress(); + int semiColon = url.lastIndexOf(":"); + if(semiColon == - 1) return "PORT?"; + return url.substring(url.lastIndexOf(":")+1); + } + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + */ + private enum State { CLOSED, BORROWABLE, NOT_BORROWABLE, NOT_BORROWED, VALID, BORROWED, EXPIRED } + private String state(long now) { + List stateList = new ArrayList<>(); + if(closed()) { stateList.add(State.CLOSED); } + if(borrowed()) { stateList.add(State.BORROWED);} else{ stateList.add(State.NOT_BORROWED);} + if(borrowable(now)) { stateList.add(State.BORROWABLE);} else{ if(!expired(now)) { stateList.add(State.NOT_BORROWABLE);}} + if(expired(now)) { stateList.add(State.EXPIRED);} /* else{ states.add(State.VALID);} */ + + StringBuilder state = new StringBuilder(); + for(int i = 0; i < stateList.size(); ++i) { + state.append(stateList.get(i)); + if(i+1 < stateList.size()) state.append(" "); + } + return stateList.size() > 0 ? state.toString() : "ILLEGAL_STATE"; + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java new file mode 100644 index 0000000000..2a9bef4542 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool; + +import java.net.URI; +import java.util.Set; + +/*** + * A factory that creates raw connections and wraps them in SimpleConnection objects. + * SimpleConnectionMakers are used by SimpleConnectionHolders to create connections. + * + */ +public interface SimpleConnectionMaker { + public SimpleConnection makeConnection(long createConnectionTimeout, boolean isHttp2, final URI uri, final Set allCreatedConnections); + public SimpleConnection reuseConnection(long createConnectionTimeout, SimpleConnection connection) throws RuntimeException; +} diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java new file mode 100644 index 0000000000..e28ff94876 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This is a class through which multiple URI Connection Pools can be accessed + */ +public final class SimpleConnectionPool { + private final Map pools = new ConcurrentHashMap<>(); + private final SimpleConnectionMaker connectionMaker; + private final long expireTime; + private final int poolSize; + + public SimpleConnectionPool(long expireTime, int poolSize, SimpleConnectionMaker connectionMaker) { + this.expireTime = expireTime; + this.poolSize = poolSize; + this.connectionMaker = connectionMaker; + } + + public SimpleConnectionHolder.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2, URI uri) + throws RuntimeException + { + if(!pools.containsKey(uri)) { + synchronized (pools) { + if (!pools.containsKey(uri)) + pools.put(uri, new SimpleURIConnectionPool(uri, expireTime, poolSize, connectionMaker)); + } + } + return pools.get(uri).borrow(createConnectionTimeout, isHttp2); + } + + public void restore(SimpleConnectionHolder.ConnectionToken connectionToken) { + if(pools.containsKey(connectionToken.uri())) + pools.get(connectionToken.uri()).restore(connectionToken); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java b/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java new file mode 100644 index 0000000000..e06ab5fb7f --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java @@ -0,0 +1,334 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; + +/*** + A connection pool for a single URI. + Connection pool contains 4 Sets of ConnectionHolders: + + 1. allCreatedConnections all connections created by connection makers are added to this set + 2. allKnownConnections: the set of all connections tracked by the connection pool + 3. Borrowable: connection that can be borrowed from + 4. Borrowed: connections that have borrowed tokens + 5. notBorrowedExpired: connections that have no borrowed tokens -- only these can be closed by the pool +*/ +public final class SimpleURIConnectionPool { + private static final Logger logger = LoggerFactory.getLogger(SimpleURIConnectionPool.class); + private final SimpleConnectionMaker connectionMaker; + private final long EXPIRY_TIME; + private final int poolSize; + private final URI uri; + + /** Connection Pool Sets + * These sets determine the mutable state of the connection pool + */ + /** The set of all connections created by the SimpleConnectionMaker for this uri */ + private final Set allCreatedConnections = ConcurrentHashMap.newKeySet(); + /** The set containing all connections known to this connection pool (It is not considered a state set) */ + private final Set allKnownConnections = new HashSet<>(); + /** + * State Sets + * The existence or non-existence of a connection in one of these sets means that the connection is or is not in + * one of these states. A connection can be in multiple state sets at a time (e.g.: a connection can be both borrowed and borrowable) */ + private final Set borrowable = new HashSet<>(); + private final Set borrowed = new HashSet<>(); + private final Set notBorrowedExpired = new HashSet<>(); + + public SimpleURIConnectionPool(URI uri, long expireTime, int poolSize, SimpleConnectionMaker connectionMaker) { + EXPIRY_TIME = expireTime; + this.uri = uri; + this.poolSize = poolSize; + this.connectionMaker = connectionMaker; + } + + /*** + * + * @param createConnectionTimeout + * @param isHttp2 + * @return + * @throws RuntimeException + */ + public synchronized SimpleConnectionHolder.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2) throws RuntimeException { + long now = System.currentTimeMillis(); + final SimpleConnectionHolder holder; + + readAllConnectionHolders(now); + + if(borrowable.size() > 0) { + holder = borrowable.toArray(new SimpleConnectionHolder[0])[ThreadLocalRandom.current().nextInt(borrowable.size())]; + } else { + if (allKnownConnections.size() < poolSize) { + holder = new SimpleConnectionHolder(EXPIRY_TIME, createConnectionTimeout, isHttp2, uri, allCreatedConnections, connectionMaker); + allKnownConnections.add(holder); + } else + throw new RuntimeException("An attempt was made to exceed the maximum size was of the " + uri.toString() + " connection pool"); + } + + SimpleConnectionHolder.ConnectionToken connectionToken = holder.borrow(createConnectionTimeout, now); + readConnectionHolder(holder, now, () -> allKnownConnections.remove(holder)); + + logger.debug(showConnections("borrow")); + + return connectionToken; + } + + /*** + * Restores borrowed connections + * + * NOTE: A connection that unexpectedly closes may be removed from connection pool tracking before all of its + * ConnectionTokens have been restored. This can result in seeing log messages about CLOSED connections + * being restored to the pool that are no longer tracked / known by the connection pool + * + * @param connectionToken the connection token that represents the borrowing of a connection by a thread + */ + public synchronized void restore(SimpleConnectionHolder.ConnectionToken connectionToken) { + if(connectionToken == null) + return; + + SimpleConnectionHolder holder = connectionToken.holder(); + long now = System.currentTimeMillis(); + + holder.restore(connectionToken); + readAllConnectionHolders(now); + + logger.debug(showConnections("restore")); + } + + /** + * A key method that orchestrates the update of the connection pool's state + * It is guaranteed to run every time a transition method is called on SimpleURIConnectionPool + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + * + * Note: + * 'knownConnectionHolders::remove' is just Java syntactic sugar for '() -> knownConnectionHolders.remove()' + * + * @param now the current time in ms + */ + private void readAllConnectionHolders(long now) + { + /** + * Sweep all known connections and update sets + * + * Remove any connections that have unexpectedly closed + * Move all remaining connections to appropriate sets based on their properties + */ + final Iterator knownConnectionHolders = allKnownConnections.iterator(); + while (knownConnectionHolders.hasNext()) { + SimpleConnectionHolder connection = knownConnectionHolders.next(); + + // remove connections that have unexpectedly closed + if (connection.closed()) { + logger.debug("[{}: CLOSED]: Connection unexpectedly closed - Removing from known-connections set", port(connection.connection())); + readConnectionHolder(connection, now, knownConnectionHolders::remove); + } + // else, move connections to correct sets + else { + readConnectionHolder(connection, now, knownConnectionHolders::remove); + + // close and remove connections if they are in a closeable set + if (notBorrowedExpired.contains(connection)) { + connection.safeClose(now); + readConnectionHolder(connection, now, knownConnectionHolders::remove); + } + } + } + + // find and close any leaked connections + findAndCloseLeakedConnections(); + } + + private interface RemoveFromAllKnownConnections { void remove(); } + /*** + * This method reads a connection and moves it to the correct sets based on its properties. + * It will also remove a connection from all sets (i.e.: stop tracking the connection) if it is closed. + * + * NOTE: Closing connections and modifying sets + * readConnectionHolder() and findAndCloseLeakedConnections() are the only two methods that close connections + * and modify sets. This can be helpful to know for debugging since the sets comprise the entirety of the + * mutable state of this SimpleURIConnectionPool objects + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + * + * @param connection + * @param now + * @param knownConnections a lambda expression to remove a closed-connection from allKnownConnections, either using + * an Iterator of allKnownConnections, or directly using allKnownConnections.remove() + */ + private void readConnectionHolder(SimpleConnectionHolder connection, long now, RemoveFromAllKnownConnections knownConnections) { + + /** + * Remove all references to closed connections + * After the connection is removed, the only reference to it will be in any unrestored ConnectionTokens, + * however, ConnectionTokens restored after the connection is closed will not be re-added to any sets + * (and will therefore be garbage collected) + */ + if(connection.closed()) { + logger.debug("[{}: CLOSED]: Connection closed - Stopping connection tracking", port(connection.connection())); + + allCreatedConnections.remove(connection.connection()); // connection.connection() returns a SimpleConnection + knownConnections.remove(); // this will remove the connection from allKnownConnections directly, or via Iterator + + borrowable.remove(connection); + borrowed.remove(connection); + notBorrowedExpired.remove(connection); + return; + } + + // if connection is open, move it to the correct state-sets based on its properties + boolean isExpired = connection.expired(now); + boolean isBorrowed = connection.borrowed(); + boolean isBorrowable = connection.borrowable(now); + boolean isNotBorrowedExpired = !isBorrowed && isExpired; + + updateSet(borrowable, isBorrowable, connection); + updateSet(borrowed, isBorrowed, connection); + updateSet(notBorrowedExpired, isNotBorrowedExpired, connection); + } + + /*** + * Takes a Set, a boolean, and a connectionHolder + * If the boolean is true, it will add the connectionHolder to the Set, otherwise, it will remove it from the Set + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + * + * @param set the set to potentially add or remove the connectionHolder from + * @param isMember if true, it will add connectionHolder to set, otherwise, it will remove connectionHolder from set + * @param connectionHolder the connectionHolder to add or remove from the set + */ + private void updateSet(Set set, boolean isMember, SimpleConnectionHolder connectionHolder) { + if(isMember) + set.add(connectionHolder); + else + set.remove(connectionHolder); + } + + /** + * Remove leaked connections + * A leaked connection is any connection that was created by a SimpleConnectionMaker, but was not returned by the + * SimpleConnectionHolder.borrow() method. This can happen if an error occurs (specifically, if an exception is + * thrown) during the creation of a SimpleConnectionHolder. A SimpleConnectionHolder can fail to instantiate + * (after it has created a new connection) if, for example: + * + * 1) the connection-creation callback thread finishes creating the connection after a timeout has occurred + * 2) the raw connection unexpectedly closes during the creation of its SimpleConnectionHolder + * + * NOTE: Closing connection and modifying sets + * readConnectionHolder() and findAndCloseLeakedConnections() are the only two methods that close connections + * and modify sets. This can be helpful to know for debugging since the sets comprise the entirety of the + * mutable state of this SimpleURIConnectionPool objects + */ + private void findAndCloseLeakedConnections() + { + // remove all connections that the connection pool is tracking, from the set of all created connections + for(SimpleConnectionHolder knownConnection: allKnownConnections) + allCreatedConnections.remove(knownConnection.connection()); + + // any remaining connections are leaks, and can now be safely closed + if(allCreatedConnections.size() > 0) { + logger.debug("{} untracked connection found", allCreatedConnections.size()); + + Iterator leakedConnections = allCreatedConnections.iterator(); + while(leakedConnections.hasNext()) { + SimpleConnection leakedConnection = leakedConnections.next(); + + if(leakedConnection.isOpen()) { + leakedConnection.safeClose(); + logger.debug("Connection closed {} -> {}", port(leakedConnection), uri.toString()); + } else + logger.debug("Connection was already closed {} -> {}", port(leakedConnection), uri.toString()); + + leakedConnections.remove(); + } + } + } + + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + * + * NOTE: Iteration Safety + * This method should not be used inside loops that iterate through elements of borrowable, borrowed, + * notBorrowedExpired, or allKnownConnections sets + */ + private String showConnections(String transitionName) { + return "After " + transitionName + " - " + + showConnections("BORROWABLE", borrowable) + + showConnections("BORROWED", borrowed) + + showConnections("NOT_BORROWED_EXPIRED", notBorrowedExpired) + + showConnections("TRACKED", allKnownConnections); + } + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + */ + private static String showConnections(String name, Set set) { + StringBuilder sb = new StringBuilder(); + sb.append("[").append(name).append(": "); + if(set.size() == 0) + sb.append("0"); + else { + int numCons = set.size(); + for (SimpleConnectionHolder holder : set) { + sb.append(port(holder.connection())); + if (--numCons > 0) sb.append(" "); + } + } + sb.append("] "); + return sb.toString(); + } + + /*** + * For logging + * + * NOTE: Thread Safety + * This method is private, and is only called either directly or transitively by synchronized + * methods in this class. + */ + private static String port(SimpleConnection connection) { + if(connection == null) return "NULL"; + String url = connection.getLocalAddress(); + int semiColon = url.lastIndexOf(":"); + if(semiColon == - 1) return "PORT?"; + return url.substring(url.lastIndexOf(":")+1); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java new file mode 100644 index 0000000000..09d9029abf --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java @@ -0,0 +1,103 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock; + +import com.networknt.client.simplepool.SimpleConnection; +import com.networknt.client.simplepool.SimpleConnectionMaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.net.URI; +import java.util.Set; +import java.util.concurrent.*; + +public class TestConnectionMaker implements SimpleConnectionMaker { + private static final Logger logger = LoggerFactory.getLogger(TestConnectionMaker.class); + private Class connectionClass; + public TestConnectionMaker(Class clas) { + this.connectionClass = clas; + } + + @Override + public SimpleConnection makeConnection(long createConnectionTimeout, boolean isHttp2, URI uri, Set allConnections) + throws RuntimeException + { + SimpleConnection connection = instantiateConnection(createConnectionTimeout, isHttp2, allConnections); + return connection; + } + + @Override + public SimpleConnection reuseConnection(long createConnectionTimeout, SimpleConnection connection) throws RuntimeException { + if(connection == null) + return null; + if(!connection.isOpen()) + throw new RuntimeException("Reused-connection has been unexpectedly closed"); + return connection; + } + + private SimpleConnection instantiateConnection(long createConnectionTimeout, final boolean isHttp2, final Set allConnections) + throws RuntimeException + { + final ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(() -> { + executor.shutdown(); + + Constructor constructor = connectionClass.getConstructor(boolean.class); + SimpleConnection simpleConnection = constructor.newInstance(isHttp2); + + allConnections.add(simpleConnection); + logger.debug("allCreatedConnections: {}", logAllConnections(allConnections)); + + return simpleConnection; + }); + SimpleConnection connection; + try { + connection = future.get(createConnectionTimeout, TimeUnit.SECONDS); + } catch(Exception e) { + throw new RuntimeException("Connection creation timed-out"); + } + return connection; + } + + + /*** + * For logging + */ + private String logAllConnections(final Set allConnections) { + StringBuilder consList = new StringBuilder(); + consList.append("[ "); + for(SimpleConnection connection: allConnections) + consList.append(port(connection)).append(" "); + consList.append("]:"); + return consList.toString(); + } + + /*** + * For logging + */ + private static String port(SimpleConnection connection) { + if(connection == null) return "NULL"; + String url = connection.getLocalAddress(); + int semiColon = url.lastIndexOf(":"); + if(semiColon == - 1) return "PORT?"; + return url.substring(url.lastIndexOf(":")+1); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java b/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java new file mode 100644 index 0000000000..4e7fbd2b71 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java @@ -0,0 +1,225 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock; + +import com.networknt.client.simplepool.SimpleConnectionHolder; +import com.networknt.client.simplepool.SimpleConnectionMaker; +import com.networknt.client.simplepool.SimpleURIConnectionPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestRunner +{ + private static final Logger logger = LoggerFactory.getLogger(TestRunner.class); + + // Default Test Runner Settings + private long testLength = 120; // in seconds + private int numCallers = 2; + + // Default Connection Pool Setup + private URI uri = URI.create("https://mock-uri.com"); + private long expireTime = 10; // in seconds + private int poolSize = 100; + private Class simpleConnectionClass; + private SimpleConnectionMaker connectionMaker; + private SimpleURIConnectionPool pool; + + // Caller Thread setup + private long createConnectionTimeout = 5; // in seconds + private long borrowTime = 3; // in seconds + private long borrowJitter = 4; // in seconds + private long reconnectTime = 2; // in seconds + private long reconnectTimeJitter = 2; // in seconds + private int threadStartJitter = 3; // in seconds + private boolean isHttp2 = true; + + /** Test length in seconds. Default 120s */ + public TestRunner setTestLength(long testLength) { this.testLength = testLength; return this; } + /** Number of borrowing threads. Default 2 */ + public TestRunner setNumBorrowerThreads(int numCallers) { this.numCallers = numCallers; return this; } + /** Mock URI. Default https://mock-uri.com */ + public TestRunner setUri(URI uri) { this.uri = uri; return this; } + /** Maximum number of connections allowed in the connection pool. Default 100 */ + public TestRunner setConnectionPoolSize(int poolSize) { this.poolSize = poolSize; return this; } + /** Connection expiry time in seconds. Default 10s */ + public TestRunner setConnectionExpireTime(long expireTime) { this.expireTime = expireTime; return this; } + /** The SimpleConnection class used for connections -- must have a parameterless constructor. + * Note: executeTest() will throw an exception if this is not set. */ + public TestRunner setSimpleConnectionClass(Class simpleConnectionClass) { this.simpleConnectionClass = simpleConnectionClass; return this; } + /** Connection creation timeout in seconds. Default is 5s */ + public TestRunner setCreateConnectionTimeout(long createConnectionTimeout) { this.createConnectionTimeout = createConnectionTimeout; return this; } + /** Amount of time in seconds that borrower threads hold connections before restoring them. Default 3s */ + public TestRunner setBorrowTimeLength(long borrowTime) { this.borrowTime = borrowTime; return this; } + /** Max random additional time in seconds that borrower threads hold connections before restoring them. Default 4s */ + public TestRunner setBorrowTimeLengthJitter(long borrowJitter) { this.borrowJitter = borrowJitter; return this; } + /** Amount of time in seconds that borrower threads waits after returning a connection to borrow again. Default 2s */ + public TestRunner setWaitTimeBeforeReborrow(long reconnectTime) { this.reconnectTime = reconnectTime; return this; } + /** Max random additional time in seconds that borrower threads waits after returning a connection to borrow again. Default 2s */ + public TestRunner setWaitTimeBeforeReborrowJitter(long reconnectTimeJitter) { this.reconnectTimeJitter = reconnectTimeJitter; return this; } + /** Max random startup delay in seconds for borrower threads. Default 3s */ + public TestRunner setBorrowerThreadStartJitter(int threadStartJitter) { this.threadStartJitter = threadStartJitter; return this; } + /** Determines whether caller threads request HTTP/2 connections. HTTP/2 means multiple borrows per connection are allowed. Default true */ + public TestRunner setHttp2(boolean http2) { isHttp2 = http2; return this; } + + public void executeTest() throws RuntimeException { + if(simpleConnectionClass == null) + throw new RuntimeException("A SimpleConnection class must be set using setSimpleConnectionClass()"); + + try { + // create connection maker + connectionMaker = new TestConnectionMaker(simpleConnectionClass); + // create pool + pool = new SimpleURIConnectionPool(uri, expireTime * 1000, poolSize, connectionMaker); + + // flag used to stop threads + AtomicBoolean stopped = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(numCallers); + + logger.debug("> Creating and starting threads..."); + createAndStartCallers( + numCallers, threadStartJitter, pool, stopped, createConnectionTimeout, isHttp2, borrowTime, borrowJitter, reconnectTime, reconnectTimeJitter, latch); + logger.debug("> All threads created and started"); + + logger.debug("> SLEEP for {} seconds", testLength); + Thread.sleep(testLength * 1000); + + logger.debug("> WAKE"); + logger.debug("> Shutting down test..."); + stopped.set(true); + + logger.debug("> Thread-shutdown flag set. Waiting for threads to exit..."); + latch.await(); + + logger.debug("> Threads exited. Test completed"); + } catch (Exception e) { + logger.debug("> Test had errors", e); + } + } + + private void createAndStartCallers( + int numCallers, + int threadStartJitter, + SimpleURIConnectionPool pool, + AtomicBoolean stopped, + long createConnectionTimeout, + boolean isHttp2, + long borrowTime, + long borrowJitter, + long reconnectTime, + long reconnectTimeJitter, + CountDownLatch latch) throws InterruptedException + { + while(numCallers-- > 0) { + new CallerThread( + pool, stopped, createConnectionTimeout, isHttp2, borrowTime, borrowJitter, reconnectTime, reconnectTimeJitter, latch).start(); + if(threadStartJitter > 0) + Thread.sleep(ThreadLocalRandom.current().nextLong(threadStartJitter+1) * 1000); + } + } + + private static class CallerThread extends Thread { + private static final Logger logger = LoggerFactory.getLogger(CallerThread.class); + private final CountDownLatch latch; + private final AtomicBoolean stopped; + private final SimpleURIConnectionPool pool; + private final long createConnectionTimeout; + private final boolean isHttp2; + private final long borrowTime; + private final long borrowJitter; + private final long reconnectTime; + private final long reconnectTimeJitter; + + public CallerThread( + SimpleURIConnectionPool pool, + AtomicBoolean stopped, + long createConnectionTimeout, + boolean isHttp2, + long borrowTime, + long borrowJitter, + long reconnectTime, + long reconnectTimeJitter, + CountDownLatch latch) + { + this.latch = latch; + this.stopped = stopped; + this.pool = pool; + this.createConnectionTimeout = createConnectionTimeout; // this must be kept in seconds (not ms) + this.isHttp2 = isHttp2; + this.borrowTime = borrowTime; + this.borrowJitter = borrowJitter; + this.reconnectTime = reconnectTime; + this.reconnectTimeJitter = reconnectTimeJitter; + } + + @Override + public void run() { + logger.debug("{} Starting", Thread.currentThread().getName()); + while(!stopped.get()) { + SimpleConnectionHolder.ConnectionToken connectionToken = null; + try { + logger.debug("{} Borrowing connection", Thread.currentThread().getName()); + connectionToken = pool.borrow(createConnectionTimeout, isHttp2); + + } catch(Exception e) { + logger.debug("{} Connection issue occurred!", Thread.currentThread().getName(), e); + + } finally { + if(connectionToken != null) + borrowTime(borrowTime, borrowJitter); + + logger.debug("{} Returning connection", Thread.currentThread().getName()); + pool.restore(connectionToken); + + reborrowWaitTime(reconnectTime, reconnectTimeJitter); + } + } + latch.countDown(); + logger.debug("{} Thread exiting", Thread.currentThread().getName()); + } + + private void borrowTime(long borrowTime, long borrowJitter) { + wait("{} Borrowing connection for {} seconds...", borrowTime, borrowJitter); + } + + private void reborrowWaitTime(long reconnectTime, long reconnectTimeJitter) { + wait("{} Waiting for {} seconds to borrow connection again...", borrowTime, borrowJitter); + } + + private void wait(String logMessage, long waitTime, long waitTimeJitter) { + long waitTimeMs = waitTime * 1000; + long waitTimeJitterMs = waitTimeJitter * 1000; + try { + final long randomReconnectJitterMs; + if (waitTimeJitterMs > 0) + randomReconnectJitterMs = ThreadLocalRandom.current().nextLong(waitTimeJitterMs + 1); + else + randomReconnectJitterMs = 0; + logger.debug(logMessage, Thread.currentThread().getName(), (waitTimeMs + randomReconnectJitterMs)/1000); + Thread.sleep(waitTimeMs + randomReconnectJitterMs); + } catch(InterruptedException e) { + logger.debug("Thread interrupted", e); + } + } + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockKeepAliveConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockKeepAliveConnection.java new file mode 100644 index 0000000000..a0135d6dcc --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockKeepAliveConnection.java @@ -0,0 +1,63 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.SimpleConnection; +import java.util.concurrent.ThreadLocalRandom; + +public class MockKeepAliveConnection implements SimpleConnection { + + private volatile boolean closed = false; + private boolean isHttp2 = true; + private String MOCK_ADDRESS = "MOCK_HOST_IP:" + ThreadLocalRandom.current().nextInt((int) (Math.pow(2, 15) - 1.0), (int) (Math.pow(2, 16) - 1.0)); + + /** + * This mock connection simulates a multiplexable connection that never dies unless closed + * + * Simulates a server connection with indefinite keep-alives enabled + */ + + public MockKeepAliveConnection(boolean isHttp2) { this.isHttp2 = isHttp2; } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public Object getRawConnection() { + throw new RuntimeException("Mock connection has no raw connection"); + } + + @Override + public boolean isMultiplexingSupported() { + return isHttp2; + } + + @Override + public String getLocalAddress() { + return MOCK_ADDRESS; + } + + @Override + public void safeClose() { + closed = true; + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockRandomlyClosingConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockRandomlyClosingConnection.java new file mode 100644 index 0000000000..c3ca5da482 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockRandomlyClosingConnection.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.SimpleConnection; +import java.util.concurrent.ThreadLocalRandom; + +public class MockRandomlyClosingConnection implements SimpleConnection { + + private volatile boolean closed = false; + private boolean isHttp2 = true; + private String MOCK_ADDRESS = "MOCK_HOST_IP:" + ThreadLocalRandom.current().nextInt((int) (Math.pow(2, 15) - 1.0), (int) (Math.pow(2, 16) - 1.0)); + + /*** + * This mock connection simulates a multiplexable connection that has a 20% chance of taking longer than 5s + * to be created + */ + + public MockRandomlyClosingConnection(boolean isHttp2) { this.isHttp2 = isHttp2; } + + @Override + public boolean isOpen() { + if(ThreadLocalRandom.current().nextInt(20) == 0) + closed = true; + return !closed; + } + + @Override + public Object getRawConnection() { + throw new RuntimeException("Mock connection has no raw connection"); + } + + @Override + public boolean isMultiplexingSupported() { + return isHttp2; + } + + @Override + public String getLocalAddress() { + return MOCK_ADDRESS; + } + + @Override + public void safeClose() { + closed = true; + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockTimeoutLeakedConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockTimeoutLeakedConnection.java new file mode 100644 index 0000000000..eb7d928546 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/MockTimeoutLeakedConnection.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.SimpleConnection; +import java.util.concurrent.ThreadLocalRandom; + +public class MockTimeoutLeakedConnection implements SimpleConnection { + + private volatile boolean closed = false; + private boolean isHttp2 = true; + private String MOCK_ADDRESS = "MOCK_HOST_IP:" + ThreadLocalRandom.current().nextInt((int) (Math.pow(2, 15) - 1.0), (int) (Math.pow(2, 16) - 1.0)); + + /*** + * This mock connection simulates a multiplexable connection that has a 20% chance of taking longer than 5s + * to be created + */ + + public MockTimeoutLeakedConnection(boolean isHttp2) { + this.isHttp2 = isHttp2; + randomCreationDelay(); + } + + private void randomCreationDelay() throws RuntimeException { + if(ThreadLocalRandom.current().nextInt(5) == 0) { + try { + Thread.sleep(10*1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public boolean isOpen() { + return !closed; + } + + @Override + public Object getRawConnection() { + throw new RuntimeException("Mock connection has no raw connection"); + } + + @Override + public boolean isMultiplexingSupported() { + return isHttp2; + } + + @Override + public String getLocalAddress() { + return MOCK_ADDRESS; + } + + @Override + public void safeClose() { + closed = true; + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestKeepAliveConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestKeepAliveConnection.java new file mode 100644 index 0000000000..9678e9a7ee --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestKeepAliveConnection.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.mock.TestRunner; + +public class TestKeepAliveConnection +{ + public static void main(String[] args) { + new TestRunner() + // set connection properties + .setConnectionPoolSize(100) + .setSimpleConnectionClass(MockKeepAliveConnection.class) + .setCreateConnectionTimeout(5) + .setConnectionExpireTime(5) + .setHttp2(false) + + // configure borrower-thread properties + .setNumBorrowerThreads(4) + .setBorrowerThreadStartJitter(0) + .setBorrowTimeLength(5) + .setBorrowTimeLengthJitter(5) + .setWaitTimeBeforeReborrow(2) + .setWaitTimeBeforeReborrowJitter(2) + + .setTestLength(10*60) + .executeTest(); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestPoolSizeOverflow.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestPoolSizeOverflow.java new file mode 100644 index 0000000000..a888cbda80 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestPoolSizeOverflow.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.mock.TestRunner; + +public class TestPoolSizeOverflow +{ + public static void main(String[] args) { + new TestRunner() + // set connection properties + .setConnectionPoolSize(7) + .setSimpleConnectionClass(MockKeepAliveConnection.class) + .setCreateConnectionTimeout(5) + .setConnectionExpireTime(5) + .setHttp2(false) + + // configure borrower-thread properties + .setNumBorrowerThreads(8) + .setBorrowerThreadStartJitter(0) + .setBorrowTimeLength(2) + .setBorrowTimeLengthJitter(8) + .setWaitTimeBeforeReborrow(1) + .setWaitTimeBeforeReborrowJitter(1) + + .setTestLength(10*60) + .executeTest(); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestRandomlyClosingConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestRandomlyClosingConnection.java new file mode 100644 index 0000000000..301e217397 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestRandomlyClosingConnection.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.mock.TestRunner; + +public class TestRandomlyClosingConnection +{ + public static void main(String[] args) { + new TestRunner() + // set connection properties + .setConnectionPoolSize(100) + .setSimpleConnectionClass(MockRandomlyClosingConnection.class) + .setCreateConnectionTimeout(5) + .setConnectionExpireTime(5) + .setHttp2(true) + + // configure borrower-thread properties + .setNumBorrowerThreads(8) + .setBorrowerThreadStartJitter(3) + .setBorrowTimeLength(5) + .setBorrowTimeLengthJitter(5) + .setWaitTimeBeforeReborrow(2) + .setWaitTimeBeforeReborrowJitter(2) + + .setTestLength(10*60) + .executeTest(); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestTimeoutLeakedConnection.java b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestTimeoutLeakedConnection.java new file mode 100644 index 0000000000..5140765737 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/mock/mockexample/TestTimeoutLeakedConnection.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.mock.mockexample; + +import com.networknt.client.simplepool.mock.TestRunner; + +public class TestTimeoutLeakedConnection +{ + public static void main(String[] args) { + new TestRunner() + // set connection properties + .setConnectionPoolSize(100) + .setSimpleConnectionClass(MockTimeoutLeakedConnection.class) + .setCreateConnectionTimeout(5) + .setConnectionExpireTime(5) + .setHttp2(true) + + // configure borrower-thread properties + .setNumBorrowerThreads(8) + .setBorrowerThreadStartJitter(3) + .setBorrowTimeLength(5) + .setBorrowTimeLengthJitter(5) + .setWaitTimeBeforeReborrow(2) + .setWaitTimeBeforeReborrowJitter(2) + + .setTestLength(10*60) + .executeTest(); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java new file mode 100644 index 0000000000..a75b8081d0 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.undertow; + +import com.networknt.client.simplepool.SimpleConnection; +import io.undertow.client.ClientConnection; +import org.xnio.IoUtils; + +public class SimpleClientConnection implements SimpleConnection { + private ClientConnection connection; + + public SimpleClientConnection(ClientConnection connection) { + this.connection = connection; + } + + @Override + public boolean isOpen() { + return connection.isOpen(); + } + + @Override + public Object getRawConnection() { + return connection; + } + + @Override + public boolean isMultiplexingSupported() { + return connection.isMultiplexingSupported(); + } + + @Override + public String getLocalAddress() { + return connection.getLocalAddress().toString(); + } + + @Override + public void safeClose() { + if(connection.isOpen()) + IoUtils.safeClose(connection); + } +} diff --git a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java new file mode 100644 index 0000000000..7ebb10cba3 --- /dev/null +++ b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java @@ -0,0 +1,198 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @author miklish Michael N. Christoff + * + * testing / QA + * AkashWorkGit + * jaydeepparekh1311 + */ +package com.networknt.client.simplepool.undertow; + +import com.networknt.client.ClientConfig; +import com.networknt.client.Http2Client; +import com.networknt.client.simplepool.SimpleConnection; +import com.networknt.client.simplepool.SimpleConnectionMaker; +import io.undertow.Undertow; +import io.undertow.UndertowOptions; +import io.undertow.client.ClientCallback; +import io.undertow.client.ClientConnection; +import io.undertow.client.UndertowClient; +import io.undertow.connector.ByteBufferPool; +import io.undertow.protocols.ssl.UndertowXnioSsl; +import io.undertow.server.DefaultByteBufferPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xnio.*; +import org.xnio.ssl.XnioSsl; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class SimpleClientConnectionMaker implements SimpleConnectionMaker +{ + private static final Logger logger = LoggerFactory.getLogger(SimpleClientConnectionMaker.class); + private static final ByteBufferPool BUFFER_POOL = new DefaultByteBufferPool(true, ClientConfig.get().getBufferSize() * 1024); + private static SimpleClientConnectionMaker simpleClientConnectionMaker = null; + + public static SimpleConnectionMaker instance() { + if(simpleClientConnectionMaker == null) + simpleClientConnectionMaker = new SimpleClientConnectionMaker(); + return simpleClientConnectionMaker; + } + + @Override + public SimpleConnection makeConnection( + long createConnectionTimeout, + boolean isHttp2, + final URI uri, + final Set allCreatedConnections) throws RuntimeException + { + boolean isHttps = uri.getScheme().equalsIgnoreCase("https"); + XnioSsl ssl = getSSL(isHttps, isHttp2); + XnioWorker worker = getWorker(isHttp2); + OptionMap connectionOptions = getConnectionOptions(isHttp2); + InetSocketAddress bindAddress = null; + + final FutureResult result = new FutureResult<>(); + ClientCallback connectionCallback = new ClientCallback() { + @Override + public void completed(ClientConnection connection) { + logger.debug("New connection {} established with {}", port(connection), uri); + SimpleConnection simpleConnection = new SimpleClientConnection(connection); + + // note: its vital that allCreatedConnections and result contain the same SimpleConnection reference + allCreatedConnections.add(simpleConnection); + result.setResult(simpleConnection); + } + + @Override + public void failed(IOException e) { + logger.debug("Failed to establish new connection for uri: {}", uri); + result.setException(e); + } + }; + + UndertowClient undertowClient = UndertowClient.getInstance(); + undertowClient.connect(connectionCallback, bindAddress, uri, worker, ssl, BUFFER_POOL, connectionOptions); + + IoFuture future = result.getIoFuture(); + return safeConnect(createConnectionTimeout, future); + } + + public SimpleConnection reuseConnection(long createConnectionTimeout, SimpleConnection connection) throws RuntimeException + { + if(connection == null) + return null; + + if(!(connection.getRawConnection() instanceof ClientConnection)) + throw new IllegalArgumentException("Attempt to reuse wrong connection type. Must be of type ClientConnection"); + + if(!connection.isOpen()) + throw new RuntimeException("Reused-connection has been unexpectedly closed"); + + return connection; + } + + // PRIVATE METHODS + + private static OptionMap getConnectionOptions(boolean isHttp2) { + return isHttp2 ? OptionMap.create(UndertowOptions.ENABLE_HTTP2, true) : OptionMap.EMPTY; + } + + // TODO: Should worker be re-used? Note: Light-4J Http2Client re-uses it + private static AtomicReference WORKER = new AtomicReference<>(null); + private static XnioWorker getWorker(boolean isHttp2) + { + if(WORKER.get() != null) return WORKER.get(); + + Xnio xnio = Xnio.getInstance(Undertow.class.getClassLoader()); + try { + // if WORKER is null, then set new WORKER otherwise leave existing WORKER + WORKER.compareAndSet(null, xnio.createWorker(null, getWorkerOptionMap(isHttp2))); + + } catch (IOException e) { + throw new RuntimeException(e); + } + return WORKER.get(); + } + + private static OptionMap getWorkerOptionMap(boolean isHttp2) + { + OptionMap.Builder optionBuild = OptionMap.builder() + .set(Options.WORKER_IO_THREADS, 8) + .set(Options.TCP_NODELAY, true) + .set(Options.KEEP_ALIVE, true) + .set(Options.WORKER_NAME, isHttp2 ? "Callback-HTTP2" : "Callback-HTTP11"); + return optionBuild.getMap(); + } + + // TODO: Should SSL be re-used? Note: Light-4J Http2Client re-uses it + private static AtomicReference SSL = new AtomicReference<>(null); + private static XnioSsl getSSL(boolean isHttps, boolean isHttp2) + { + if(!isHttps) + return null; + if(SSL.get() != null) + return SSL.get(); + + try { + // TODO: Should this be OptionMap.EMPTY ?? + // if SSL is null, then set new SSL otherwise leave existing SSL + SSL.compareAndSet( + null, + new UndertowXnioSsl(getWorker(isHttp2).getXnio(), OptionMap.EMPTY, BUFFER_POOL, Http2Client.createSSLContext())); + } catch (Exception e) { + logger.error("Exception while creating new shared UndertowXnioSsl used to create connections", e); + throw new RuntimeException(e); + } + return SSL.get(); + } + + /*** + * Never returns null + * + * @param timeoutSeconds + * @param future + * @return + */ + private static SimpleConnection safeConnect(long timeoutSeconds, IoFuture future) + { + SimpleConnection connection = null; + + if(future.await(timeoutSeconds, TimeUnit.SECONDS) != org.xnio.IoFuture.Status.DONE) + throw new RuntimeException("Connection establishment timed out"); + + try { + connection = future.get(); + } catch (IOException e) { + throw new RuntimeException("Connection establishment generated I/O exception", e); + } + + if(connection == null) + throw new RuntimeException("Connection establishment failed (null) - Full connection terminated"); + + return connection; + } + + public static String port(ClientConnection connection) { + if(connection == null) return "NULL"; + String url = connection.getLocalAddress().toString(); + int semiColon = url.lastIndexOf(":"); + if(semiColon == - 1) return "PORT?"; + return url.substring(url.lastIndexOf(":")+1); + } +} diff --git a/consul/src/main/java/com/networknt/consul/ConsulRecoveryManager.java b/consul/src/main/java/com/networknt/consul/ConsulRecoveryManager.java index 22f5f1b467..f0d36b3fb8 100644 --- a/consul/src/main/java/com/networknt/consul/ConsulRecoveryManager.java +++ b/consul/src/main/java/com/networknt/consul/ConsulRecoveryManager.java @@ -25,11 +25,12 @@ public ConsulRecoveryManager(String serviceName) { startConsulThreadMonitor(); } - private static synchronized void startConsulThreadMonitor() { + private static void startConsulThreadMonitor() { if(monitorThreadStarted.get()) return; - monitorThreadStarted.set(true); - logger.debug("Starting Consul Thread Monitor..."); - consulThreadMonitor.start(); + if(monitorThreadStarted.compareAndSet(false, true)) { + logger.debug("Starting Consul Thread Monitor..."); + consulThreadMonitor.start(); + } } /** diff --git a/consul/src/main/java/com/networknt/consul/ConsulThreadMonitor.java b/consul/src/main/java/com/networknt/consul/ConsulThreadMonitor.java index 078e9a6db1..9f557edae9 100644 --- a/consul/src/main/java/com/networknt/consul/ConsulThreadMonitor.java +++ b/consul/src/main/java/com/networknt/consul/ConsulThreadMonitor.java @@ -10,33 +10,35 @@ public class ConsulThreadMonitor extends Thread { private static final Logger logger = LoggerFactory.getLogger(ConsulThreadMonitor.class); private static final ConsulConfig config = (ConsulConfig) Config.getInstance().getJsonObjectConfig(ConsulConstants.CONFIG_NAME, ConsulConfig.class); - private final ConcurrentHashMap heartbeats; + private final ConcurrentHashMap checkins; private boolean shutdownIfThreadFrozen = config.isShutdownIfThreadFrozen(); private static final long WAIT_S = ConsulUtils.getWaitInSecond(config.getWait()); private static final long TIMEOUT_BUFFER_S = ConsulUtils.getTimeoutBufferInSecond(config.getTimeoutBuffer()); private static final long LOOKUP_INTERVAL_S = config.getLookupInterval(); - private static final long MAX_TIME_BETWEEN_BEATS_MS = 2 * 1000 * ( LOOKUP_INTERVAL_S + WAIT_S + TIMEOUT_BUFFER_S ); + // MIN_TIME_BETWEEN_CHECKINS_MS accounts for queue-wait time to enter connection pool synchronized methods (for up to 12 queued threads) + private static final long MIN_TIME_BETWEEN_CHECKINS_MS = 12 * 10 * 1000; + private static final long MAX_TIME_BETWEEN_CHECKINS_MS = Math.max(2 * 1000 * ( LOOKUP_INTERVAL_S + WAIT_S + TIMEOUT_BUFFER_S ), MIN_TIME_BETWEEN_CHECKINS_MS); - public ConsulThreadMonitor(final ConcurrentHashMap heartbeats) { - this.heartbeats = heartbeats; + public ConsulThreadMonitor(final ConcurrentHashMap checkins) { + this.checkins = checkins; } public void run() { long now; while(true) { try { - Thread.sleep(MAX_TIME_BETWEEN_BEATS_MS); + Thread.sleep(MAX_TIME_BETWEEN_CHECKINS_MS); now = System.currentTimeMillis(); - for(Map.Entry beat : heartbeats.entrySet()) { - if(now - beat.getValue().longValue() > MAX_TIME_BETWEEN_BEATS_MS) { + for(Map.Entry checkin : checkins.entrySet()) { + if(now - checkin.getValue().longValue() > MAX_TIME_BETWEEN_CHECKINS_MS) { if(shutdownIfThreadFrozen) { - logger.error("Service {} has missed its check in... Shutting down host...", beat.getKey()); + logger.error("Service {} has missed its check in... Shutting down host...", checkin.getKey()); ConsulRecoveryManager.gracefulShutdown(); } else - logger.error("Service {} has missed its check in - Please restart host", beat.getKey()); + logger.error("Service {} has missed its check in - Please restart host", checkin.getKey()); } else - logger.debug("Service {} checked in on time", beat.getKey()); + logger.debug("Service {} checked in on time", checkin.getKey()); } } catch (InterruptedException i) { logger.error("Consul Monitor Thread Interrupted", i); } catch (Exception e) { logger.error("Consul Monitor Thread Exception", e); } diff --git a/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java b/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java index e6ca255dfe..c98a32db2a 100644 --- a/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java +++ b/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java @@ -17,6 +17,7 @@ package com.networknt.consul.client; import com.fasterxml.jackson.core.type.TypeReference; +import com.networknt.client.ClientConfig; import com.networknt.client.Http2Client; import com.networknt.config.Config; import com.networknt.consul.*; @@ -46,6 +47,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +// use SimpleURIConnectionPool as the connection pool +import com.networknt.client.simplepool.SimpleURIConnectionPool; +import com.networknt.client.simplepool.SimpleConnectionHolder; +// Use Undertow ClientConnection as raw connection +import com.networknt.client.simplepool.SimpleConnectionMaker; +import com.networknt.client.simplepool.undertow.SimpleClientConnectionMaker; + /** * A client that talks to Consul agent with REST API. * Client and connection are cached as instance variable in singleton class. @@ -63,6 +71,9 @@ public class ConsulClientImpl implements ConsulClient { private String wait = "600s"; private String timeoutBuffer = "5s"; + // connection pool + private SimpleURIConnectionPool pool = null; + /** * Construct ConsulClient with all parameters from consul.yml config file. The other two constructors are * just for backward compatibility. @@ -81,6 +92,11 @@ public ConsulClientImpl() { logger.error("Invalid URI " + consulUrl, e); throw new RuntimeException("Invalid URI " + consulUrl, e); } + + // create SimpleURIConnection pool + SimpleConnectionMaker undertowConnectionMaker = SimpleClientConnectionMaker.instance(); + pool = new SimpleURIConnectionPool( + uri, ClientConfig.get().getConnectionExpireTime(), ClientConfig.get().getConnectionPoolSize(), undertowConnectionMaker); } @Override @@ -194,10 +210,12 @@ public ConsulResponse> lookupHealthService(String serviceNam } logger.trace("Consul health service path = {}", path); + SimpleConnectionHolder.ConnectionToken connectionToken = null; try { if(logger.isDebugEnabled()) logger.debug("Getting connection from pool with {}", uri); // this will throw a Runtime Exception if creation of Consul connection fails - connection = client.borrowConnection(config.getConnectionTimeout(), uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap); + connectionToken = pool.borrow(config.getConnectionTimeout(), isHttp2()); + connection = (ClientConnection) connectionToken.getRawConnection(); if(logger.isDebugEnabled()) logger.debug("CONSUL CONNECTION ESTABLISHED: {} from pool and send request to {}", connection, path); AtomicReference reference = send(connection, Methods.GET, path, token, null); @@ -263,7 +281,7 @@ public ConsulResponse> lookupHealthService(String serviceNam return null; } finally { - client.returnConnection(connection); + pool.restore(connectionToken); } return newResponse;