Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS to minion config instead of task config. #7516

Merged
merged 1 commit into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Introduce MinionConf, move END_REPLACE_SEGMENTS_TIMEOUT_MS to minion …
…config instead of task config.
  • Loading branch information
jtao15 committed Oct 8, 2021
commit 761e28c41922ed05cc8cbccd10e82e10e77fbaaf
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ private MinionConstants() {

public static final String TABLE_MAX_NUM_TASKS_KEY = "tableMaxNumTasks";
public static final String ENABLE_REPLACE_SEGMENTS_KEY = "enableReplaceSegments";
public static final String END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY = "endReplaceSegmentsSocketTimeoutMs";

public static class ConvertToRawIndexTask {
public static final String TASK_TYPE = "ConvertToRawIndexTask";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.integration.tests.SimpleMinionClusterIntegrationTest;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
Expand All @@ -47,6 +48,10 @@ public class TestTaskExecutorFactory implements PinotTaskExecutorFactory {
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
}

@Override
public String getTaskType() {
return SimpleMinionClusterIntegrationTest.TASK_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,7 +69,7 @@ public abstract class BaseMinionStarter implements ServiceStartable {

private static final String HTTPS_ENABLED = "enabled";

protected PinotConfiguration _config;
protected MinionConf _config;
protected String _hostname;
protected int _port;
protected String _instanceId;
Expand All @@ -83,14 +82,12 @@ public abstract class BaseMinionStarter implements ServiceStartable {
@Override
public void init(PinotConfiguration config)
throws Exception {
_config = config;
String helixClusterName = _config.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
String zkAddress = _config.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
_hostname = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
_config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
.getHostnameOrAddress() : NetUtils.getHostAddress());
_port = _config.getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
_instanceId = _config.getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
_config = new MinionConf(config.toMap());
String helixClusterName = _config.getHelixClusterName();
String zkAddress = _config.getZkAddress();
_hostname = _config.getHostName();
_port = _config.getPort();
_instanceId = _config.getInstanceId();
if (_instanceId != null) {
// NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id
Preconditions.checkState(_instanceId.startsWith(CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE),
Expand All @@ -102,7 +99,7 @@ public void init(PinotConfiguration config)
setupHelixSystemProperties();
_helixManager = new ZKHelixManager(helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress);
MinionTaskZkMetadataManager minionTaskZkMetadataManager = new MinionTaskZkMetadataManager(_helixManager);
_taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
_taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry(minionTaskZkMetadataManager, _config);
_eventObserverFactoryRegistry = new EventObserverFactoryRegistry(minionTaskZkMetadataManager);
}

Expand Down
66 changes: 66 additions & 0 deletions pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.minion;

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.NetUtils;


public class MinionConf extends PinotConfiguration {
public static final String END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY = "pinot.minion.endReplaceSegments.timeoutMs";
public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 * 60 * 1000; // 10 mins

public MinionConf() {
super(new HashMap<>());
}

public MinionConf(Map<String, Object> baseProperties) {
super(baseProperties);
}

public String getHelixClusterName() {
return getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME);
}

public String getZkAddress() {
return getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER);
}

public String getHostName()
throws Exception {
return getProperty(CommonConstants.Helix.KEY_OF_MINION_HOST,
getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false) ? NetUtils
.getHostnameOrAddress() : NetUtils.getHostAddress());
}

public int getPort() {
return getProperty(CommonConstants.Helix.KEY_OF_MINION_PORT, CommonConstants.Minion.DEFAULT_HELIX_PORT);
}

public String getInstanceId() {
return getProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY);
}

public int getEndReplaceSegmentsTimeoutMs() {
return getProperty(END_REPLACE_SEGMENTS_TIMEOUT_MS_KEY, DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pinot.minion.executor;

import org.apache.pinot.minion.MinionConf;


/**
* Factory for {@link PinotTaskExecutor}.
*/
Expand All @@ -26,8 +29,14 @@ public interface PinotTaskExecutorFactory {
/**
* Initializes the task executor factory.
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure there's no usage of this within the Pinot code base?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there's no usage of this function.

void init(MinionTaskZkMetadataManager zkMetadataManager);

/**
* Initializes the task executor factory.
*/
void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf);

/**
* Returns the task type of the executor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
import org.apache.pinot.spi.utils.PinotReflectionUtils;
import org.slf4j.Logger;
Expand All @@ -45,15 +46,15 @@ public class TaskExecutorFactoryRegistry {
* NOTE: In order to plugin a class using reflection, the class should include ".plugin.minion.tasks." in its class
* path. This convention can significantly reduce the time of class scanning.
*/
public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
long startTimeMs = System.currentTimeMillis();
Set<Class<?>> classes = getTaskExecutorFactoryClasses();
for (Class<?> clazz : classes) {
TaskExecutorFactory annotation = clazz.getAnnotation(TaskExecutorFactory.class);
if (annotation.enabled()) {
try {
PinotTaskExecutorFactory taskExecutorFactory = (PinotTaskExecutorFactory) clazz.newInstance();
taskExecutorFactory.init(zkMetadataManager);
taskExecutorFactory.init(zkMetadataManager, minionConf);
registerTaskExecutorFactory(taskExecutorFactory);
} catch (Exception e) {
LOGGER.error("Caught exception while initializing and registering task executor factory: {}, skipping it",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand All @@ -57,6 +58,12 @@
public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);

protected MinionConf _minionConf;

public BaseMultipleSegmentsConversionExecutor(MinionConf minionConf) {
_minionConf = minionConf;
}

/**
* Converts the segment based on the given {@link PinotTaskConfig}.
*
Expand Down Expand Up @@ -206,12 +213,9 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig

// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
int endReplaceSegmentsSocketTimeoutMs =
configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY) != null
? Integer.parseInt(configs.get(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY))
: FileUploadDownloadClient.DEFAULT_SOCKET_TIMEOUT_MS;
SegmentConversionUtils
.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId, endReplaceSegmentsSocketTimeoutMs);
.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs());
}

String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.converttorawindex;

import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
Expand All @@ -32,6 +33,10 @@ public class ConvertToRawIndexTaskExecutorFactory implements PinotTaskExecutorFa
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
}

@Override
public String getTaskType() {
return MinionConstants.ConvertToRawIndexTask.TASK_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
Expand All @@ -46,6 +47,10 @@
public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);

public MergeRollupTaskExecutor(MinionConf minionConf) {
super(minionConf);
}

@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
File workingDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
Expand All @@ -27,18 +28,24 @@

@TaskExecutorFactory
public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
private MinionConf _minionConf;

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
_minionConf = minionConf;
}

@Override
public String getTaskType() {
return MinionConstants.MergeRollupTask.TASK_TYPE;
}

@Override
public PinotTaskExecutor create() {
return new MergeRollupTaskExecutor();
return new MergeRollupTaskExecutor(_minionConf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
public class MergeRollupTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class);

public static final int END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 30 * 60 * 1000; // 30 mins
private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
private static final String REFRESH = "REFRESH";

Expand Down Expand Up @@ -486,8 +485,6 @@ private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> sel
configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + System.currentTimeMillis() + "_" + i + "_"
+ TableNameBuilder.extractRawTableName(offlineTableName));
configs.put(MinionConstants.END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS_KEY,
String.valueOf(END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS));
pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, configs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.purge;

import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
Expand All @@ -32,6 +33,10 @@ public class PurgeTaskExecutorFactory implements PinotTaskExecutorFactory {
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
}

@Override
public String getTaskType() {
return MinionConstants.PurgeTask.TASK_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pinot.core.segment.processing.framework.MergeType;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
Expand Down Expand Up @@ -72,7 +73,9 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
private int _expectedVersion = Integer.MIN_VALUE;

public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
MinionConf minionConf) {
super(minionConf);
_minionTaskZkMetadataManager = minionTaskZkMetadataManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;

import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutor;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
Expand All @@ -31,19 +32,26 @@
@TaskExecutorFactory
public class RealtimeToOfflineSegmentsTaskExecutorFactory implements PinotTaskExecutorFactory {
private MinionTaskZkMetadataManager _zkMetadataManager;
private MinionConf _minionConf;

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager) {
_zkMetadataManager = zkMetadataManager;
}

@Override
public void init(MinionTaskZkMetadataManager zkMetadataManager, MinionConf minionConf) {
_zkMetadataManager = zkMetadataManager;
_minionConf = minionConf;
}

@Override
public String getTaskType() {
return MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE;
}

@Override
public PinotTaskExecutor create() {
return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager);
return new RealtimeToOfflineSegmentsTaskExecutor(_zkMetadataManager, _minionConf);
}
}
Loading