Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
Expand All @@ -38,24 +35,24 @@ public class Connection {
public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);

private final PinotClientTransport _transport;
private final PinotClientTransport<?> _transport;
private final BrokerSelector _brokerSelector;
private final boolean _failOnExceptions;

Connection(List<String> brokerList, PinotClientTransport transport) {
Connection(List<String> brokerList, PinotClientTransport<?> transport) {
this(new Properties(), new SimpleBrokerSelector(brokerList), transport);
}

Connection(Properties properties, List<String> brokerList, PinotClientTransport transport) {
Connection(Properties properties, List<String> brokerList, PinotClientTransport<?> transport) {
this(properties, new SimpleBrokerSelector(brokerList), transport);
LOGGER.info("Created connection to broker list {}", brokerList);
}

Connection(BrokerSelector brokerSelector, PinotClientTransport transport) {
Connection(BrokerSelector brokerSelector, PinotClientTransport<?> transport) {
this(new Properties(), brokerSelector, transport);
}

Connection(Properties properties, BrokerSelector brokerSelector, PinotClientTransport transport) {
Connection(Properties properties, BrokerSelector brokerSelector, PinotClientTransport<?> transport) {
_brokerSelector = brokerSelector;
_transport = transport;

Expand Down Expand Up @@ -149,7 +146,7 @@ public ResultSetGroup execute(@Nullable String tableName, Request request)
* @return A future containing the result of the query
* @throws PinotClientException If an exception occurs while processing the query
*/
public Future<ResultSetGroup> executeAsync(String query)
public CompletableFuture<ResultSetGroup> executeAsync(String query)
throws PinotClientException {
return executeAsync(null, query);
}
Expand All @@ -162,7 +159,7 @@ public Future<ResultSetGroup> executeAsync(String query)
* @throws PinotClientException If an exception occurs while processing the query
*/
@Deprecated
public Future<ResultSetGroup> executeAsync(Request request)
public CompletableFuture<ResultSetGroup> executeAsync(Request request)
throws PinotClientException {
return executeAsync(null, request.getQuery());
}
Expand All @@ -174,11 +171,14 @@ public Future<ResultSetGroup> executeAsync(Request request)
* @return A future containing the result of the query
* @throws PinotClientException If an exception occurs while processing the query
*/
public Future<ResultSetGroup> executeAsync(@Nullable String tableName, String query)
public CompletableFuture<ResultSetGroup> executeAsync(@Nullable String tableName, String query)
throws PinotClientException {
String[] tableNames = (tableName == null) ? resolveTableName(query) : new String[]{tableName};
String brokerHostPort = _brokerSelector.selectBroker(tableNames);
return new ResultSetGroupFuture(_transport.executeQueryAsync(brokerHostPort, query));
if (brokerHostPort == null) {
throw new PinotClientException("Could not find broker to query for statement: " + query);
}
return _transport.executeQueryAsync(brokerHostPort, query).thenApply(ResultSetGroup::new);
}

/**
Expand Down Expand Up @@ -218,43 +218,13 @@ public void close()
_brokerSelector.close();
}

private static class ResultSetGroupFuture implements Future<ResultSetGroup> {
private final Future<BrokerResponse> _responseFuture;

public ResultSetGroupFuture(Future<BrokerResponse> responseFuture) {
_responseFuture = responseFuture;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return _responseFuture.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return _responseFuture.isCancelled();
}

@Override
public boolean isDone() {
return _responseFuture.isDone();
}

@Override
public ResultSetGroup get()
throws InterruptedException, ExecutionException {
try {
return get(60000L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new ExecutionException(e);
}
}

@Override
public ResultSetGroup get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
BrokerResponse response = _responseFuture.get(timeout, unit);
return new ResultSetGroup(response);
}
/**
* Provides access to the underlying transport mechanism for this connection.
* There may be client metrics useful for monitoring and other observability goals.
*
* @return pinot client transport.
*/
public PinotClientTransport<?> getTransport() {
return _transport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -28,8 +29,8 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
Expand All @@ -39,17 +40,17 @@
import org.apache.pinot.spi.utils.JsonUtils;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ClientStats;
import org.asynchttpclient.DefaultAsyncHttpClientConfig.Builder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* JSON encoded Pinot client transport over AsyncHttpClient.
*/
public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport {
public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport<ClientStats> {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING = "groupByMode=sql;responseFormat=sql";
Expand All @@ -66,7 +67,7 @@ public JsonAsyncHttpPinotClientTransport() {
_headers = new HashMap<>();
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
_httpClient = Dsl.asyncHttpClient();
_httpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionString,
Expand All @@ -82,7 +83,8 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
builder.setSslContext(new JdkSslContext(sslContext, true, ClientAuth.OPTIONAL));
}

builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
builder.setRequestTimeout(_brokerReadTimeout)
.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId))
Expand All @@ -103,7 +105,8 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
builder.setSslContext(sslContext);
}

builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
builder.setRequestTimeout(_brokerReadTimeout)
.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId))
Expand All @@ -122,7 +125,7 @@ public BrokerResponse executeQuery(String brokerAddress, String query)
}

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query) {
public CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, String query) {
try {
ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("sql", query);
Expand All @@ -134,11 +137,23 @@ public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String que
if (_headers != null) {
_headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
}
LOGGER.debug("Sending query {} to {}", query, url);
return requestBuilder.addHeader("Content-Type", "application/json; charset=utf-8").setBody(json.toString())
.execute().toCompletableFuture().thenApply(httpResponse -> {
LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());

if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
"Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
}

Future<Response> response =
requestBuilder.addHeader("Content-Type", "application/json; charset=utf-8").setBody(json.toString())
.execute();
return new BrokerResponseFuture(response, query, url, _brokerReadTimeout);
String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
try {
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (JsonProcessingException e) {
throw new CompletionException(e);
}
});
} catch (Exception e) {
throw new PinotClientException(e);
}
Expand All @@ -155,7 +170,7 @@ public BrokerResponse executeQuery(String brokerAddress, Request request)
}

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
public CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException {
return executeQueryAsync(brokerAddress, request.getQuery());
}
Expand All @@ -173,60 +188,8 @@ public void close()
}
}

