Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2133,6 +2133,7 @@ static void validateRequest(BrokerRequest brokerRequest, int queryResponseLimit)
* <ul>
* <li>Value for 'LIMIT' <= configured value</li>
* <li>Query options must be set to SQL mode</li>
* <li>Value for the 'numReplicaGroupsToQuery' query option, if provided, must be a positive integer</li>
* </ul>
*/
@VisibleForTesting
Expand All @@ -2151,6 +2152,18 @@ static void validateRequest(PinotQuery pinotQuery, int queryResponseLimit) {
|| !QueryOptionsUtils.isResponseFormatSQL(queryOptions)) {
throw new IllegalStateException("SQL query should always have response format and group-by mode set to SQL");
}
try {
  // Reject the query when the numReplicaGroupsToQuery option is present but is not a positive number.
  Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
  if (numReplicaGroupsToQuery != null) {
    // Guava's Preconditions message templates only substitute %s placeholders; a %d would be left verbatim in the
    // message and the argument appended in square brackets, so %s is used here.
    Preconditions.checkState(numReplicaGroupsToQuery > 0, "numReplicaGroups must be "
        + "positive number, got: %s", numReplicaGroupsToQuery);
  }
} catch (NumberFormatException ex) {
  // The option value could not be read as a number: report the raw option string and keep the original
  // NumberFormatException as the cause so the parse failure is not lost.
  String numReplicaGroupsToQuery = queryOptions.get(Broker.Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
  throw new IllegalStateException(String.format("numReplicaGroups must be a positive number, got: %s",
      numReplicaGroupsToQuery), ex);
}

if (pinotQuery.getDataSource().getSubquery() != null) {
validateRequest(pinotQuery.getDataSource().getSubquery(), queryResponseLimit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public BalancedInstanceSelector(String tableNameWithType, BrokerMetrics brokerMe

@Override
Map<String, String> select(List<String> segments, int requestId,
Map<String, List<String>> segmentToEnabledInstancesMap) {
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
for (String segment : segments) {
List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@ private List<String> calculateEnabledInstancesForSegment(String segment, List<St
@Override
public SelectionResult select(BrokerRequest brokerRequest, List<String> segments) {
int requestId = (int) (_requestId.getAndIncrement() % MAX_REQUEST_ID);
Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap);
Map<String, String> queryOptions = (brokerRequest.getPinotQuery() != null
&& brokerRequest.getPinotQuery().getQueryOptions() != null)
? brokerRequest.getPinotQuery().getQueryOptions()
: Collections.emptyMap();
Map<String, String> segmentToInstanceMap = select(segments, requestId, _segmentToEnabledInstancesMap, queryOptions);
Set<String> unavailableSegments = _unavailableSegments;
if (unavailableSegments.isEmpty()) {
return new SelectionResult(segmentToInstanceMap, Collections.emptyList());
Expand All @@ -287,5 +291,5 @@ public SelectionResult select(BrokerRequest brokerRequest, List<String> segments
* ONLINE/CONSUMING instances). If enabled instances are not {@code null}, they are sorted in alphabetical order.
*/
abstract Map<String, String> select(List<String> segments, int requestId,
Map<String, List<String>> segmentToEnabledInstancesMap);
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.util.QueryOptionsUtils;


/**
Expand All @@ -40,6 +41,12 @@
* request (there is no guarantee on choosing servers from the same replica-group though). In transitioning/error
* scenario (external view does not match ideal state), there is no guarantee on picking the least server instances, but
* the traffic is guaranteed to be evenly distributed to all available instances to avoid overwhelming hotspot servers.
* <p>If the query option NUM_REPLICA_GROUPS_TO_QUERY is provided, the servers are picked from different replica
* groups such that the segments are evenly distributed among NUM_REPLICA_GROUPS_TO_QUERY replica groups. For example,
* given replica groups [S1, S2, S3] and NUM_REPLICA_GROUPS_TO_QUERY = 2, replica groups S1 and S2 will be selected
* such that half of the segments are served from S1 and the other half from S2. If the value of
* NUM_REPLICA_GROUPS_TO_QUERY is greater than the number of available servers, ReplicaGroupInstanceSelector behaves
* similarly to BalancedInstanceSelector.
*/
public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {

Expand All @@ -49,15 +56,23 @@ public ReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brok

@Override
Map<String, String> select(List<String> segments, int requestId,
Map<String, List<String>> segmentToEnabledInstancesMap) {
Map<String, List<String>> segmentToEnabledInstancesMap, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
int replicaOffset = 0;
Integer replicaGroup = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
int numReplicaGroupsToQuery = replicaGroup == null ? 1 : replicaGroup;
for (String segment : segments) {
List<String> enabledInstances = segmentToEnabledInstancesMap.get(segment);
// NOTE: enabledInstances can be null when there is no enabled instances for the segment, or the instance selector
// has not been updated (we update all components for routing in sequence)
if (enabledInstances != null) {
int numEnabledInstances = enabledInstances.size();
segmentToSelectedInstanceMap.put(segment, enabledInstances.get(requestId % numEnabledInstances));
int instanceToSelect = (requestId + replicaOffset) % numEnabledInstances;
segmentToSelectedInstanceMap.put(segment, enabledInstances.get(instanceToSelect));
if (numReplicaGroupsToQuery > numEnabledInstances) {
numReplicaGroupsToQuery = numEnabledInstances;
}
replicaOffset = (replicaOffset + 1) % numReplicaGroupsToQuery;
}
}
return segmentToSelectedInstanceMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ public void testRejectGroovyQuery() {
testRejectGroovyQuery("SELECT foo FROM bar", false);
}

/**
 * Request validation must reject a query whose numReplicaGroupsToQuery option is not a number.
 */
@Test
public void testReplicaGroupToQueryInvalidQuery() {
  String sql = "SELECT COUNT(*) FROM MY_TABLE " + "OPTION(numReplicaGroupsToQuery=illegal)";
  PinotQuery compiledQuery = CalciteSqlParser.compileToPinotQuery(sql);
  // The non-numeric option value is expected to surface as an IllegalStateException from validateRequest.
  Assert.assertThrows(IllegalStateException.class,
      () -> BaseBrokerRequestHandler.validateRequest(compiledQuery, 10));
}

private void testRejectGroovyQuery(String query, boolean queryContainsGroovy) {
PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);

Expand Down
Loading