13
13
import com .clickhouse .client .api .data_formats .internal .SerializerUtils ;
14
14
import com .clickhouse .client .api .enums .ProxyType ;
15
15
import com .clickhouse .client .api .http .ClickHouseHttpProto ;
16
+ import io .micrometer .core .annotation .Timed ;
17
+ import org .apache .hc .client5 .http .AuthenticationStrategy ;
16
18
import org .apache .hc .client5 .http .ConnectTimeoutException ;
19
+ import org .apache .hc .client5 .http .classic .ExecChain ;
20
+ import org .apache .hc .client5 .http .classic .ExecChainHandler ;
17
21
import org .apache .hc .client5 .http .classic .methods .HttpPost ;
18
22
import org .apache .hc .client5 .http .config .ConnectionConfig ;
19
23
import org .apache .hc .client5 .http .config .RequestConfig ;
24
+ import org .apache .hc .client5 .http .impl .ChainElement ;
25
+ import org .apache .hc .client5 .http .impl .DefaultAuthenticationStrategy ;
26
+ import org .apache .hc .client5 .http .impl .DefaultClientConnectionReuseStrategy ;
27
+ import org .apache .hc .client5 .http .impl .DefaultSchemePortResolver ;
20
28
import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
29
+ import org .apache .hc .client5 .http .impl .classic .ConnectExec ;
21
30
import org .apache .hc .client5 .http .impl .classic .HttpClientBuilder ;
22
31
import org .apache .hc .client5 .http .impl .io .BasicHttpClientConnectionManager ;
23
32
import org .apache .hc .client5 .http .impl .io .ManagedHttpClientConnectionFactory ;
24
33
import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManager ;
25
34
import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManagerBuilder ;
26
35
import org .apache .hc .client5 .http .io .HttpClientConnectionManager ;
36
+ import org .apache .hc .client5 .http .io .ManagedHttpClientConnection ;
27
37
import org .apache .hc .client5 .http .protocol .HttpClientContext ;
28
38
import org .apache .hc .client5 .http .socket .ConnectionSocketFactory ;
29
39
import org .apache .hc .client5 .http .socket .LayeredConnectionSocketFactory ;
30
40
import org .apache .hc .client5 .http .socket .PlainConnectionSocketFactory ;
31
41
import org .apache .hc .client5 .http .ssl .SSLConnectionSocketFactory ;
42
+ import org .apache .hc .core5 .http .ClassicHttpRequest ;
32
43
import org .apache .hc .core5 .http .ClassicHttpResponse ;
33
44
import org .apache .hc .core5 .http .ConnectionRequestTimeoutException ;
34
45
import org .apache .hc .core5 .http .ContentType ;
35
46
import org .apache .hc .core5 .http .Header ;
36
47
import org .apache .hc .core5 .http .HttpEntity ;
48
+ import org .apache .hc .core5 .http .HttpException ;
37
49
import org .apache .hc .core5 .http .HttpHeaders ;
38
50
import org .apache .hc .core5 .http .HttpHost ;
39
51
import org .apache .hc .core5 .http .HttpRequest ;
45
57
import org .apache .hc .core5 .http .impl .io .DefaultHttpResponseParserFactory ;
46
58
import org .apache .hc .core5 .http .io .SocketConfig ;
47
59
import org .apache .hc .core5 .http .io .entity .EntityTemplate ;
60
+ import org .apache .hc .core5 .http .protocol .DefaultHttpProcessor ;
48
61
import org .apache .hc .core5 .http .protocol .HttpContext ;
62
+ import org .apache .hc .core5 .http .protocol .RequestTargetHost ;
63
+ import org .apache .hc .core5 .http .protocol .RequestUserAgent ;
49
64
import org .apache .hc .core5 .io .CloseMode ;
50
65
import org .apache .hc .core5 .io .IOCallback ;
51
66
import org .apache .hc .core5 .net .URIBuilder ;
81
96
import java .util .Objects ;
82
97
import java .util .Properties ;
83
98
import java .util .Set ;
99
+ import java .util .concurrent .ConcurrentLinkedQueue ;
84
100
import java .util .concurrent .TimeUnit ;
101
+ import java .util .concurrent .atomic .AtomicLong ;
85
102
import java .util .function .Function ;
86
103
87
104
public class HttpAPIClientHelper {
@@ -219,7 +236,7 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
219
236
220
237
221
238
int networkBufferSize = MapUtils .getInt (chConfiguration , "client_network_buffer_size" );
222
- ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory (
239
+ MeteredManagedHttpClientConnectionFactory connectionFactory = new MeteredManagedHttpClientConnectionFactory (
223
240
Http1Config .custom ()
224
241
.setBufferSize (networkBufferSize )
225
242
.build (),
@@ -238,6 +255,9 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
238
255
Class <?> micrometerLoader = getClass ().getClassLoader ().loadClass ("com.clickhouse.client.api.metrics.MicrometerLoader" );
239
256
Method applyMethod = micrometerLoader .getDeclaredMethod ("applyPoolingMetricsBinder" , Object .class , String .class , PoolingHttpClientConnectionManager .class );
240
257
applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , phccm );
258
+
259
+ applyMethod = micrometerLoader .getDeclaredMethod ("applyConnectionMetricsBinder" , Object .class , String .class , MeteredManagedHttpClientConnectionFactory .class );
260
+ applyMethod .invoke (micrometerLoader , metricsRegistry , mGroupName , connectionFactory );
241
261
} catch (Exception e ) {
242
262
LOG .error ("Failed to register metrics" , e );
243
263
}
@@ -758,7 +778,6 @@ public void close() {
758
778
httpClient .close (CloseMode .IMMEDIATE );
759
779
}
760
780
761
-
762
781
/**
763
782
* This factory is used only when no ssl connections are required (no https endpoints).
764
783
* Internally http client would create factory and spend time if no supplied.
@@ -779,4 +798,34 @@ public Socket connectSocket(TimeValue connectTimeout, Socket socket, HttpHost ho
779
798
return null ;
780
799
}
781
800
}
801
+
802
+ public class MeteredManagedHttpClientConnectionFactory extends ManagedHttpClientConnectionFactory {
803
+ public MeteredManagedHttpClientConnectionFactory (Http1Config http1Config , CharCodingConfig charCodingConfig , DefaultHttpResponseParserFactory defaultHttpResponseParserFactory ) {
804
+ super (http1Config , charCodingConfig , defaultHttpResponseParserFactory );
805
+ }
806
+
807
+ ConcurrentLinkedQueue <Long > times = new ConcurrentLinkedQueue <>();
808
+
809
+
810
+ @ Override
811
+ public ManagedHttpClientConnection createConnection (Socket socket ) throws IOException {
812
+ long startT = System .currentTimeMillis ();
813
+ try {
814
+ return super .createConnection (socket );
815
+ } finally {
816
+ long endT = System .currentTimeMillis ();
817
+ times .add (endT - startT );
818
+ }
819
+ }
820
+
821
+ public long getTime () {
822
+ int count = times .size ();
823
+ long runningAverage = 0 ;
824
+ for (int i = 0 ; i < count ; i ++) {
825
+ runningAverage += times .poll ();
826
+ }
827
+
828
+ return count > 0 ? runningAverage / count : 0 ;
829
+ }
830
+ }
782
831
}
0 commit comments