Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ public void start()
.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
LOGGER.info("Starting Grpc BrokerRequestHandler.");
_brokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, null);
new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
} else { // default request handler type, e.g. netty
LOGGER.info("Starting Netty BrokerRequestHandler.");
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void start()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
new BrokerUserDefinedMessageHandlerFactory(_routingManager, queryQuotaManager));
_participantHelixManager.connect();
updateInstanceConfigIfNeeded();
updateInstanceConfigAndBrokerResourceIfNeeded();
_brokerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME,
() -> _participantHelixManager.isConnected() ? 1L : 0L);
_participantHelixManager.addPreConnectCallback(
Expand All @@ -323,19 +323,35 @@ public void start()
LOGGER.info("Finish starting Pinot broker");
}

private void updateInstanceConfigIfNeeded() {
private void updateInstanceConfigAndBrokerResourceIfNeeded() {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_participantHelixManager, _instanceId);
boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
updated |= HelixHelper.addDefaultTags(instanceConfig, () -> {
boolean instanceConfigUpdated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port);
boolean shouldUpdateBrokerResource = false;
String brokerTag = null;
List<String> instanceTags = instanceConfig.getTags();
if (instanceTags.isEmpty()) {
// This is a new broker (first time joining the cluster)
if (ZKMetadataProvider.getClusterTenantIsolationEnabled(_propertyStore)) {
return Collections.singletonList(TagNameUtils.getBrokerTagForTenant(null));
brokerTag = TagNameUtils.getBrokerTagForTenant(null);
shouldUpdateBrokerResource = true;
} else {
return Collections.singletonList(Helix.UNTAGGED_BROKER_INSTANCE);
brokerTag = Helix.UNTAGGED_BROKER_INSTANCE;
}
});
if (updated) {
instanceConfig.addTag(brokerTag);
instanceConfigUpdated = true;
}
if (instanceConfigUpdated) {
HelixHelper.updateInstanceConfig(_participantHelixManager, instanceConfig);
}
if (shouldUpdateBrokerResource) {
// Update broker resource to include the new broker
long startTimeMs = System.currentTimeMillis();
List<String> tablesAdded = new ArrayList<>();
HelixHelper.updateBrokerResource(_participantHelixManager, _instanceId, Collections.singletonList(brokerTag),
tablesAdded, null);
LOGGER.info("Updated broker resource for new joining broker: {} in {}ms, tables added: {}", _instanceId,
System.currentTimeMillis() - startTimeMs, tablesAdded);
}
}

