Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
deepthi912 authored Aug 5, 2024
2 parents 9cb2bc2 + a0f82a1 commit 77699ad
Show file tree
Hide file tree
Showing 27 changed files with 1,115 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected String _instanceId;
private volatile boolean _isStarting = false;
private volatile boolean _isShuttingDown = false;

protected final List<ClusterChangeHandler> _clusterConfigChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _idealStateChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _externalViewChangeHandlers = new ArrayList<>();
protected final List<ClusterChangeHandler> _instanceConfigChangeHandlers = new ArrayList<>();
Expand Down Expand Up @@ -214,6 +216,15 @@ public void addInstanceConfigChangeHandler(ClusterChangeHandler instanceConfigCh
_instanceConfigChangeHandlers.add(instanceConfigChangeHandler);
}

/**
* Adds a cluster config change handler to handle Helix cluster config change callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
* handlers from running. For slow change handler, make it asynchronous.
*/
public void addClusterConfigChangeHandler(ClusterChangeHandler clusterConfigChangeHandler) {
_clusterConfigChangeHandlers.add(clusterConfigChangeHandler);
}

/**
* Adds a live instance change handler to handle Helix live instance change callbacks.
* <p>NOTE: all change handlers will be run in a single thread, so any slow change handler can block other change
Expand Down Expand Up @@ -350,6 +361,10 @@ public void start()
_brokerAdminApplication.start(_listenerConfigs);

LOGGER.info("Initializing cluster change mediator");
for (ClusterChangeHandler clusterConfigChangeHandler : _clusterConfigChangeHandlers) {
clusterConfigChangeHandler.init(_spectatorHelixManager);
}
_clusterConfigChangeHandlers.add(queryQuotaManager);
for (ClusterChangeHandler idealStateChangeHandler : _idealStateChangeHandlers) {
idealStateChangeHandler.init(_spectatorHelixManager);
}
Expand All @@ -368,6 +383,7 @@ public void start()
liveInstanceChangeHandler.init(_spectatorHelixManager);
}
Map<ChangeType, List<ClusterChangeHandler>> clusterChangeHandlersMap = new HashMap<>();
clusterChangeHandlersMap.put(ChangeType.CLUSTER_CONFIG, _clusterConfigChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.IDEAL_STATE, _idealStateChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.EXTERNAL_VIEW, _externalViewChangeHandlers);
clusterChangeHandlersMap.put(ChangeType.INSTANCE_CONFIG, _instanceConfigChangeHandlers);
Expand All @@ -379,6 +395,7 @@ public void start()
_spectatorHelixManager.addIdealStateChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addExternalViewChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addInstanceConfigChangeListener(_clusterChangeMediator);
_spectatorHelixManager.addClusterfigChangeListener(_clusterChangeMediator);
if (!_liveInstanceChangeHandlers.isEmpty()) {
_spectatorHelixManager.addLiveInstanceChangeListener(_clusterChangeMediator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -81,6 +82,8 @@ public void onBecomeOnlineFromOffline(Message message, NotificationContext conte
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig,
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
_queryQuotaManager.createDatabaseRateLimiter(
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(tableNameWithType));
} catch (Exception e) {
LOGGER.error("Caught exception while processing transition from OFFLINE to ONLINE for table: {}",
tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import org.apache.helix.model.Message;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.messages.DatabaseConfigRefreshMessage;
import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -61,6 +63,8 @@ public MessageHandler createHandler(Message message, NotificationContext context
return new RefreshTableConfigMessageHandler(new TableConfigRefreshMessage(message), context);
case RoutingTableRebuildMessage.REBUILD_ROUTING_TABLE_MSG_SUB_TYPE:
return new RebuildRoutingTableMessageHandler(new RoutingTableRebuildMessage(message), context);
case DatabaseConfigRefreshMessage.REFRESH_DATABASE_CONFIG_MSG_SUB_TYPE:
return new RefreshDatabaseConfigMessageHandler(new DatabaseConfigRefreshMessage(message), context);
default:
// NOTE: Log a warning and return no-op message handler for unsupported message sub-types. This can happen when
// a new message sub-type is added, and the sender gets deployed first while receiver is still running the
Expand Down Expand Up @@ -117,6 +121,9 @@ public HelixTaskResult handleMessage() {
// TODO: Fetch the table config here and pass it into the managers, or consider merging these 2 managers
_routingManager.buildRouting(_tableNameWithType);
_queryQuotaManager.initOrUpdateTableQueryQuota(_tableNameWithType);
// only create the rate limiter if not present. This message has no reason to update the database rate limiter
_queryQuotaManager.createDatabaseRateLimiter(
DatabaseUtils.extractDatabaseFromFullyQualifiedTableName(_tableNameWithType));
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
Expand All @@ -129,6 +136,32 @@ public void onError(Exception e, ErrorCode code, ErrorType type) {
}
}

private class RefreshDatabaseConfigMessageHandler extends MessageHandler {
final String _databaseName;

RefreshDatabaseConfigMessageHandler(DatabaseConfigRefreshMessage databaseConfigRefreshMessage,
NotificationContext context) {
super(databaseConfigRefreshMessage, context);
_databaseName = databaseConfigRefreshMessage.getDatabaseName();
}

@Override
public HelixTaskResult handleMessage() {
// only update the existing rate limiter.
// Database rate limiter creation should only be done through table based change triggers
_queryQuotaManager.updateDatabaseRateLimiter(_databaseName);
HelixTaskResult result = new HelixTaskResult();
result.setSuccess(true);
return result;
}

@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
LOGGER.error("Got error while refreshing database config for database: {} (error code: {}, error type: {})",
_databaseName, code, type, e);
}
}

private class RebuildRoutingTableMessageHandler extends MessageHandler {
final String _tableNameWithType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext;
import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand All @@ -55,7 +57,7 @@
@PreFetch(enabled = false)
public class ClusterChangeMediator
implements IdealStateChangeListener, ExternalViewChangeListener, InstanceConfigChangeListener,
LiveInstanceChangeListener {
ClusterConfigChangeListener, LiveInstanceChangeListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterChangeMediator.class);

// If no change got for 1 hour, proactively check changes
Expand Down Expand Up @@ -192,6 +194,14 @@ public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, Notific
enqueueChange(ChangeType.INSTANCE_CONFIG);
}

@Override
public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
// Cluster config should be null because Helix pre-fetch is disabled
assert clusterConfig == null;

enqueueChange(ChangeType.CLUSTER_CONFIG);
}

@Override
public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
// Live instance list should be empty because Helix pre-fetch is disabled
Expand Down
Loading

0 comments on commit 77699ad

Please sign in to comment.