Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import nl.altindag.ssl.SSLFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants.ChangeType;
Expand Down Expand Up @@ -77,16 +79,17 @@
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.tls.PinotInsecureMode;
import org.apache.pinot.common.utils.tls.RenewableTlsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.common.version.PinotVersion;
import org.apache.pinot.core.instance.context.BrokerContext;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.utils.rewriter.ResultRewriterFactory;
import org.apache.pinot.core.routing.MultiClusterRoutingContext;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import org.apache.pinot.query.runtime.context.BrokerContext;
import org.apache.pinot.query.runtime.operator.factory.DefaultQueryOperatorFactoryProvider;
import org.apache.pinot.query.runtime.operator.factory.QueryOperatorFactoryProvider;
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
Expand Down Expand Up @@ -399,6 +402,19 @@ public void start()
TlsConfig tlsDefaults = null;
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
SSLFactory sslFactory =
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(tlsDefaults,
PinotInsecureMode::isPinotInInsecureMode);
SSLContext sslContext = sslFactory.getSslContext();
BrokerContext brokerContext = BrokerContext.getInstance();
if (brokerContext.getClientHttpsContext() != null) {
LOGGER.warn("Overriding broker client HTTPS context during startup");
}
brokerContext.setClientHttpsContext(sslContext);
if (brokerContext.getServerHttpsContext() != null) {
LOGGER.warn("Overriding broker server HTTPS context during startup");
}
brokerContext.setServerHttpsContext(sslContext);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,19 @@ static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
SSLFactory sslFactory =
RenewableTlsUtils.createSSLFactoryAndEnableAutoRenewalWhenUsingFileStores(tlsConfig);
// since tlsConfig.getKeyStorePath() is not null, sslFactory.getKeyManagerFactory().get() should not be null
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(sslFactory.getKeyManagerFactory().get())
.sslProvider(SslProvider.valueOf(tlsConfig.getSslProvider()));
SslProvider sslProvider = SslProvider.valueOf(tlsConfig.getSslProvider());
if (sslProvider != SslProvider.JDK) {
LOGGER.warn("Configured SSL provider is {}. For FIPS/BCJSSE environments use JDK provider.", sslProvider);
}
SslContextBuilder sslContextBuilder =
SslContextBuilder.forServer(sslFactory.getKeyManagerFactory().get()).sslProvider(sslProvider);
sslFactory.getTrustManagerFactory().ifPresent(sslContextBuilder::trustManager);

if (tlsConfig.isClientAuthEnabled()) {
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
}

return GrpcSslContexts.configure(sslContextBuilder).build();
return GrpcSslContexts.configure(sslContextBuilder, sslProvider).build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pinot.client;

import javax.annotation.Nullable;

/**
* Shared implementation between the different ResultSets.
*/
public abstract class AbstractResultSet implements ResultSet {

@Override
@Nullable
public String getColumnDataType(int columnIndex) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.utils.BrokerSelectorUtils;
import org.apache.pinot.client.utils.ConnectionUtils;
Expand Down Expand Up @@ -211,6 +212,7 @@ protected void updateBrokerData()
_brokerData = getBrokerData(responses);
}