/**
Expand Down Expand Up @@ -365,8 +381,7 @@ private void registerServiceStatusHandler() {
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_participantHelixManager,
_clusterName, _instanceId, resourcesToMonitor, minResourcePercentForStartup),
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown)
)));
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ private ZKMetadataProvider() {

public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
ZNRecord znRecord) {
propertyStore
.set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord, AccessOption.PERSISTENT);
propertyStore.set(constructPropertyStorePathForResourceConfig(realtimeTableName), znRecord,
AccessOption.PERSISTENT);
}

public static void setOfflineTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String offlineTableName,
Expand All @@ -79,8 +79,8 @@ public static void setInstanceZKMetadata(ZkHelixPropertyStore<ZNRecord> property

public static InstanceZKMetadata getInstanceZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String instanceId) {
ZNRecord znRecord = propertyStore
.get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null, AccessOption.PERSISTENT);
ZNRecord znRecord = propertyStore.get(StringUtil.join("/", PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX, instanceId), null,
AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
Expand Down Expand Up @@ -121,8 +121,8 @@ public static String constructPropertyStorePathForMinionTaskMetadata(String task

public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord> propertyStore, String resourceNameForResource,
String segmentName) {
return propertyStore
.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName), AccessOption.PERSISTENT);
return propertyStore.exists(constructPropertyStorePathForSegment(resourceNameForResource, segmentName),
AccessOption.PERSISTENT);
}

public static void removeResourceSegmentsFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
Expand Down Expand Up @@ -153,9 +153,9 @@ public static void removeResourceConfigFromPropertyStore(ZkHelixPropertyStore<ZN
public static boolean createSegmentZkMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
SegmentZKMetadata segmentZKMetadata) {
try {
return propertyStore
.create(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
return propertyStore.create(
constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), AccessOption.PERSISTENT);
} catch (Exception e) {
LOGGER.error("Caught exception while creating segmentZkMetadata for table: {}", tableNameWithType, e);
return false;
Expand All @@ -166,9 +166,9 @@ public static boolean setSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> proper
SegmentZKMetadata segmentZKMetadata, int expectedVersion) {
// NOTE: Helix will throw ZkBadVersionException if version does not match
try {
return propertyStore
.set(constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
return propertyStore.set(
constructPropertyStorePathForSegment(tableNameWithType, segmentZKMetadata.getSegmentName()),
segmentZKMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT);
} catch (ZkBadVersionException e) {
return false;
}
Expand All @@ -194,25 +194,15 @@ public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
@Nullable
public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType, String segmentName) {
ZNRecord znRecord = propertyStore
.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null, AccessOption.PERSISTENT);
ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForSegment(tableNameWithType, segmentName), null,
AccessOption.PERSISTENT);
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}

@Nullable
public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType) {
ZNRecord znRecord = propertyStore
.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null, AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
try {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
return (TableConfig) ConfigUtils.applyConfigWithEnvVariables(tableConfig);
} catch (Exception e) {
LOGGER.error("Caught exception while getting table configuration for table: {}", tableNameWithType, e);
return null;
}
return toTableConfig(propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null,
AccessOption.PERSISTENT));
}

@Nullable
Expand All @@ -225,6 +215,43 @@ public static TableConfig getRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord>
return getTableConfig(propertyStore, TableNameBuilder.REALTIME.tableNameWithType(tableName));
}

public static List<TableConfig> getAllTableConfigs(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecords =
propertyStore.getChildren(PROPERTYSTORE_TABLE_CONFIGS_PREFIX, null, AccessOption.PERSISTENT,
CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
int numZNRecords = znRecords.size();
List<TableConfig> tableConfigs = new ArrayList<>(numZNRecords);
for (ZNRecord znRecord : znRecords) {
TableConfig tableConfig = toTableConfig(znRecord);
if (tableConfig != null) {
tableConfigs.add(tableConfig);
}
}
if (numZNRecords > tableConfigs.size()) {
LOGGER.warn("Failed to read {}/{} table configs", numZNRecords - tableConfigs.size(), numZNRecords);
}
return tableConfigs;
} else {
LOGGER.warn("Path: {} does not exist", PROPERTYSTORE_TABLE_CONFIGS_PREFIX);
return Collections.emptyList();
}
}

@Nullable
private static TableConfig toTableConfig(@Nullable ZNRecord znRecord) {
if (znRecord == null) {
return null;
}
try {
TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord);
return ConfigUtils.applyConfigWithEnvVariables(tableConfig);
} catch (Exception e) {
LOGGER.error("Caught exception while creating table config from ZNRecord: {}", znRecord.getId(), e);
return null;
}
}

public static void setSchema(ZkHelixPropertyStore<ZNRecord> propertyStore, Schema schema) {
propertyStore.set(constructPropertyStorePathForSchema(schema.getSchemaName()), SchemaUtils.toZNRecord(schema),
AccessOption.PERSISTENT);
Expand Down Expand Up @@ -296,8 +323,8 @@ public static Schema getTableSchema(ZkHelixPropertyStore<ZNRecord> propertyStore
public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore,
String tableNameWithType) {
String parentPath = constructPropertyStorePathForResource(tableNameWithType);
List<ZNRecord> znRecords = propertyStore
.getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
List<ZNRecord> znRecords =
propertyStore.getChildren(parentPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
if (znRecords != null) {
int numZNRecords = znRecords.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -45,9 +46,12 @@
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
Expand Down Expand Up @@ -185,6 +189,52 @@ public static void updateIdealState(final HelixManager helixManager, final Strin
updateIdealState(helixManager, resourceName, updater, policy, false);
}

/**
* Updates broker resource ideal state for the given broker with the given broker tags. Optional {@code tablesAdded}
* and {@code tablesRemoved} can be provided to track the tables added/removed during the update.
*/
public static void updateBrokerResource(HelixManager helixManager, String brokerId, List<String> brokerTags,
@Nullable List<String> tablesAdded, @Nullable List<String> tablesRemoved) {
Preconditions.checkArgument(brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE),
"Invalid broker id: %s", brokerId);
for (String brokerTag : brokerTags) {
Preconditions.checkArgument(TagNameUtils.isBrokerTag(brokerTag), "Invalid broker tag: %s", brokerTag);
}

