diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
index 9e051f51..1f471061 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/JobsScheduler.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -112,6 +113,7 @@ public static void main(String[] args) {
     JobConf.JobTypeEnum operationType = getOperationJobType(cmdLine);
     Class<? extends OperationTask> operationTaskCls = getOperationTaskCls(operationType.toString());
     TablesClientFactory tablesClientFactory = getTablesClientFactory(cmdLine);
+    Properties properties = getAdditionalProperties(cmdLine);
     OperationTaskFactory<? extends OperationTask> tasksFactory =
         new OperationTaskFactory<>(
             operationTaskCls, getJobsClientFactory(cmdLine), tablesClientFactory);
@@ -121,11 +123,19 @@ public static void main(String[] args) {
             tasksFactory,
             tablesClientFactory.create());
     app.run(
-        operationType, operationTaskCls.toString(), isDryRun(cmdLine), getTasksWaitHours(cmdLine));
+        operationType,
+        operationTaskCls.toString(),
+        properties,
+        isDryRun(cmdLine),
+        getTasksWaitHours(cmdLine));
   }
 
   protected void run(
-      JobConf.JobTypeEnum jobType, String taskType, boolean isDryRun, int tasksWaitHours) {
+      JobConf.JobTypeEnum jobType,
+      String taskType,
+      Properties properties,
+      boolean isDryRun,
+      int tasksWaitHours) {
     long startTimeMillis = System.currentTimeMillis();
     METER.counterBuilder("scheduler_start_count").build().add(1);
     Map<JobState, Integer> jobStateCountMap = new HashMap<>();
@@ -133,7 +143,8 @@ protected void run(
 
     log.info("Fetching task list based on the job type: {}", jobType);
     List<OperationTask<?>> taskList =
-        new OperationTasksBuilder(taskFactory, tablesClient).buildOperationTaskList(jobType, METER);
+        new OperationTasksBuilder(taskFactory, tablesClient)
+            .buildOperationTaskList(jobType, properties, METER);
     if (isDryRun && jobType.equals(JobConf.JobTypeEnum.ORPHAN_DIRECTORY_DELETION)) {
       log.info("Dry running {} jobs based on the job type: {}", taskList.size(), jobType);
       for (OperationTask<?> operationTask : taskList) {
@@ -143,14 +154,14 @@
     }
     log.info("Submitting and running {} jobs based on the job type: {}", taskList.size(), jobType);
     List<Future<Optional<JobState>>> taskFutures = new ArrayList<>();
-    for (int taskIndex = 0; taskIndex < taskList.size(); ++taskIndex) {
-      taskFutures.add(executorService.submit(taskList.get(taskIndex)));
+    for (OperationTask<?> operationTask : taskList) {
+      taskFutures.add(executorService.submit(operationTask));
     }
 
     int emptyStateJobCount = 0;
     for (int taskIndex = 0; taskIndex < taskList.size(); ++taskIndex) {
       Optional<JobState> jobState = Optional.empty();
-      OperationTask task = taskList.get(taskIndex);
+      OperationTask<?> task = taskList.get(taskIndex);
       Future<Optional<JobState>> taskFuture = taskFutures.get(taskIndex);
       try {
         long passedTimeMillis = System.currentTimeMillis() - startTimeMillis;
@@ -323,6 +334,20 @@ protected static CommandLine parseArgs(String[] args) {
             .longOpt("rootPath")
             .desc("Root path of the file system")
             .build());
+    options.addOption(
+        Option.builder(null)
+            .required(false)
+            .hasArg()
+            .longOpt(OperationTasksBuilder.MAX_STRATEGIES_COUNT)
+            .desc("Maximum number of strategies to schedule")
+            .build());
+    options.addOption(
+        Option.builder(null)
+            .required(false)
+            .hasArg()
+            .longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
+            .desc("Maximum compute cost budget in GB hours")
+            .build());
     CommandLineParser parser = new BasicParser();
     try {
       return parser.parse(options, args);
@@ -400,4 +425,19 @@ protected static int getNumParallelJobs(CommandLine cmdLine) {
   protected static int getTasksWaitHours(CommandLine cmdLine) {
     return NumberUtils.toInt(cmdLine.getOptionValue("tasksWaitHours"), TASKS_WAIT_HOURS_DEFAULT);
   }
+
+  protected static Properties getAdditionalProperties(CommandLine cmdLine) {
+    Properties result = new Properties();
+    if (cmdLine.hasOption(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)) {
+      result.setProperty(
+          OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS,
+          cmdLine.getOptionValue(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS));
+    }
+    if (cmdLine.hasOption(OperationTasksBuilder.MAX_STRATEGIES_COUNT)) {
+      result.setProperty(
+          OperationTasksBuilder.MAX_STRATEGIES_COUNT,
+          cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
+    }
+    return result;
+  }
 }
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
index 0cc8a395..8dfd2eda 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java
@@ -15,11 +15,13 @@
 import io.opentelemetry.api.metrics.Meter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
 
 /**
  * Prepares the task list based on the job type. Right now task type is either table based or
@@ -28,9 +30,11 @@
 @Slf4j
 @AllArgsConstructor
 public class OperationTasksBuilder {
+  public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
+  public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
   private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
   private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
-  private static final double MAX_COST_BUDGET_GB_HOURS_DEFAULT = 1000.0;
+  private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
   private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
 
   @Getter(AccessLevel.NONE)
@@ -52,9 +56,14 @@ private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
   }
 
   private List<OperationTask<?>> prepareDataLayoutOperationTaskList(
-      JobConf.JobTypeEnum jobType, Meter meter) {
+      JobConf.JobTypeEnum jobType, Properties properties, Meter meter) {
     List<TableDataLayoutMetadata> tableDataLayoutMetadataList =
         tablesClient.getTableDataLayoutMetadataList();
+    // filter out non-primary and non-clustered/time-partitioned tables before ranking
+    tableDataLayoutMetadataList =
+        tableDataLayoutMetadataList.stream()
+            .filter(m -> m.isPrimary() && (m.isClustered() || m.isTimePartitioned()))
+            .collect(Collectors.toList());
     log.info("Fetched metadata for {} data layout strategies", tableDataLayoutMetadataList.size());
     List<DataLayoutStrategy> strategies =
         tableDataLayoutMetadataList.stream()
@@ -64,9 +73,18 @@ private List<OperationTask<?>> prepareDataLayoutOperationTaskList(
         new SimpleWeightedSumDataLayoutStrategyScorer(
             COMPACTION_GAIN_WEIGHT_DEFAULT, COMPUTE_COST_WEIGHT_DEFAULT);
     List<ScoredDataLayoutStrategy> scoredStrategies = scorer.scoreDataLayoutStrategies(strategies);
+    double maxComputeCost =
+        NumberUtils.toDouble(
+            properties.getProperty(MAX_COST_BUDGET_GB_HRS), MAX_COST_BUDGET_GB_HRS_DEFAULT);
+    int maxStrategiesCount =
+        NumberUtils.toInt(
+            properties.getProperty(MAX_STRATEGIES_COUNT), MAX_STRATEGIES_COUNT_DEFAULT);
+    log.info(
+        "Max compute cost budget: {}, max strategies count: {}",
+        maxComputeCost,
+        maxStrategiesCount);
     DataLayoutCandidateSelector candidateSelector =
-        new GreedyMaxBudgetCandidateSelector(
-            MAX_COST_BUDGET_GB_HOURS_DEFAULT, MAX_STRATEGIES_COUNT_DEFAULT);
+        new GreedyMaxBudgetCandidateSelector(maxComputeCost, maxStrategiesCount);
     List<Integer> selectedStrategyIndices = candidateSelector.select(scoredStrategies);
     log.info("Selected {} strategies", selectedStrategyIndices.size());
     List<TableDataLayoutMetadata> selectedTableDataLayoutMetadataList =
@@ -121,7 +139,8 @@ private List<OperationTask<?>> processMetadataList(
   /**
    * Fetches tables and associated metadata from Tables Service, and builds the operation task list.
    */
-  public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType, Meter meter) {
+  public List<OperationTask<?>> buildOperationTaskList(
+      JobConf.JobTypeEnum jobType, Properties properties, Meter meter) {
     switch (jobType) {
       case DATA_COMPACTION:
       case NO_OP:
@@ -134,7 +153,7 @@ public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType
       case DATA_LAYOUT_STRATEGY_GENERATION:
         return prepareTableOperationTaskList(jobType);
       case DATA_LAYOUT_STRATEGY_EXECUTION:
-        return prepareDataLayoutOperationTaskList(jobType, meter);
+        return prepareDataLayoutOperationTaskList(jobType, properties, meter);
       case ORPHAN_DIRECTORY_DELETION:
         return prepareTableDirectoryOperationTaskList(jobType);
       default:
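
Note on the CLI plumbing above: both new flags are optional (e.g. --maxStrategiesCount 5 --maxCostBudgetGbHrs 500), and when either is absent OperationTasksBuilder falls back to its in-code defaults (1000.0 GB-hours, 10 strategies), because NumberUtils.toDouble/toInt return the supplied default for a null or unparseable string. A minimal, self-contained sketch of that flow follows; the class name and the sample argument values are illustrative, not part of the patch.

import java.util.Properties;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.math.NumberUtils;

public class AdditionalPropertiesSketch {
  public static void main(String[] args) throws Exception {
    // Same option shape as the patch: long-opt only, optional, with one argument.
    Options options = new Options();
    options.addOption(
        Option.builder(null).required(false).hasArg().longOpt("maxStrategiesCount").build());
    options.addOption(
        Option.builder(null).required(false).hasArg().longOpt("maxCostBudgetGbHrs").build());
    CommandLine cmdLine =
        new BasicParser().parse(options, new String[] {"--maxStrategiesCount", "5"});

    // Mirror getAdditionalProperties(): copy only the flags that were actually set.
    Properties properties = new Properties();
    if (cmdLine.hasOption("maxStrategiesCount")) {
      properties.setProperty("maxStrategiesCount", cmdLine.getOptionValue("maxStrategiesCount"));
    }
    if (cmdLine.hasOption("maxCostBudgetGbHrs")) {
      properties.setProperty("maxCostBudgetGbHrs", cmdLine.getOptionValue("maxCostBudgetGbHrs"));
    }

    // Absent property -> getProperty returns null -> NumberUtils yields the default.
    double maxComputeCost =
        NumberUtils.toDouble(properties.getProperty("maxCostBudgetGbHrs"), 1000.0);
    int maxStrategiesCount = NumberUtils.toInt(properties.getProperty("maxStrategiesCount"), 10);
    System.out.println(maxComputeCost + " GB-hours, " + maxStrategiesCount + " strategies");
    // Prints: 1000.0 GB-hours, 5 strategies
  }
}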
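
The patch wires the parsed budget and count cap into GreedyMaxBudgetCandidateSelector, whose implementation is not part of this diff. As context, a greedy budget-capped selection typically looks like the standalone sketch below: the ScoredStrategy type, its fields, and the tie-breaking are assumptions, not the repo's ScoredDataLayoutStrategy.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GreedySelectionSketch {
  // Hypothetical stand-in for ScoredDataLayoutStrategy: a score to maximize and an
  // estimated compute cost in GB-hours.
  static final class ScoredStrategy {
    final double score;
    final double costGbHrs;

    ScoredStrategy(double score, double costGbHrs) {
      this.score = score;
      this.costGbHrs = costGbHrs;
    }
  }

  // Visit candidates from highest to lowest score; keep each one that still fits in the
  // remaining budget, stopping once the count cap is reached. Returns indices into the
  // input list, matching how selectedStrategyIndices is used in the patch.
  static List<Integer> select(List<ScoredStrategy> scored, double maxBudgetGbHrs, int maxCount) {
    List<Integer> byScoreDesc =
        IntStream.range(0, scored.size())
            .boxed()
            .sorted(Comparator.comparingDouble((Integer i) -> scored.get(i).score).reversed())
            .collect(Collectors.toList());
    List<Integer> selected = new ArrayList<>();
    double remaining = maxBudgetGbHrs;
    for (int i : byScoreDesc) {
      if (selected.size() == maxCount) {
        break;
      }
      ScoredStrategy candidate = scored.get(i);
      if (candidate.costGbHrs <= remaining) {
        selected.add(i);
        remaining -= candidate.costGbHrs;
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<ScoredStrategy> scored =
        Arrays.asList(
            new ScoredStrategy(0.9, 800.0),
            new ScoredStrategy(0.8, 300.0),
            new ScoredStrategy(0.5, 150.0));
    // Budget 1000.0: takes index 0 (800 left -> 200), skips index 1 (300 > 200),
    // takes index 2 (150 <= 200).
    System.out.println(select(scored, 1000.0, 10)); // [0, 2]
  }
}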
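
Similarly, SimpleWeightedSumDataLayoutStrategyScorer is constructed with COMPACTION_GAIN_WEIGHT_DEFAULT (0.7) and COMPUTE_COST_WEIGHT_DEFAULT (0.3), unchanged by this patch. The following is a hypothetical illustration of a weighted-sum score under those weights; the real scorer's inputs, normalization, and sign conventions live elsewhere in the repo and may differ.

public class WeightedSumScoreSketch {
  // Hypothetical: combine a normalized compaction gain (higher is better) and a
  // normalized compute cost (higher is worse) into a single ranking score.
  static double score(double normalizedGain, double normalizedCost) {
    final double gainWeight = 0.7; // COMPACTION_GAIN_WEIGHT_DEFAULT
    final double costWeight = 0.3; // COMPUTE_COST_WEIGHT_DEFAULT
    return gainWeight * normalizedGain - costWeight * normalizedCost;
  }

  public static void main(String[] args) {
    // A high-gain, mid-cost strategy outranks a cheap, low-gain one under these weights.
    System.out.println(score(0.9, 0.5)); // ~0.48
    System.out.println(score(0.2, 0.1)); // ~0.11
  }
}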