private static class BrokerResponseFuture implements Future<BrokerResponse> {
private final Future<Response> _response;
private final String _query;
private final String _url;
private final long _brokerReadTimeout;

public BrokerResponseFuture(Future<Response> response, String query, String url, long brokerReadTimeout) {
_response = response;
_query = query;
_url = url;
_brokerReadTimeout = brokerReadTimeout;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return _response.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return _response.isCancelled();
}

@Override
public boolean isDone() {
return _response.isDone();
}

@Override
public BrokerResponse get()
throws ExecutionException {
return get(_brokerReadTimeout, TimeUnit.MILLISECONDS);
}

@Override
public BrokerResponse get(long timeout, TimeUnit unit)
throws ExecutionException {
try {
LOGGER.debug("Sending query {} to {}", _query, _url);

Response httpResponse = _response.get(timeout, unit);

LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());

if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
"Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
}

String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (Exception e) {
throw new ExecutionException(e);
}
}
@Override
public ClientStats getClientMetrics() {
return _httpClient.getClientStats();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,38 @@
*/
package org.apache.pinot.client;

import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;


/**
* Interface for plugging different client transports.
*/
public interface PinotClientTransport {
public interface PinotClientTransport<METRICS> {

BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException;

Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query)
CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, String query)
throws PinotClientException;

@Deprecated
BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException;

@Deprecated
Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException;

void close()
throws PinotClientException;

/**
* Access to the client metrics implementation if any.
* This may be useful for observability into the client implementation.
*
* @return underlying client metrics if any
*/
default METRICS getClientMetrics() {
throw new UnsupportedOperationException("No useful client metrics available");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pinot.client;

import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;


/**
Expand Down Expand Up @@ -72,7 +72,7 @@ public ResultSetGroup execute() {
*
* @return The query results
*/
public Future<ResultSetGroup> executeAsync() {
public CompletableFuture<ResultSetGroup> executeAsync() {
return _connection.executeAsync(fillStatementWithParameters());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ public void testBrokerListWithHeaders() {
Assert.assertEquals(connection.getBrokerList(), brokers);
}

@Test
public void testConnectionTransport() {
// Create properties
Properties properties = new Properties();
properties.setProperty("brokerList", "127.0.0.1:1234,localhost:2345");

// Create the connection
Connection connection = ConnectionFactory.fromProperties(properties);

Assert.assertNotNull(connection.getTransport());
Assert.assertNotNull(connection.getTransport().getClientMetrics());
}

// For testing DynamicBrokerSelector

/**
Expand Down
Loading