Set<String> tablesForBrokerTag;
int numBrokerTags = brokerTags.size();
if (numBrokerTags == 0) {
tablesForBrokerTag = Collections.emptySet();
} else if (numBrokerTags == 1) {
tablesForBrokerTag = getTablesForBrokerTag(helixManager, brokerTags.get(0));
} else {
tablesForBrokerTag = getTablesForBrokerTags(helixManager, brokerTags);
}

updateIdealState(helixManager, BROKER_RESOURCE, idealState -> {
if (tablesAdded != null) {
tablesAdded.clear();
}
if (tablesRemoved != null) {
tablesRemoved.clear();
}
for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
String tableNameWithType = entry.getKey();
Map<String, String> brokerAssignment = entry.getValue();
if (tablesForBrokerTag.contains(tableNameWithType)) {
if (brokerAssignment.put(brokerId, BrokerResourceStateModel.ONLINE) == null && tablesAdded != null) {
tablesAdded.add(tableNameWithType);
}
} else {
if (brokerAssignment.remove(brokerId) != null && tablesRemoved != null) {
tablesRemoved.add(tableNameWithType);
}
}
}
return idealState;
});
}

/**
* Returns all instances for the given cluster.
*
Expand Down Expand Up @@ -320,8 +370,8 @@ public IdealState apply(IdealState idealState) {

// Removing partitions from ideal state
LOGGER.info("Trying to remove resource {} from idealstate", resourceTag);
HelixHelper
.updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater, DEFAULT_RETRY_POLICY);
HelixHelper.updateIdealState(helixManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, updater,
DEFAULT_RETRY_POLICY);
}

/**
Expand Down Expand Up @@ -495,12 +545,12 @@ public static Set<String> getServerInstancesForTenantWithType(List<InstanceConfi
TableType tableType) {
Set<String> serverInstancesWithType = new HashSet<>();
if (tableType == null || tableType == TableType.OFFLINE) {
serverInstancesWithType
.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant)));
serverInstancesWithType.addAll(
HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getOfflineTagForTenant(tenant)));
}
if (tableType == null || tableType == TableType.REALTIME) {
serverInstancesWithType
.addAll(HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant)));
serverInstancesWithType.addAll(
HelixHelper.getInstancesWithTag(instanceConfigs, TagNameUtils.getRealtimeTagForTenant(tenant)));
}
return serverInstancesWithType;
}
Expand All @@ -519,6 +569,28 @@ public static Set<InstanceConfig> getBrokerInstanceConfigsForTenant(List<Instanc
return new HashSet<>(getInstancesConfigsWithTag(instanceConfigs, TagNameUtils.getBrokerTagForTenant(tenant)));
}

public static Set<String> getTablesForBrokerTag(HelixManager helixManager, String brokerTag) {
Set<String> tablesForBrokerTag = new HashSet<>();
List<TableConfig> tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
for (TableConfig tableConfig : tableConfigs) {
if (TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()).equals(brokerTag)) {
tablesForBrokerTag.add(tableConfig.getTableName());
}
}
return tablesForBrokerTag;
}

public static Set<String> getTablesForBrokerTags(HelixManager helixManager, List<String> brokerTags) {
Set<String> tablesForBrokerTags = new HashSet<>();
List<TableConfig> tableConfigs = ZKMetadataProvider.getAllTableConfigs(helixManager.getHelixPropertyStore());
for (TableConfig tableConfig : tableConfigs) {
if (brokerTags.contains(TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()))) {
tablesForBrokerTags.add(tableConfig.getTableName());
}
}
return tablesForBrokerTags;
}

/**
* Returns the instance config for a specific instance.
*/
Expand All @@ -535,9 +607,9 @@ public static void updateInstanceConfig(HelixManager helixManager, InstanceConfi
// NOTE: Use HelixDataAccessor.setProperty() instead of HelixAdmin.setInstanceConfig() because the latter explicitly
// forbids instance host/port modification
HelixDataAccessor helixDataAccessor = helixManager.getHelixDataAccessor();
Preconditions.checkState(helixDataAccessor
.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()), instanceConfig),
"Failed to update instance config for instance: " + instanceConfig.getId());
Preconditions.checkState(
helixDataAccessor.setProperty(helixDataAccessor.keyBuilder().instanceConfig(instanceConfig.getId()),
instanceConfig), "Failed to update instance config for instance: " + instanceConfig.getId());
}

/**
Expand Down
Loading