@Nullable
public String getBroker(String... tableNames) {
// If tableNames is not-null, filter out nulls
tableNames = tableNames == null ? tableNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.client;

import com.fasterxml.jackson.databind.JsonNode;
import javax.annotation.Nullable;


/**
Expand Down Expand Up @@ -51,18 +52,22 @@ public boolean hasExceptions() {
return _exceptions != null && !_exceptions.isEmpty();
}

@Nullable
public JsonNode getExceptions() {
return _exceptions;
}

@Nullable
public JsonNode getAggregationResults() {
return _aggregationResults;
}

@Nullable
public JsonNode getSelectionResults() {
return _selectionResults;
}

@Nullable
public JsonNode getResultTable() {
return _resultTable;
}
Expand Down Expand Up @@ -104,6 +109,7 @@ private static String getTextOrDefault(JsonNode node, String fieldName, String d
return (valueNode != null && !valueNode.isNull()) ? valueNode.asText() : defaultValue;
}

@Nullable
private static JsonNode getJsonNodeOrNull(JsonNode node, String fieldName) {
if (node == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.client;

import com.fasterxml.jackson.databind.JsonNode;
import javax.annotation.Nullable;

/**
* Extended BrokerResponse with cursor-specific fields for cursor pagination queries.
Expand Down Expand Up @@ -64,47 +65,58 @@ private CursorAwareBrokerResponse(JsonNode brokerResponse) {


// Cursor-specific field getters
@Nullable
public Long getOffset() {
return _offset;
}

@Nullable
public Integer getNumRows() {
return _numRows;
}

@Nullable
public Long getNumRowsResultSet() {
return _numRowsResultSet;
}

@Nullable
public Long getCursorResultWriteTimeMs() {
return _cursorResultWriteTimeMs;
}

@Nullable
public Long getSubmissionTimeMs() {
return _submissionTimeMs;
}

@Nullable
public Long getExpirationTimeMs() {
return _expirationTimeMs;
}

@Nullable
public String getBrokerHost() {
return _brokerHost;
}

@Nullable
public Integer getBrokerPort() {
return _brokerPort;
}

@Nullable
public Long getBytesWritten() {
return _bytesWritten;
}

@Nullable
public Long getCursorFetchTimeMs() {
return _cursorFetchTimeMs;
}

// Helper methods for extracting values from JsonNode with null checks
@Nullable
private static Long getLongOrNull(JsonNode node, String fieldName) {
if (node == null) {
return null;
Expand All @@ -113,6 +125,7 @@ private static Long getLongOrNull(JsonNode node, String fieldName) {
return (valueNode != null && !valueNode.isNull()) ? valueNode.asLong() : null;
}

@Nullable
private static Integer getIntOrNull(JsonNode node, String fieldName) {
if (node == null) {
return null;
Expand All @@ -121,6 +134,7 @@ private static Integer getIntOrNull(JsonNode node, String fieldName) {
return (valueNode != null && !valueNode.isNull()) ? valueNode.asInt() : null;
}

@Nullable
private static String getTextOrNull(JsonNode node, String fieldName) {
if (node == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;


/**
* Default SSL context provider that uses the JDK/BCJSSE stack and disables OpenSSL.
*/
public class DefaultSslContextProvider implements SslContextProvider {

@Override
public DefaultAsyncHttpClientConfig.Builder configure(DefaultAsyncHttpClientConfig.Builder builder,
@Nullable SSLContext sslContext, TlsProtocols tlsProtocols) {
builder.setUseOpenSsl(false);

List<String> enabledProtocolList = Collections.emptyList();
if (tlsProtocols != null) {
List<String> configuredProtocols = tlsProtocols.getEnabledProtocols();
if (configuredProtocols != null) {
enabledProtocolList = configuredProtocols;
}
}
String[] enabledProtocols = enabledProtocolList.toArray(new String[0]);
if (sslContext != null) {
builder.setSslEngineFactory((config, peerHost, peerPort) -> {
SSLEngine engine = sslContext.createSSLEngine(peerHost, peerPort);
engine.setUseClientMode(true);
if (enabledProtocols.length > 0) {
engine.setEnabledProtocols(enabledProtocols);
}
return engine;
});
}
if (enabledProtocols.length > 0) {
builder.setEnabledProtocols(enabledProtocols);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -60,13 +58,17 @@ public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport<C
private final AsyncHttpClient _httpClient;
private final String _extraOptionStr;
private final boolean _useMultistageEngine;
private static final SslContextProvider SSL_CONTEXT_PROVIDER = SslContextProviderFactory.create();

public JsonAsyncHttpPinotClientTransport() {
_brokerReadTimeout = 60000;
_headers = new HashMap<>();
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
_httpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(Duration.ofMillis(_brokerReadTimeout)));
Builder builder = Dsl.config();
SSL_CONTEXT_PROVIDER.configure(builder, null, TlsProtocols.defaultProtocols(false));
_httpClient =
Dsl.asyncHttpClient(builder.setRequestTimeout(Duration.ofMillis(_brokerReadTimeout)).build());
_useMultistageEngine = false;
}

Expand All @@ -80,16 +82,12 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
_useMultistageEngine = useMultistageEngine;

Builder builder = Dsl.config();
if (sslContext != null) {
builder.setSslContext(new JdkSslContext(sslContext, true, ClientAuth.OPTIONAL));
}

SSL_CONTEXT_PROVIDER.configure(builder, sslContext, tlsProtocols);
builder.setRequestTimeout(Duration.ofMillis(_brokerReadTimeout))
.setReadTimeout(Duration.ofMillis(connectionTimeouts.getReadTimeoutMs()))
.setConnectTimeout(Duration.ofMillis(connectionTimeouts.getConnectTimeoutMs()))
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId))
.setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId));
_httpClient = Dsl.asyncHttpClient(builder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.client;

import javax.annotation.Nullable;

/**
* A Pinot result group, representing an aggregation function in the original query.
*/
Expand Down Expand Up @@ -50,6 +52,7 @@ public interface ResultSet {
* @param columnIndex The index of the column for which to retrieve the name
* @return The data type of the column at the given column index. null if data type is not supported
*/
@Nullable
String getColumnDataType(int columnIndex);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client;

import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;


/**
* Pluggable provider for configuring AsyncHttpClient SSL/TLS.
*
* A custom provider can be supplied using the system property
* {@code pinot.client.sslContextProvider} (fully qualified class name)
* or via the Java service loader (META-INF/services).
*/
public interface SslContextProvider {

/**
* Configure the AsyncHttpClient builder with SSL/TLS settings.
*
* @param builder the client config builder to update
* @param sslContext optional SSL context to use
* @param tlsProtocols configured TLS protocol list
* @return the same builder for chaining
*/
DefaultAsyncHttpClientConfig.Builder configure(DefaultAsyncHttpClientConfig.Builder builder,
@Nullable SSLContext sslContext, TlsProtocols tlsProtocols);
}
Loading
Loading