Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Pinot Connector Libraries to 0.8.0 #9098

Merged
merged 5 commits into from
Nov 5, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use JsonCodec when submitting broker request in Pinot
  • Loading branch information
elonazoulay committed Nov 4, 2021
commit 2f0275eacf6dbcd9c32696e9d7270e975219faa8
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@

import javax.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand All @@ -73,6 +71,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
Expand All @@ -91,7 +90,7 @@ public class PinotClient
private static final Pattern BROKER_PATTERN = Pattern.compile("Broker_(.*)_(\\d+)");
private static final String TIME_BOUNDARY_NOT_FOUND_ERROR_CODE = "404";
private static final JsonCodec<Map<String, Map<String, List<String>>>> ROUTING_TABLE_CODEC = mapJsonCodec(String.class, mapJsonCodec(String.class, listJsonCodec(String.class)));
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final JsonCodec<QueryRequest> QUERY_REQUEST_JSON_CODEC = jsonCodec(QueryRequest.class);

private static final String GET_ALL_TABLES_API_TEMPLATE = "tables";
private static final String TABLE_INSTANCES_API_TEMPLATE = "tables/%s/instances";
Expand Down Expand Up @@ -377,6 +376,23 @@ public TimeBoundary getTimeBoundaryForTable(String table)
}
}

public static class QueryRequest
{
private final String sql;

@JsonCreator
public QueryRequest(@JsonProperty String sql)
{
this.sql = requireNonNull(sql, "sql is null");
}

@JsonProperty
public String getSql()
{
return sql;
}
}

public interface BrokerResultRow
{
Object getField(int index);
Expand Down Expand Up @@ -426,12 +442,13 @@ protected BrokerResultRow computeNext()

private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, PinotQueryInfo query)
{
String queryRequest = QUERY_REQUEST_JSON_CODEC.toJson(new QueryRequest(query.getQuery()));
return doWithRetries(PinotSessionProperties.getPinotRetryCount(session), retryNumber -> {
String queryHost = getBrokerHost(query.getTable());
LOG.info("Query '%s' on broker host '%s'", queryHost, query.getQuery());
Request.Builder builder = Request.Builder.preparePost()
.setUri(URI.create(format(QUERY_URL_TEMPLATE, queryHost)));
BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(buildRequest(query.getQuery())), brokerResponseCodec);
BrokerResponseNative response = doHttpActionWithHeadersJson(builder, Optional.of(queryRequest), brokerResponseCodec);

if (response.getExceptionsSize() > 0 && response.getProcessingExceptions() != null && !response.getProcessingExceptions().isEmpty()) {
// Pinot is known to return exceptions with benign errorcodes like 200
Expand All @@ -449,16 +466,6 @@ private BrokerResponseNative submitBrokerQueryJson(ConnectorSession session, Pin
});
}

private static String buildRequest(String sql)
{
try {
return OBJECT_MAPPER.writeValueAsString(ImmutableMap.of("sql", sql));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
* columnIndices: column name -> column index from column handles
* indiceToGroupByFunction<Int,String> (groupByFunctions): aggregationIndex -> groupByFunctionName(columnName)
Expand Down