Allow configuring DLO ranking params (#228)
## Summary

Support two additional optional arguments that bound the selection of ranked DLO (data layout optimization) strategies: `maxCostBudgetGbHrs`, a maximum compute cost budget in GB-hours, and `maxStrategiesCount`, a maximum number of strategies to schedule.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done

- [x] Manually tested on a local Docker setup; the commands run and their output are included below.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Create two test tables and generate strategies:
```
scala> spark.sql("show tblproperties openhouse.db.test1 ('write.data-layout.strategies')").show(2000, false)
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key                         |value                                                                                                                                                                                                                                                                                                                                         |
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|write.data-layout.strategies|[{"score":9.99998858771037,"entropy":2.77080950301652896E17,"cost":0.5000005706151327,"gain":5.0,"config":{"targetByteSize":526385152,"minByteSizeRatio":0.75,"maxByteSizeRatio":10.0,"minInputFiles":5,"maxConcurrentFileGroupRewrites":5,"partialProgressEnabled":true,"partialProgressMaxCommits":1,"maxFileGroupSizeBytes":107374182400}}]|
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


scala> spark.sql("show tblproperties openhouse.db.test2 ('write.data-layout.strategies')").show(2000, false)
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key                         |value                                                                                                                                                                                                                                                                                                                                         |
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|write.data-layout.strategies|[{"score":9.99998858771037,"entropy":2.77080950301652896E17,"cost":0.5000005706151327,"gain":5.0,"config":{"targetByteSize":526385152,"minByteSizeRatio":0.75,"maxByteSizeRatio":10.0,"minInputFiles":5,"maxConcurrentFileGroupRewrites":5,"partialProgressEnabled":true,"partialProgressMaxCommits":1,"maxFileGroupSizeBytes":107374182400}}]|
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
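
Each strategy in `write.data-layout.strategies` is a JSON object: `score` is produced by `SimpleWeightedSumDataLayoutStrategyScorer` (default weights 0.7 for compaction gain and 0.3 for compute cost, per the diff below), `cost` is the estimated compute cost in GB-hours that the budget check uses, and `gain` is the estimated reduced file count. A minimal sketch of reading the property value, assuming Jackson is on the classpath (the class name is illustrative, not project code):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StrategyPropertySketch {
  public static void main(String[] args) throws Exception {
    // Abbreviated copy of the property value shown above.
    String json =
        "[{\"score\":9.99998858771037,\"entropy\":2.77080950301652896E17,"
            + "\"cost\":0.5000005706151327,\"gain\":5.0,"
            + "\"config\":{\"targetByteSize\":526385152,\"minInputFiles\":5}}]";
    JsonNode strategies = new ObjectMapper().readTree(json);
    for (JsonNode s : strategies) {
      System.out.printf(
          "score=%.6f cost=%.6f gain=%.1f%n",
          s.get("score").asDouble(), s.get("cost").asDouble(), s.get("gain").asDouble());
    }
  }
}
```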
Run DLO execution and verify that only one strategy is picked:
```
24/10/9 7:49:24@oh-hadoop-spark:~$ docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type DATA_LAYOUT_STRATEGY_EXECUTION --cluster local --tablesURL http://openhouse-tables:8080/ --jobsURL http://openhouse-jobs:8080/ --tableMinAgeThresholdHours 0 --maxCostBudgetGbHrs 100 --maxStrategiesCount 1
2024-10-10 02:50:26 INFO  JobsScheduler:111 - Starting scheduler
2024-10-10 02:50:26 INFO  WebClientFactory:121 - Using connection pool strategy
2024-10-10 02:50:26 INFO  WebClientFactory:218 - Creating custom connection provider
2024-10-10 02:50:27 INFO  WebClientFactory:196 - Client session id: 97f5e44e-16ee-4b82-a84d-382b24b13415
2024-10-10 02:50:27 INFO  WebClientFactory:209 - Client name: null
2024-10-10 02:50:27 INFO  JobsScheduler:144 - Fetching task list based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-10 02:50:28 INFO  OperationTasksBuilder:67 - Fetched metadata for 2 data layout strategies
2024-10-10 02:50:28 INFO  OperationTasksBuilder:82 - Max compute cost budget: 100.0, max strategies count: 1
2024-10-10 02:50:28 INFO  OperationTasksBuilder:89 - Selected 1 strategies
2024-10-10 02:50:28 INFO  OperationTasksBuilder:102 - Total estimated compute cost: 0.5000005706151327, total estimated reduced file count: 5.0
2024-10-10 02:50:28 INFO  OperationTasksBuilder:121 - Found metadata TableDataLayoutMetadata(super=TableMetadata(super=Metadata(creator=openhouse), dbName=db, tableName=test1, creationTimeMs=1728518785971, isPrimary=true, isTimePartitioned=true, isClustered=false, jobExecutionProperties={}, retentionConfig=null), dataLayoutStrategy=DataLayoutStrategy(score=9.99998858771037, entropy=2.770809503016529E17, cost=0.5000005706151327, gain=5.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-10 02:50:28 INFO  JobsScheduler:155 - Submitting and running 1 jobs based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-10 02:50:28 INFO  OperationTask:67 - Launching job for TableDataLayoutMetadata(super=TableMetadata(super=Metadata(creator=openhouse), dbName=db, tableName=test1, creationTimeMs=1728518785971, isPrimary=true, isTimePartitioned=true, isClustered=false, jobExecutionProperties={}, retentionConfig=null), dataLayoutStrategy=DataLayoutStrategy(score=9.99998858771037, entropy=2.770809503016529E17, cost=0.5000005706151327, gain=5.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-10 02:50:29 INFO  OperationTask:93 - Launched a job with id DATA_LAYOUT_STRATEGY_EXECUTION_db_test1_65994c2f-6276-4dba-a05b-6dd6f78009b6 for TableDataLayoutMetadata(super=TableMetadata(super=Metadata(creator=openhouse), dbName=db, tableName=test1, creationTimeMs=1728518785971, isPrimary=true, isTimePartitioned=true, isClustered=false, jobExecutionProperties={}, retentionConfig=null), dataLayoutStrategy=DataLayoutStrategy(score=9.99998858771037, entropy=2.770809503016529E17, cost=0.5000005706151327, gain=5.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
...
2024-10-10 02:55:29 INFO  OperationTask:139 - Finished job for entity TableDataLayoutMetadata(super=TableMetadata(super=Metadata(creator=openhouse), dbName=db, tableName=test1, creationTimeMs=1728518785971, isPrimary=true, isTimePartitioned=true, isClustered=false, jobExecutionProperties={}, retentionConfig=null), dataLayoutStrategy=DataLayoutStrategy(score=9.99998858771037, entropy=2.770809503016529E17, cost=0.5000005706151327, gain=5.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400))): JobId DATA_LAYOUT_STRATEGY_EXECUTION_db_test1_65994c2f-6276-4dba-a05b-6dd6f78009b6, executionId 4, runTime 20517, queuedTime 11980, state SUCCEEDED
2024-10-10 02:55:29 INFO  JobsScheduler:198 - Finishing scheduler for job type DATA_LAYOUT_STRATEGY_EXECUTION, tasks stats: 1 created, 1 succeeded, 0 cancelled (timeout), 0 failed, 0 skipped (no state)
```
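
The run is capped with `--maxStrategiesCount 1`, which is why only one of the two equally scored strategies is executed. A minimal sketch of the greedy max-budget idea behind `GreedyMaxBudgetCandidateSelector`: take strategies in descending score order while the cumulative cost stays within the budget and the count cap holds. This illustrates the selection under those assumptions; it is not the actual implementation:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GreedySelectionSketch {
  // Hypothetical stand-in; the real ScoredDataLayoutStrategy carries more fields.
  record Scored(double score, double costGbHrs) {}

  // Pick highest-scored strategies first, subject to the compute budget
  // (maxCostBudgetGbHrs) and the count cap (maxStrategiesCount).
  static List<Integer> select(List<Scored> scored, double maxBudgetGbHrs, int maxCount) {
    List<Integer> byScoreDesc =
        IntStream.range(0, scored.size())
            .boxed()
            .sorted(Comparator.comparingDouble((Integer i) -> scored.get(i).score()).reversed())
            .collect(Collectors.toList());
    List<Integer> selected = new ArrayList<>();
    double spent = 0.0;
    for (int i : byScoreDesc) {
      if (selected.size() == maxCount) {
        break;
      }
      double cost = scored.get(i).costGbHrs();
      if (spent + cost <= maxBudgetGbHrs) {
        selected.add(i);
        spent += cost;
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // Two identical strategies, as in the test tables above; with a cap of 1
    // and a 100 GB-hour budget, only one index is selected.
    List<Scored> scored =
        List.of(
            new Scored(9.99998858771037, 0.5000005706151327),
            new Scored(9.99998858771037, 0.5000005706151327));
    System.out.println(select(scored, 100.0, 1)); // [0]
  }
}
```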

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
teamurko authored Oct 10, 2024
1 parent 5a78772 commit d11e4da
Showing 2 changed files with 71 additions and 12 deletions.
JobsScheduler.java:

@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -112,6 +113,7 @@ public static void main(String[] args) {
     JobConf.JobTypeEnum operationType = getOperationJobType(cmdLine);
     Class<? extends OperationTask> operationTaskCls = getOperationTaskCls(operationType.toString());
     TablesClientFactory tablesClientFactory = getTablesClientFactory(cmdLine);
+    Properties properties = getAdditionalProperties(cmdLine);
     OperationTaskFactory<? extends OperationTask> tasksFactory =
         new OperationTaskFactory<>(
             operationTaskCls, getJobsClientFactory(cmdLine), tablesClientFactory);
@@ -121,19 +123,28 @@ public static void main(String[] args) {
             tasksFactory,
             tablesClientFactory.create());
     app.run(
-        operationType, operationTaskCls.toString(), isDryRun(cmdLine), getTasksWaitHours(cmdLine));
+        operationType,
+        operationTaskCls.toString(),
+        properties,
+        isDryRun(cmdLine),
+        getTasksWaitHours(cmdLine));
   }
 
   protected void run(
-      JobConf.JobTypeEnum jobType, String taskType, boolean isDryRun, int tasksWaitHours) {
+      JobConf.JobTypeEnum jobType,
+      String taskType,
+      Properties properties,
+      boolean isDryRun,
+      int tasksWaitHours) {
     long startTimeMillis = System.currentTimeMillis();
     METER.counterBuilder("scheduler_start_count").build().add(1);
     Map<JobState, Integer> jobStateCountMap = new HashMap<>();
     Arrays.stream(JobState.values()).sequential().forEach(s -> jobStateCountMap.put(s, 0));
 
     log.info("Fetching task list based on the job type: {}", jobType);
     List<OperationTask<?>> taskList =
-        new OperationTasksBuilder(taskFactory, tablesClient).buildOperationTaskList(jobType, METER);
+        new OperationTasksBuilder(taskFactory, tablesClient)
+            .buildOperationTaskList(jobType, properties, METER);
     if (isDryRun && jobType.equals(JobConf.JobTypeEnum.ORPHAN_DIRECTORY_DELETION)) {
       log.info("Dry running {} jobs based on the job type: {}", taskList.size(), jobType);
       for (OperationTask<?> operationTask : taskList) {
@@ -143,14 +154,14 @@ protected void run(
     }
     log.info("Submitting and running {} jobs based on the job type: {}", taskList.size(), jobType);
     List<Future<Optional<JobState>>> taskFutures = new ArrayList<>();
-    for (int taskIndex = 0; taskIndex < taskList.size(); ++taskIndex) {
-      taskFutures.add(executorService.submit(taskList.get(taskIndex)));
+    for (OperationTask<?> operationTask : taskList) {
+      taskFutures.add(executorService.submit(operationTask));
     }
 
     int emptyStateJobCount = 0;
     for (int taskIndex = 0; taskIndex < taskList.size(); ++taskIndex) {
       Optional<JobState> jobState = Optional.empty();
-      OperationTask task = taskList.get(taskIndex);
+      OperationTask<?> task = taskList.get(taskIndex);
       Future<Optional<JobState>> taskFuture = taskFutures.get(taskIndex);
       try {
         long passedTimeMillis = System.currentTimeMillis() - startTimeMillis;
@@ -323,6 +334,20 @@ protected static CommandLine parseArgs(String[] args) {
             .longOpt("rootPath")
             .desc("Root path of the file system")
             .build());
+    options.addOption(
+        Option.builder(null)
+            .required(false)
+            .hasArg()
+            .longOpt(OperationTasksBuilder.MAX_STRATEGIES_COUNT)
+            .desc("Maximum number of strategies to schedule")
+            .build());
+    options.addOption(
+        Option.builder(null)
+            .required(false)
+            .hasArg()
+            .longOpt(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)
+            .desc("Maximum compute cost budget in GB hours")
+            .build());
     CommandLineParser parser = new BasicParser();
     try {
       return parser.parse(options, args);
@@ -400,4 +425,19 @@ protected static int getNumParallelJobs(CommandLine cmdLine) {
   protected static int getTasksWaitHours(CommandLine cmdLine) {
     return NumberUtils.toInt(cmdLine.getOptionValue("tasksWaitHours"), TASKS_WAIT_HOURS_DEFAULT);
   }
+
+  protected static Properties getAdditionalProperties(CommandLine cmdLine) {
+    Properties result = new Properties();
+    if (cmdLine.hasOption(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS)) {
+      result.setProperty(
+          OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS,
+          cmdLine.getOptionValue(OperationTasksBuilder.MAX_COST_BUDGET_GB_HRS));
+    }
+    if (cmdLine.hasOption(OperationTasksBuilder.MAX_STRATEGIES_COUNT)) {
+      result.setProperty(
+          OperationTasksBuilder.MAX_STRATEGIES_COUNT,
+          cmdLine.getOptionValue(OperationTasksBuilder.MAX_STRATEGIES_COUNT));
+    }
+    return result;
+  }
 }
OperationTasksBuilder.java:

@@ -15,11 +15,13 @@
 import io.opentelemetry.api.metrics.Meter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.math.NumberUtils;
 
 /**
  * Prepares the task list based on the job type. Right now task type is either table based or
@@ -28,9 +30,11 @@
 @Slf4j
 @AllArgsConstructor
 public class OperationTasksBuilder {
+  public static final String MAX_COST_BUDGET_GB_HRS = "maxCostBudgetGbHrs";
+  public static final String MAX_STRATEGIES_COUNT = "maxStrategiesCount";
   private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
   private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
-  private static final double MAX_COST_BUDGET_GB_HOURS_DEFAULT = 1000.0;
+  private static final double MAX_COST_BUDGET_GB_HRS_DEFAULT = 1000.0;
   private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;
 
   @Getter(AccessLevel.NONE)
@@ -52,9 +56,14 @@ private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
   }
 
   private List<OperationTask<?>> prepareDataLayoutOperationTaskList(
-      JobConf.JobTypeEnum jobType, Meter meter) {
+      JobConf.JobTypeEnum jobType, Properties properties, Meter meter) {
     List<TableDataLayoutMetadata> tableDataLayoutMetadataList =
         tablesClient.getTableDataLayoutMetadataList();
+    // filter out non-primary and non-clustered/time-partitioned tables before ranking
+    tableDataLayoutMetadataList =
+        tableDataLayoutMetadataList.stream()
+            .filter(m -> m.isPrimary() && (m.isClustered() || m.isTimePartitioned()))
+            .collect(Collectors.toList());
     log.info("Fetched metadata for {} data layout strategies", tableDataLayoutMetadataList.size());
     List<DataLayoutStrategy> strategies =
         tableDataLayoutMetadataList.stream()
@@ -64,9 +73,18 @@ private List<OperationTask<?>> prepareDataLayoutOperationTaskList(
         new SimpleWeightedSumDataLayoutStrategyScorer(
             COMPACTION_GAIN_WEIGHT_DEFAULT, COMPUTE_COST_WEIGHT_DEFAULT);
     List<ScoredDataLayoutStrategy> scoredStrategies = scorer.scoreDataLayoutStrategies(strategies);
+    double maxComputeCost =
+        NumberUtils.toDouble(
+            properties.getProperty(MAX_COST_BUDGET_GB_HRS), MAX_COST_BUDGET_GB_HRS_DEFAULT);
+    int maxStrategiesCount =
+        NumberUtils.toInt(
+            properties.getProperty(MAX_STRATEGIES_COUNT), MAX_STRATEGIES_COUNT_DEFAULT);
+    log.info(
+        "Max compute cost budget: {}, max strategies count: {}",
+        maxComputeCost,
+        maxStrategiesCount);
     DataLayoutCandidateSelector candidateSelector =
-        new GreedyMaxBudgetCandidateSelector(
-            MAX_COST_BUDGET_GB_HOURS_DEFAULT, MAX_STRATEGIES_COUNT_DEFAULT);
+        new GreedyMaxBudgetCandidateSelector(maxComputeCost, maxStrategiesCount);
     List<Integer> selectedStrategyIndices = candidateSelector.select(scoredStrategies);
     log.info("Selected {} strategies", selectedStrategyIndices.size());
     List<TableDataLayoutMetadata> selectedTableDataLayoutMetadataList =
@@ -121,7 +139,8 @@ private List<OperationTask<?>> processMetadataList(
   /**
    * Fetches tables and associated metadata from Tables Service, and builds the operation task list.
    */
-  public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType, Meter meter) {
+  public List<OperationTask<?>> buildOperationTaskList(
+      JobConf.JobTypeEnum jobType, Properties properties, Meter meter) {
     switch (jobType) {
       case DATA_COMPACTION:
       case NO_OP:
@@ -134,7 +153,7 @@ public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType
       case DATA_LAYOUT_STRATEGY_GENERATION:
         return prepareTableOperationTaskList(jobType);
       case DATA_LAYOUT_STRATEGY_EXECUTION:
-        return prepareDataLayoutOperationTaskList(jobType, meter);
+        return prepareDataLayoutOperationTaskList(jobType, properties, meter);
       case ORPHAN_DIRECTORY_DELETION:
         return prepareTableDirectoryOperationTaskList(jobType);
       default:
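
When neither flag is passed, `getAdditionalProperties` returns an empty `Properties`, and `OperationTasksBuilder` falls back to the built-in defaults of 1000.0 GB-hours and 10 strategies, since `NumberUtils.toDouble`/`toInt` return the supplied fallback for a null or unparseable string. A quick sketch of that fallback behavior (class name illustrative):

```java
import java.util.Properties;
import org.apache.commons.lang3.math.NumberUtils;

public class DefaultsSketch {
  public static void main(String[] args) {
    Properties props = new Properties(); // scheduler invoked without the new flags
    // NumberUtils returns the supplied default when the property is absent (null).
    double budget = NumberUtils.toDouble(props.getProperty("maxCostBudgetGbHrs"), 1000.0);
    int count = NumberUtils.toInt(props.getProperty("maxStrategiesCount"), 10);
    System.out.println(budget + " GB-hours budget, at most " + count + " strategies");
  }
}
```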
