Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,16 @@ public void init(PinotConfiguration brokerConf)
_port = _listenerConfigs.get(0).getPort();
_tlsPort = ListenerConfigUtil.findLastTlsPort(_listenerConfigs, -1);

_instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY);
if (_instanceId != null) {
// NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id
Preconditions.checkState(InstanceTypeUtils.isBroker(_instanceId), "Instance id must have prefix '%s', got '%s'",
Helix.PREFIX_OF_BROKER_INSTANCE, _instanceId);
} else {
_instanceId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID);
if (_instanceId == null) {
_instanceId = _brokerConf.getProperty(Helix.Instance.INSTANCE_ID_KEY);
}
if (_instanceId == null) {
_instanceId = Helix.PREFIX_OF_BROKER_INSTANCE + _hostname + "_" + _port;
}
// NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id
Preconditions.checkState(InstanceTypeUtils.isBroker(_instanceId), "Instance id must have prefix '%s', got '%s'",
Helix.PREFIX_OF_BROKER_INSTANCE, _instanceId);

_brokerConf.setProperty(Broker.CONFIG_OF_BROKER_ID, _instanceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.testng.annotations.Test;

import static org.apache.pinot.spi.utils.CommonConstants.Broker.CONFIG_OF_BROKER_HOSTNAME;
import static org.apache.pinot.spi.utils.CommonConstants.Broker.CONFIG_OF_BROKER_ID;
import static org.apache.pinot.spi.utils.CommonConstants.Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME;
import static org.apache.pinot.spi.utils.CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER;
Expand Down Expand Up @@ -110,6 +111,33 @@ public void testDefaultInstanceId()
brokerStarter.stop();
}

@Test
public void testInstanceIdPrecedence()
throws Exception {
// Ensures that pinot.broker.instance.id has higher precedence compared to instanceId
Map<String, Object> properties = new HashMap<>();
properties.put(CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
properties.put(CONFIG_OF_CLUSTER_NAME, getHelixClusterName());
properties.put(CONFIG_OF_BROKER_ID, "Broker_morePrecedence");
properties.put(INSTANCE_ID_KEY, "Broker_lessPrecedence");
properties.put(CONFIG_OF_BROKER_HOSTNAME, "myHost");
properties.put(KEY_OF_BROKER_QUERY_PORT, 1234);
properties.put(CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);

HelixBrokerStarter brokerStarter = new HelixBrokerStarter();
brokerStarter.init(new PinotConfiguration(properties));
brokerStarter.start();

String instanceId = brokerStarter.getInstanceId();
assertEquals(instanceId, "Broker_morePrecedence");
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, instanceId);
assertEquals(instanceConfig.getInstanceName(), instanceId);
assertEquals(instanceConfig.getHostName(), "myHost");
assertEquals(instanceConfig.getPort(), "1234");

brokerStarter.stop();
}

@AfterClass
public void tearDown() {
stopController();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public int getPort() {
}

public String getInstanceId() {
return getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
String instanceId = getProperty(CommonConstants.Minion.CONFIG_OF_MINION_ID);
return instanceId != null ? instanceId : getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
}

public int getEndReplaceSegmentsTimeoutMs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public enum SegmentAssignmentStrategyType {
}

public static class Instance {
@Deprecated
public static final String INSTANCE_ID_KEY = "instanceId";
public static final String DATA_DIR_KEY = "dataDir";
public static final String ADMIN_PORT_KEY = "adminPort";
Expand Down Expand Up @@ -210,7 +211,7 @@ public static class Broker {
public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d;
public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
public static final String CONFIG_OF_BROKER_ID = "pinot.broker.instance.id";
public static final String CONFIG_OF_BROKER_HOSTNAME = "pinot.broker.hostname";
public static final String CONFIG_OF_SWAGGER_USE_HTTPS = "pinot.broker.swagger.use.https";
// Configuration to consider the broker ServiceStatus as being STARTED if the percent of resources (tables) that
Expand Down Expand Up @@ -526,6 +527,7 @@ public static class Controller {

public static class Minion {
public static final String CONFIG_OF_METRICS_PREFIX = "pinot.minion.";
public static final String CONFIG_OF_MINION_ID = "pinot.minion.instance.id";
public static final String METADATA_EVENT_OBSERVER_PREFIX = "metadata.event.notifier";

// Config keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void startBroker()
String brokerInstanceName = "Broker_localhost_" + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;

Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName);
properties.put(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerInstanceName);
properties.put(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS);
properties.put(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME, _clusterName);
properties.put(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, _zkAddress);
Expand Down