29
29
import org .apache .http .entity .StringEntity ;
30
30
import org .apache .http .impl .client .CloseableHttpClient ;
31
31
import org .apache .http .impl .client .HttpClients ;
32
+ import org .apache .http .impl .conn .PoolingHttpClientConnectionManager ;
32
33
import org .slf4j .Logger ;
33
34
import org .slf4j .LoggerFactory ;
34
35
50
51
*/
51
52
public class AsyncEventHandler implements EventHandler , Closeable {
52
53
54
+ // The following static values are public so that they can be tweaked if necessary.
55
+ // These are the recommended settings for http protocol. https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
56
+ // The maximum number of connections allowed across all routes.
57
+ private int maxTotalConnections = 200 ;
58
+ // The maximum number of connections allowed for a route
59
+ private int maxPerRoute = 20 ;
60
+ // Defines period of inactivity in milliseconds after which persistent connections must be re-validated prior to being leased to the consumer.
61
+ private int validateAfterInactivity = 5000 ;
62
+
53
63
private static final Logger logger = LoggerFactory .getLogger (AsyncEventHandler .class );
54
64
private static final ProjectConfigResponseHandler EVENT_RESPONSE_HANDLER = new ProjectConfigResponseHandler ();
55
65
@@ -58,13 +68,22 @@ public class AsyncEventHandler implements EventHandler, Closeable {
58
68
private final BlockingQueue <LogEvent > logEventQueue ;
59
69
60
70
public AsyncEventHandler (int queueCapacity , int numWorkers ) {
71
+ this (queueCapacity , numWorkers , 200 , 20 , 5000 );
72
+ }
73
+
74
+ public AsyncEventHandler (int queueCapacity , int numWorkers , int maxConnections , int connectionsPerRoute , int validateAfter ) {
61
75
if (queueCapacity <= 0 ) {
62
76
throw new IllegalArgumentException ("queue capacity must be > 0" );
63
77
}
64
78
79
+ this .maxTotalConnections = maxConnections ;
80
+ this .maxPerRoute = connectionsPerRoute ;
81
+ this .validateAfterInactivity = validateAfter ;
82
+
65
83
this .logEventQueue = new ArrayBlockingQueue <LogEvent >(queueCapacity );
66
84
this .httpClient = HttpClients .custom ()
67
85
.setDefaultRequestConfig (HttpClientUtils .DEFAULT_REQUEST_CONFIG )
86
+ .setConnectionManager (poolingHttpClientConnectionManager ())
68
87
.disableCookieManagement ()
69
88
.build ();
70
89
@@ -78,6 +97,15 @@ public AsyncEventHandler(int queueCapacity, int numWorkers) {
78
97
}
79
98
}
80
99
100
+ private PoolingHttpClientConnectionManager poolingHttpClientConnectionManager ()
101
+ {
102
+ PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager ();
103
+ poolingHttpClientConnectionManager .setMaxTotal (maxTotalConnections );
104
+ poolingHttpClientConnectionManager .setDefaultMaxPerRoute (maxPerRoute );
105
+ poolingHttpClientConnectionManager .setValidateAfterInactivity (validateAfterInactivity );
106
+ return poolingHttpClientConnectionManager ;
107
+ }
108
+
81
109
@ Override
82
110
public void dispatchEvent (LogEvent logEvent ) {
83
111
// attempt to enqueue the log event for processing
0 commit comments