Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ rules:
labels:
table: "$1"
taskType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerSkipped\"><>(\\w+)"
name: "pinot_controller_cronSchedulerSkipped_$3"
cache: true
labels:
table: "$1"
taskType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerJobExecutionTimeMs\"><>(\\w+)"
name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$3"
cache: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
NUMBER_END_REPLACE_FAILURE("NumEndReplaceFailure", false),
NUMBER_REVERT_REPLACE_FAILURE("NumRevertReplaceFailure", false),
CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
CRON_SCHEDULER_JOB_SKIPPED("cronSchedulerJobSkipped", false),
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ public static class ControllerPeriodicTasksConf {
@Deprecated
public static final String DEPRECATED_TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
public static final String TASK_MANAGER_FREQUENCY_PERIOD = "controller.task.frequencyPeriod";
public static final String TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE = "controller.task.skipLateCronSchedule";
public static final String TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS =
"controller.task.maxCronScheduleDelayInSeconds";
// Deprecated as of 0.8.0
@Deprecated
public static final String DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS =
Expand Down Expand Up @@ -654,6 +657,14 @@ public void setDeletedSegmentsRetentionInDays(int retentionInDays) {
setProperty(DELETED_SEGMENTS_RETENTION_IN_DAYS, retentionInDays);
}

public boolean isSkipLateCronSchedule() {
return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, false);
}

public int getMaxCronScheduleDelayInSeconds() {
return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, 600);
}

public int getTaskManagerFrequencyInSeconds() {
return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.minion;

import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
Expand All @@ -43,13 +44,27 @@ public void execute(JobExecutionContext jobExecutionContext)
LeadControllerManager leadControllerManager =
(LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
.get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
Boolean skipLateCronSchedule =
(Boolean) jobExecutionContext.getJobDetail().getJobDataMap().get(PinotTaskManager.SKIP_LATE_CRON_SCHEDULE);
int maxDelayInSeconds = (Integer) jobExecutionContext.getJobDetail().getJobDataMap()
.get(PinotTaskManager.MAX_CRON_SCHEDULE_DELAY_IN_SECONDS);
String table = jobExecutionContext.getJobDetail().getKey().getName();
String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
ControllerMeter.CRON_SCHEDULER_JOB_TRIGGERED, 1L);
if (leadControllerManager.isLeaderForTable(table)) {
Date fireTime = jobExecutionContext.getFireTime();
LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, fireTime);
Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
if (skipLateCronSchedule && isCronScheduleLate(fireTime, scheduledFireTime, maxDelayInSeconds)) {
LOGGER.warn(
"Skip late CronJob: table - {}, task - {} fired at {} but expected at {} with allowed delayInSeconds: {}",
table, taskType, fireTime, scheduledFireTime, maxDelayInSeconds);
pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
long jobStartTime = System.currentTimeMillis();
LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
pinotTaskManager.scheduleTask(taskType, table);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
Expand All @@ -60,4 +75,8 @@ public void execute(JobExecutionContext jobExecutionContext)
LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
}
}

private boolean isCronScheduleLate(Date fireTime, Date scheduledFireTime, long maxDelayInSeconds) {
return fireTime.getTime() - scheduledFireTime.getTime() > maxDelayInSeconds * 1000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);

public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
public final static String SKIP_LATE_CRON_SCHEDULE = "SkipLateCronSchedule";
public final static String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
public final static String SCHEDULE_KEY = "schedule";

Expand All @@ -96,6 +98,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {

// For cron-based scheduling
private final Scheduler _scheduler;
private final boolean _skipLateCronSchedule;
private final int _maxCronScheduleDelayInSeconds;
private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();

Expand All @@ -120,7 +124,8 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
leadControllerManager);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);

_skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
_maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
try {
_scheduler = new StdSchedulerFactory().getScheduler();
Expand Down Expand Up @@ -412,6 +417,8 @@ private void scheduleJob(String tableWithType, String taskType, String cronExprS
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, _leadControllerManager);
jobDataMap.put(SKIP_LATE_CRON_SCHEDULE, _skipLateCronSchedule);
jobDataMap.put(MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, _maxCronScheduleDelayInSeconds);
JobDetail jobDetail =
JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ public void testDefaultPinotTaskManagerNoScheduler()
stopController();
}

@Test
public void testSkipLateCronSchedule()
throws Exception {
Map<String, Object> properties = getDefaultControllerConfiguration();
properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, "true");
properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, "10");
startController(properties);
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
addFakeServerInstancesToAutoJoinHelixCluster(1, true);
Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
.addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
.addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
.addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
addSchema(schema);
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assertNotNull(scheduler);

// Add Table with one task.
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
new TableTaskConfig(
ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 * * ? * * *")))).build();
addTableConfig(tableConfig);
waitForJobGroupNames(_controllerStarter.getTaskManager(),
jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
"JobGroupNames should have SegmentGenerationAndPushTask only");
validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *", true, 10);

dropOfflineTable(RAW_TABLE_NAME);
waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
stopFakeInstances();
stopController();
}

@Test
public void testPinotTaskManagerSchedulerWithUpdate()
throws Exception {
Expand Down Expand Up @@ -219,6 +254,11 @@ private void waitForJobGroupNames(PinotTaskManager taskManager, Predicate<List<S

private void validateJob(String taskType, String cronExpression)
throws Exception {
validateJob(taskType, cronExpression, false, 600);
}

private void validateJob(String taskType, String cronExpression, boolean skipLateCronSchedule, int maxDelayInSeconds)
throws Exception {
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assert scheduler != null;
Expand All @@ -231,6 +271,8 @@ private void validateJob(String taskType, String cronExpression)
assertEquals(jobDetail.getKey().getGroup(), taskType);
assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
assertEquals(jobDetail.getJobDataMap().get("SkipLateCronSchedule"), skipLateCronSchedule);
assertEquals(jobDetail.getJobDataMap().get("MaxCronScheduleDelayInSeconds"), maxDelayInSeconds);
// jobDetail and jobTrigger are not added atomically by the scheduler,
// the jobDetail is added to an internal map firstly, and jobTrigger
// is added to another internal map afterwards, so we check for the existence
Expand Down