4040import com .clickhouse .data .ClickHouseDataType ;
4141import com .clickhouse .data .ClickHouseFormat ;
4242import com .google .common .collect .ImmutableList ;
43- import com .google .common .collect .ImmutableSet ;
4443import net .jpountz .lz4 .LZ4Factory ;
4544import org .apache .hc .core5 .concurrent .DefaultThreadFactory ;
4645import org .apache .hc .core5 .http .ClassicHttpResponse ;
5655import java .lang .reflect .InvocationTargetException ;
5756import java .net .MalformedURLException ;
5857import java .net .URL ;
59- import java .nio .charset .StandardCharsets ;
6058import java .time .Duration ;
6159import java .time .ZoneId ;
6260import java .time .temporal .ChronoUnit ;
@@ -186,7 +184,6 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
186184 }
187185
188186 this .endpoints = tmpEndpoints .build ();
189- this .httpClientHelper = new HttpAPIClientHelper (this .configuration , metricsRegistry , initSslContext );
190187
191188 String retry = configuration .get (ClientConfigProperties .RETRY_ON_FAILURE .getKey ());
192189 this .retries = retry == null ? 0 : Integer .parseInt (retry );
@@ -197,6 +194,7 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
197194 this .lz4Factory = LZ4Factory .fastestJavaInstance ();
198195 }
199196
197+ this .httpClientHelper = new HttpAPIClientHelper (this .configuration , metricsRegistry , initSslContext , lz4Factory );
200198 this .serverVersion = configuration .getOrDefault (ClientConfigProperties .SERVER_VERSION .getKey (), "unknown" );
201199 this .dbUser = configuration .getOrDefault (ClientConfigProperties .USER .getKey (), ClientConfigProperties .USER .getDefObjVal ());
202200 this .typeHintMapping = (Map <ClickHouseDataType , Class <?>>) this .configuration .get (ClientConfigProperties .TYPE_HINT_MAPPING .getKey ());
@@ -1073,6 +1071,20 @@ public Builder sslSocketSNI(String sni) {
10731071 return this ;
10741072 }
10751073
1074+ /**
1075+ * Make sending statement parameters as HTTP Form data (in the body of a request).
1076+ * Note: work only with Server side compression. If client compression is enabled it will be disabled
1077+ * for query requests with parameters. It is because each parameter is sent as part of multipart content
1078+ * what would require compressions of them separately.
1079+ *
1080+ * @param enable - if feature enabled
1081+ * @return this builder instance
1082+ */
1083+ public Builder useHttpFormDataForQuery (boolean enable ) {
1084+ this .configuration .put (ClientConfigProperties .HTTP_SEND_PARAMS_IN_BODY .getKey (), String .valueOf (enable ));
1085+ return this ;
1086+ }
1087+
10761088 public Client build () {
10771089 // check if endpoint are empty. so can not initiate client
10781090 if (this .endpoints .isEmpty ()) {
@@ -1279,7 +1291,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
12791291 for (int i = 0 ; i <= maxRetries ; i ++) {
12801292 // Execute request
12811293 try (ClassicHttpResponse httpResponse =
1282- httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory ,
1294+ httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (),
12831295 out -> {
12841296 out .write ("INSERT INTO " .getBytes ());
12851297 out .write (tableName .getBytes ());
@@ -1496,7 +1508,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
14961508 for (int i = 0 ; i <= retries ; i ++) {
14971509 // Execute request
14981510 try (ClassicHttpResponse httpResponse =
1499- httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory ,
1511+ httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (),
15001512 out -> {
15011513 writer .onOutput (out );
15021514 out .close ();
@@ -1607,25 +1619,27 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
16071619 ClientStatisticsHolder clientStats = new ClientStatisticsHolder ();
16081620 clientStats .start (ClientMetrics .OP_DURATION );
16091621
1610- Supplier <QueryResponse > responseSupplier ;
1622+ if (queryParams != null ) {
1623+ requestSettings .setOption (HttpAPIClientHelper .KEY_STATEMENT_PARAMS , queryParams );
1624+ }
16111625
1612- if (queryParams != null ) {
1613- requestSettings .setOption (HttpAPIClientHelper .KEY_STATEMENT_PARAMS , queryParams );
1614- }
1615- responseSupplier = () -> {
1626+ Supplier <QueryResponse > responseSupplier = () -> {
16161627 long startTime = System .nanoTime ();
16171628 // Selecting some node
16181629 Endpoint selectedEndpoint = getNextAliveNode ();
16191630 RuntimeException lastException = null ;
16201631 for (int i = 0 ; i <= retries ; i ++) {
16211632 ClassicHttpResponse httpResponse = null ;
16221633 try {
1623- httpResponse =
1624- httpClientHelper .executeRequest (selectedEndpoint , requestSettings .getAllSettings (), lz4Factory , output -> {
1625- output .write (sqlQuery .getBytes (StandardCharsets .UTF_8 ));
1626- output .close ();
1627- });
1628-
1634+ boolean useMultipart = ClientConfigProperties .HTTP_SEND_PARAMS_IN_BODY .getOrDefault (requestSettings .getAllSettings ());
1635+ if (queryParams != null && useMultipart ) {
1636+ httpResponse = httpClientHelper .executeMultiPartRequest (selectedEndpoint ,
1637+ requestSettings .getAllSettings (), sqlQuery );
1638+ } else {
1639+ httpResponse = httpClientHelper .executeRequest (selectedEndpoint ,
1640+ requestSettings .getAllSettings (),
1641+ sqlQuery );
1642+ }
16291643 // Check response
16301644 if (httpResponse .getCode () == HttpStatus .SC_SERVICE_UNAVAILABLE ) {
16311645 LOG .warn ("Failed to get response. Server returned {}. Retrying. (Duration: {})" , httpResponse .getCode (), durationSince (startTime ));
0 commit comments