Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -115,7 +114,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final TableCache _tableCache;
protected final BrokerMetrics _brokerMetrics;

protected final AtomicLong _requestIdGenerator = new AtomicLong();
protected final BrokerRequestIdGenerator _brokerIdGenerator;
protected final QueryOptimizer _queryOptimizer = new QueryOptimizer();

protected final String _brokerId;
Expand All @@ -134,6 +133,7 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
_brokerId = brokerId;
_brokerIdGenerator = new BrokerRequestIdGenerator(brokerId);
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
Expand Down Expand Up @@ -231,7 +231,7 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.requesthandler;

import java.util.concurrent.atomic.AtomicLong;

/**
* An ID generator to produce a global unique identifier for each query, used in v1/v2 engine for tracking and
* inter-stage communication(v2 only). It's guaranteed by:
* <ol>
* <li>
* Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same
* requestId. This mask becomes the most significant 9 digits (in base-10).
* </li>
* <li>
* Using a auto-incrementing counter for the least significant 9 digits (in base-10).
* </li>
* </ol>
*/
public class BrokerRequestIdGenerator {
private static final long OFFSET = 1_000_000_000L;
private final long _mask;
private final AtomicLong _incrementingId = new AtomicLong(0);

public BrokerRequestIdGenerator(String brokerId) {
_mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
}

public long get() {
long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
return _mask + normalized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -77,7 +76,6 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final MailboxService _mailboxService;
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
private final MultiStageRequestIdGenerator _multistageRequestIdGenerator;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
Expand Down Expand Up @@ -111,15 +109,13 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId

// TODO: move this to a startUp() function.
_mailboxService.start();

_multistageRequestIdGenerator = new MultiStageRequestIdGenerator(brokerIdFromConfig);
}

@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
long requestId = _multistageRequestIdGenerator.get();
long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());

Expand Down Expand Up @@ -322,34 +318,4 @@ public void shutDown() {
_queryDispatcher.shutdown();
_mailboxService.shutdown();
}

/**
* OpChains in Multistage queries are identified by the requestId and the stage-id. v1 Engine uses an incrementing
* long to generate requestId, so the requestIds are numbered [0, 1, 2, ...]. When running with multiple brokers,
* it could be that two brokers end up generating the same requestId which could lead to weird query errors. This
* requestId generator addresses that by:
* <ol>
* <li>
* Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same
* requestId. This mask becomes the most significant 9 digits (in base-10).
* </li>
* <li>
* Using a auto-incrementing counter for the least significant 9 digits (in base-10).
* </li>
* </ol>
*/
static class MultiStageRequestIdGenerator {
private static final long OFFSET = 1_000_000_000L;
private final long _mask;
private final AtomicLong _incrementingId = new AtomicLong(0);

public MultiStageRequestIdGenerator(String brokerId) {
_mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
}

public long get() {
long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
return _mask + normalized;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,18 @@ public void testCancelQuery()
BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
when(routingManager.routingExists(anyString())).thenReturn(true);
RoutingTable rt = mock(RoutingTable.class);
when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), Collections.singletonList("segment01")));
when(rt.getServerInstanceToSegmentsMap()).thenReturn(
Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
Collections.singletonList("segment01")));
when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
when(queryQuotaManager.acquire(anyString())).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
final long[] testRequestId = {-1};
PinotConfiguration config =
new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
BaseBrokerRequestHandler requestHandler =
new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
new BaseBrokerRequestHandler(config, "testBrokerId", routingManager, new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
@Override
Expand All @@ -225,6 +227,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
RequestContext requestContext)
throws Exception {
testRequestId[0] = requestId;
latch.await();
return null;
}
Expand All @@ -239,12 +242,12 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
throw new RuntimeException(e);
}
});
TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(1).size() == 1, 500, 5000,
TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(testRequestId[0]).size() == 1, 500, 5000,
"Failed to submit query");
Map.Entry<Long, String> entry = requestHandler.getRunningQueries().entrySet().iterator().next();
Assert.assertEquals(entry.getKey().longValue(), 1);
Assert.assertEquals(entry.getKey().longValue(), testRequestId[0]);
Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE limit 10"));
Set<ServerInstance> servers = requestHandler.getRunningServers(1);
Set<ServerInstance> servers = requestHandler.getRunningServers(testRequestId[0]);
Assert.assertEquals(servers.size(), 1);
Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
Assert.assertEquals(servers.iterator().next().getPort(), 9000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ public void testLiteralOnlyWithAsBrokerRequestFromSQL() {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));

long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
Expand All @@ -209,9 +209,9 @@ null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Co
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
Expand Down Expand Up @@ -416,9 +416,9 @@ null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Co
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));

// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
Expand Down