Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
package org.apache.pinot.common.messages;

import com.google.common.base.Preconditions;
import java.util.List;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.model.Message;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

Expand All @@ -34,19 +35,39 @@ public class SegmentReloadMessage extends Message {
public static final String RELOAD_SEGMENT_MSG_SUB_TYPE = "RELOAD_SEGMENT";

private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
private static final String SEGMENT_NAMES = "segmentNames";

public SegmentReloadMessage(@Nonnull String tableNameWithType, @Nullable String segmentName, boolean forceDownload) {
/**
* This msg asks server to reload all segments in the given a table.
*
* @param tableNameWithType the table where the segments are from.
* @param forceDownload whether to download segments from deep store when reloading.
*/
public SegmentReloadMessage(String tableNameWithType, boolean forceDownload) {
this(tableNameWithType, null, forceDownload);
}

/**
* This msg asks server to reload a list of specified segments in the given a table.
*
* @param tableNameWithType the table where the segments are from.
* @param segmentNames a list of specified segments to reload, or null for all segments.
* @param forceDownload whether to download segments from deep store when reloading.
*/
public SegmentReloadMessage(String tableNameWithType, @Nullable List<String> segmentNames, boolean forceDownload) {
super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
setResourceName(tableNameWithType);
if (segmentName != null) {
setPartitionName(segmentName);
}
setMsgSubType(RELOAD_SEGMENT_MSG_SUB_TYPE);
// Give it infinite time to process the message, as long as session is alive
setExecutionTimeout(-1);

ZNRecord znRecord = getRecord();
znRecord.setBooleanField(FORCE_DOWNLOAD_KEY, forceDownload);
if (CollectionUtils.isNotEmpty(segmentNames)) {
// TODO: use the new List field and deprecate the partition name in next release.
setPartitionName(segmentNames.get(0));
znRecord.setListField(SEGMENT_NAMES, segmentNames);
}
}

public SegmentReloadMessage(Message message) {
Expand All @@ -59,4 +80,9 @@ public SegmentReloadMessage(Message message) {
public boolean shouldForceDownload() {
return getRecord().getBooleanField(FORCE_DOWNLOAD_KEY, false);
}

@Nullable
public List<String> getSegmentList() {
return getRecord().getListField(SEGMENT_NAMES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
import org.apache.pinot.controller.helix.core.relocation.SegmentTierAssigner;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
Expand Down Expand Up @@ -152,7 +151,6 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
protected BrokerResourceValidationManager _brokerResourceValidationManager;
protected SegmentRelocator _segmentRelocator;
protected SegmentTierAssigner _segmentTierAssigner;
protected RetentionManager _retentionManager;
protected SegmentStatusChecker _segmentStatusChecker;
protected PinotTaskManager _taskManager;
Expand All @@ -166,6 +164,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
protected List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
protected MinionInstancesCleanupTask _minionInstancesCleanupTask;
protected TaskMetricsEmitter _taskMetricsEmitter;
protected MultiThreadedHttpConnectionManager _connectionManager;

@Override
public void init(PinotConfiguration pinotConfiguration)
Expand Down Expand Up @@ -429,6 +428,9 @@ private void setUpPinotController() {
}
_sqlQueryExecutor = new SqlQueryExecutor(_config.generateVipUrl());

_connectionManager = new MultiThreadedHttpConnectionManager();
_connectionManager.getParams().setConnectionTimeout(_config.getServerAdminRequestTimeoutSeconds() * 1000);

// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
LOGGER.info("Init controller periodic tasks scheduler");
Expand Down Expand Up @@ -456,8 +458,6 @@ private void setUpPinotController() {

LOGGER.info("Controller download url base: {}", _config.generateVipUrl());
LOGGER.info("Injecting configuration and resource managers to the API context");
final MultiThreadedHttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
connectionManager.getParams().setConnectionTimeout(_config.getServerAdminRequestTimeoutSeconds() * 1000);
// register all the controller objects for injection to jersey resources
_adminApp.registerBinder(new AbstractBinder() {
@Override
Expand All @@ -469,7 +469,7 @@ protected void configure() {
bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
bind(_taskManager).to(PinotTaskManager.class);
bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
bind(connectionManager).to(HttpConnectionManager.class);
bind(_connectionManager).to(HttpConnectionManager.class);
bind(_executorService).to(Executor.class);
bind(_controllerMetrics).to(ControllerMetrics.class);
bind(accessControlFactory).to(AccessControlFactory.class);
Expand Down Expand Up @@ -679,11 +679,8 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
_executorService);
periodicTasks.add(_segmentStatusChecker);
_segmentRelocator = new SegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics,
_executorService);
_executorService, _connectionManager);
periodicTasks.add(_segmentRelocator);
_segmentTierAssigner =
new SegmentTierAssigner(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_segmentTierAssigner);
_minionInstancesCleanupTask =
new MinionInstancesCleanupTask(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_minionInstancesCleanupTask);
Expand Down Expand Up @@ -753,6 +750,9 @@ private void stopPinotController() {
LOGGER.info("Disconnecting helix participant zk manager");
_helixParticipantManager.disconnect();

LOGGER.info("Shutting down http connection manager");
_connectionManager.shutdown();

LOGGER.info("Shutting down executor service");
_executorService.shutdownNow();
_executorService.awaitTermination(10L, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,8 @@ public static class ControllerPeriodicTasksConf {
"controller.realtimeSegmentRelocation.initialDelayInSeconds";
public static final String SEGMENT_RELOCATOR_INITIAL_DELAY_IN_SECONDS =
"controller.segmentRelocator.initialDelayInSeconds";
public static final String SEGMENT_TIER_ASSIGNER_FREQUENCY_PERIOD =
"controller.segmentTierAssigner.frequencyPeriod";
public static final String SEGMENT_TIER_ASSIGNER_INITIAL_DELAY_IN_SECONDS =
"controller.segmentTierAssigner.initialDelayInSeconds";
public static final String SEGMENT_RELOCATOR_ENABLE_LOCAL_TIER_MIGRATION =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a separate config for this? Isn't this the same as tier assigner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SegmentTierAssigner just calculates target tiers for segments, but not trigger the real migration.

The SegmentRelocator is extended to kick off the tier migration automatically. But it can be turned off via this knob, if someone wants to trigger this manually to fully control when the migration starts.

"controller.segmentRelocator.enableLocalTierMigration";

// The flag to indicate if controller periodic job will fix the missing LLC segment deep store copy.
// Default value is false.
Expand Down Expand Up @@ -615,11 +613,9 @@ public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPus
}

/**
* RealtimeSegmentRelocator has been rebranded to SegmentRelocator. Returns
* <code>controller.segment.relocator.frequencyInSeconds</code> or <code>controller.segment.relocator
* .frequencyInSeconds</code>
* or REALTIME_SEGMENT_RELOCATOR_FREQUENCY, in the order of decreasing perference (left ->
* right).
* RealtimeSegmentRelocator has been rebranded to SegmentRelocator. Returns <code>controller.segment.relocator
* .frequencyPeriod</code> or <code>controller.segment.relocator .frequencyInSeconds</code> or
* REALTIME_SEGMENT_RELOCATOR_FREQUENCY, in the order of decreasing perference (left -> right).
*/
public int getSegmentRelocatorFrequencyInSeconds() {
return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_FREQUENCY_PERIOD))
Expand Down Expand Up @@ -854,15 +850,8 @@ public long getSegmentRelocatorInitialDelayInSeconds() {
return segmentRelocatorInitialDelaySeconds;
}

public int getSegmentTierAssignerFrequencyInSeconds() {
return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.SEGMENT_TIER_ASSIGNER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period))
.orElse(ControllerPeriodicTasksConf.DEFAULT_SEGMENT_TIER_ASSIGNER_FREQUENCY_IN_SECONDS);
}

public long getSegmentTierAssignerInitialDelayInSeconds() {
return getProperty(ControllerPeriodicTasksConf.SEGMENT_TIER_ASSIGNER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
public boolean enableSegmentRelocatorLocalTierMigration() {
return getProperty(ControllerPeriodicTasksConf.SEGMENT_RELOCATOR_ENABLE_LOCAL_TIER_MIGRATION, false);
}

public long getPeriodicTaskInitialDelayInSeconds() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2323,7 +2323,7 @@ public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean
recipientCriteria.setInstanceName("%");
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, null, forceDownload);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, forceDownload);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

// Infinite timeout on the recipient
Expand Down Expand Up @@ -2356,7 +2356,8 @@ public Pair<Integer, String> reloadSegment(String tableNameWithType, String segm
recipientCriteria.setResource(tableNameWithType);
recipientCriteria.setPartition(segmentName);
recipientCriteria.setSessionSpecific(true);
SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segmentName, forceDownload);
SegmentReloadMessage segmentReloadMessage =
new SegmentReloadMessage(tableNameWithType, Collections.singletonList(segmentName), forceDownload);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();

// Infinite timeout on the recipient
Expand Down
Loading