# Support ranking and scheduling of DLO strategies (#219)

## Summary

Support ranking and execution of data layout optimization (DLO) strategies:
- Fetch generated strategies and add them to table metadata.
- Score and rank the strategies, then execute the selected ones with the existing
DataCompaction app (a minimal sketch of this flow follows below).
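
The ranking flow added here (see `OperationTasksBuilder.prepareDataLayoutOperationTaskList` in the diff below) scores the fetched strategies, greedily selects candidates within a compute budget, and submits each as an execution job. Below is a minimal sketch of that flow using the ranker classes this PR wires in; the `RankingFlowSketch` wrapper is illustrative only, and the weight/budget values mirror the defaults declared in `OperationTasksBuilder`:

```java
// Sketch of the strategy ranking/selection flow; mirrors OperationTasksBuilder in this PR.
import com.linkedin.openhouse.datalayout.ranker.DataLayoutCandidateSelector;
import com.linkedin.openhouse.datalayout.ranker.DataLayoutStrategyScorer;
import com.linkedin.openhouse.datalayout.ranker.GreedyMaxBudgetCandidateSelector;
import com.linkedin.openhouse.datalayout.ranker.SimpleWeightedSumDataLayoutStrategyScorer;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.datalayout.strategy.ScoredDataLayoutStrategy;
import java.util.List;
import java.util.stream.Collectors;

public class RankingFlowSketch {
  // Given strategies fetched from table properties, return the subset to execute.
  public static List<DataLayoutStrategy> selectForExecution(List<DataLayoutStrategy> strategies) {
    // Score strategies by a weighted sum of estimated compaction gain and compute cost
    // (PR defaults: gain weight 0.7, cost weight 0.3).
    DataLayoutStrategyScorer scorer = new SimpleWeightedSumDataLayoutStrategyScorer(0.7, 0.3);
    List<ScoredDataLayoutStrategy> scored = scorer.scoreDataLayoutStrategies(strategies);

    // Greedily select top candidates within a compute budget of 1000 GB-hours, capped at
    // 10 strategies (PR defaults); select() returns indices into the input list.
    DataLayoutCandidateSelector selector = new GreedyMaxBudgetCandidateSelector(1000.0, 10);
    List<Integer> selectedIndices = selector.select(scored);

    // Each selected strategy is then submitted as a DATA_LAYOUT_STRATEGY_EXECUTION job,
    // which the existing DataCompaction app runs with the strategy's compaction config.
    return selectedIndices.stream().map(strategies::get).collect(Collectors.toList());
  }
}
```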

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [x] Manually tested on a local docker setup; the commands run and their output are
included below.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Create a test table:
```
scala> spark.sql("create table openhouse.db.test (id int, data string, ts timestamp) partitioned by (days(ts))")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (0, '0', current_timestamp()), (1, '1', current_timestamp())")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (2, '2', current_timestamp()), (3, '3', current_timestamp())")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (4, '4', current_timestamp()), (5, '5', current_timestamp())")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select file_path from openhouse.db.test.data_files").show(200, false)
+------------------------------------------------------------------------------------------------------------------------------------------+
|file_path                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------+
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-1-7b099167-c0e8-405b-9fc7-2b0047647f5e-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-2-4668d138-dda6-4787-a734-7f9427124b79-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-2-e7f95684-0dd7-4af6-9d88-6214fc50593c-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-3-15597ffe-d676-4388-b1b9-40cb42d3bba7-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-0-32aed73f-21d4-4b2d-b7bc-1563ba7fd78d-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-1-26f2bca1-c70f-45a5-9215-3fdae39b3758-00001.orc|
+------------------------------------------------------------------------------------------------------------------------------------------+
```

Run orphan files deletion (OFD) and check that it starts the job:
```
docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type ORPHAN_FILES_DELETION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0


2024-10-03 13:28:17 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 13:28:18 INFO  JobsScheduler:134 - Fetching task list based on the job type: ORPHAN_FILES_DELETION
2024-10-03 13:28:19 INFO  OperationTasksBuilder:37 - Fetched metadata for 1 tables
2024-10-03 13:28:19 INFO  OperationTasksBuilder:73 - Found metadata TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:28:19 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: ORPHAN_FILES_DELETION
2024-10-03 13:28:19 INFO  OperationTask:67 - Launching job for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:28:25 INFO  OperationTask:93 - Launched a job with id ORPHAN_FILES_DELETION_db_test_b00e4ff2-7d51-42bf-8059-7fba3db354f3 for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
...
```

Run DLO strategy generation and check that it starts the job:
```
docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type DATA_LAYOUT_STRATEGY_GENERATION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0

2024-10-03 13:38:16 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 13:38:16 INFO  JobsScheduler:134 - Fetching task list based on the job type: DATA_LAYOUT_STRATEGY_GENERATION
2024-10-03 13:38:17 INFO  OperationTasksBuilder:37 - Fetched metadata for 1 tables
2024-10-03 13:38:17 INFO  OperationTasksBuilder:73 - Found metadata TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:38:17 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: DATA_LAYOUT_STRATEGY_GENERATION
2024-10-03 13:38:17 INFO  OperationTask:67 - Launching job for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:38:18 INFO  OperationTask:93 - Launched a job with id DATA_LAYOUT_STRATEGY_GENERATION_db_test_4229af93-1dfb-40ff-af14-0e2c0483ffbe for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
...
```

Show the generated strategies:
```
scala> spark.sql("show tblproperties openhouse.db.test ('write.data-layout.strategies')").show(200, false)
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key                         |value                                                                                                                                                                                                                                                                                                                                          |
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|write.data-layout.strategies|[{"score":5.999994214381393,"entropy":2.77080849235781824E17,"cost":0.5000004821353489,"gain":3.0,"config":{"targetByteSize":526385152,"minByteSizeRatio":0.75,"maxByteSizeRatio":10.0,"minInputFiles":5,"maxConcurrentFileGroupRewrites":5,"partialProgressEnabled":true,"partialProgressMaxCommits":1,"maxFileGroupSizeBytes":107374182400}}]|
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
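
Each serialized strategy carries an estimated gain (file-count reduction), an estimated compute cost, an overall score, and the compaction config to apply. From the numbers in this run, the persisted score appears to simply be gain divided by cost; the snippet below is only an illustrative check of that observation, and the `ScoreCheck` class is hypothetical, not code from this PR:

```java
// Illustrative sanity check of the values shown above (not part of this PR).
public class ScoreCheck {
  public static void main(String[] args) {
    double gain = 3.0;                 // estimated reduction in file count
    double cost = 0.5000004821353489;  // estimated compute cost
    System.out.println(gain / cost);   // ~5.999994214, matching the "score" field above
  }
}
```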

Run DLO strategy execution and check that the job completes:
```
docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type DATA_LAYOUT_STRATEGY_EXECUTION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0

2024-10-03 14:18:18 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 14:18:18 INFO  JobsScheduler:134 - Fetching task list based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-03 14:18:19 INFO  OperationTasksBuilder:51 - Fetched metadata for 1 data layout strategies
2024-10-03 14:18:19 INFO  OperationTasksBuilder:61 - Selected 1 strategies
2024-10-03 14:18:19 INFO  OperationTasksBuilder:73 - Found metadata TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-03 14:18:19 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-03 14:18:19 INFO  OperationTask:67 - Launching job for TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-03 14:18:20 INFO  OperationTask:93 - Launched a job with id DATA_LAYOUT_STRATEGY_EXECUTION_db_test_e0fe80e4-f0b7-4fac-a10f-f870dabf3937 for TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
...
2024-10-03 14:23:20 INFO  OperationTask:139 - Finished job for entity TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400))): JobId DATA_LAYOUT_STRATEGY_EXECUTION_db_test_e0fe80e4-f0b7-4fac-a10f-f870dabf3937, executionId 3, runTime 16588, queuedTime 12461, state SUCCEEDED
2024-10-03 14:23:20 INFO  JobsScheduler:187 - Finishing scheduler for job type DATA_LAYOUT_STRATEGY_EXECUTION, tasks stats: 1 created, 1 succeeded, 0 cancelled (timeout), 0 failed, 0 skipped (no state)
```

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
teamurko authored Oct 7, 2024
1 parent 32964d1 commit de704cd
Showing 21 changed files with 320 additions and 79 deletions.
TablesClient.java
@@ -6,6 +6,7 @@
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.RetentionConfig;
import com.linkedin.openhouse.jobs.util.RetryUtil;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import com.linkedin.openhouse.tables.client.api.DatabaseApi;
import com.linkedin.openhouse.tables.client.api.TableApi;
@@ -15,17 +16,20 @@
import com.linkedin.openhouse.tables.client.model.GetTableResponseBody;
import com.linkedin.openhouse.tables.client.model.Policies;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.hadoop.fs.Path;
@@ -39,6 +43,7 @@
@Slf4j
@AllArgsConstructor
public class TablesClient {
private static final String MAINTENANCE_PROPERTY_PREFIX = "maintenance.";
private static final int REQUEST_TIMEOUT_SECONDS = 180;
private final RetryTemplate retryTemplate;
private final TableApi tableApi;
@@ -67,7 +72,8 @@ private Optional<RetentionConfig> getTableRetention(GetTableResponseBody respons
if (response.getTimePartitioning() != null) {
columnName = response.getTimePartitioning().getColumnName();
} else {
columnName = policies.getRetention().getColumnPattern().getColumnName();
columnName =
Objects.requireNonNull(policies.getRetention().getColumnPattern()).getColumnName();
columnPattern = policies.getRetention().getColumnPattern().getPattern();
}

@@ -95,11 +101,6 @@ protected GetTableResponseBody getTable(String dbName, String tableName) {
null);
}

public Map<String, String> getTableProperties(TableMetadata tableMetadata) {
GetTableResponseBody response = getTable(tableMetadata);
return response == null ? Collections.emptyMap() : response.getTableProperties();
}

/**
* Scans all databases and tables in the databases, converts Tables Service responses to {@link
* TableMetadata}, filters out using {@link DatabaseTableFilter}, and returns as a list.
@@ -137,6 +138,38 @@ public List<TableMetadata> getTableMetadataList() {
return tableMetadataList;
}

public List<TableDataLayoutMetadata> getTableDataLayoutMetadataList() {
List<TableDataLayoutMetadata> tableDataLayoutMetadataList = new ArrayList<>();
for (String dbName : getDatabases()) {
if (databaseFilter.applyDatabaseName(dbName)) {
tableDataLayoutMetadataList.addAll(
RetryUtil.executeWithRetry(
retryTemplate,
(RetryCallback<List<TableDataLayoutMetadata>, Exception>)
context -> {
GetAllTablesResponseBody response =
tableApi
.searchTablesV1(dbName)
.block(Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS));
if (response == null) {
return Collections.emptyList();
}
return Optional.ofNullable(response.getResults())
.map(Collection::stream)
.orElseGet(Stream::empty)
.flatMap(
shallowResponseBody ->
mapTableResponseToTableDataLayoutMetadataList(shallowResponseBody)
.stream()
.filter(databaseFilter::apply))
.collect(Collectors.toList());
},
Collections.emptyList()));
}
}
return tableDataLayoutMetadataList;
}

/**
* For the given database name, get all registered tables
*
@@ -239,38 +272,84 @@ protected Optional<TableMetadata> mapTableResponseToTableMetadata(

TableMetadata.TableMetadataBuilder<?, ?> builder =
TableMetadata.builder()
.creator(tableResponseBody.getTableCreator())
.dbName(tableResponseBody.getDatabaseId())
.tableName(tableResponseBody.getTableId())
.creator(Objects.requireNonNull(tableResponseBody.getTableCreator()))
.dbName(Objects.requireNonNull(tableResponseBody.getDatabaseId()))
.tableName(Objects.requireNonNull(tableResponseBody.getTableId()))
.isPrimary(
tableResponseBody.getTableType()
== GetTableResponseBody.TableTypeEnum.PRIMARY_TABLE)
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
.isClustered(tableResponseBody.getClustering() != null)
.retentionConfig(getTableRetention(tableResponseBody).orElse(null));
if (tableResponseBody.getCreationTime() != null) {
builder.creationTimeMs(tableResponseBody.getCreationTime());
}
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody));
builder.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
return Optional.of(builder.build());
}

protected List<TableDataLayoutMetadata> mapTableResponseToTableDataLayoutMetadataList(
GetTableResponseBody shallowResponseBody) {
GetTableResponseBody tableResponseBody =
getTable(shallowResponseBody.getDatabaseId(), shallowResponseBody.getTableId());

if (tableResponseBody == null) {
log.error(
"Error while fetching metadata for table: {}.{}",
shallowResponseBody.getDatabaseId(),
shallowResponseBody.getTableCreator());
return Collections.emptyList();
}

TableDataLayoutMetadata.TableDataLayoutMetadataBuilder<?, ?> builder =
TableDataLayoutMetadata.builder()
.creator(Objects.requireNonNull(tableResponseBody.getTableCreator()))
.dbName(Objects.requireNonNull(tableResponseBody.getDatabaseId()))
.tableName(Objects.requireNonNull(tableResponseBody.getTableId()))
.isPrimary(
tableResponseBody.getTableType()
== GetTableResponseBody.TableTypeEnum.PRIMARY_TABLE)
.isTimePartitioned(tableResponseBody.getTimePartitioning() != null)
.isClustered(tableResponseBody.getClustering() != null)
.retentionConfig(getTableRetention(tableResponseBody).orElse(null))
.jobExecutionProperties(getJobExecutionProperties(tableResponseBody));
builder.creationTimeMs(Objects.requireNonNull(tableResponseBody.getCreationTime()));
List<TableDataLayoutMetadata> result = new ArrayList<>();
for (DataLayoutStrategy strategy : getDataLayoutStrategies(tableResponseBody)) {
result.add(builder.dataLayoutStrategy(strategy).build());
}
return result;
}

private @NonNull Map<String, String> getJobExecutionProperties(
GetTableResponseBody responseBody) {
if (responseBody.getTableProperties() == null) {
return Collections.emptyMap();
}
return responseBody.getTableProperties().entrySet().stream()
.filter(e -> e.getKey().startsWith(MAINTENANCE_PROPERTY_PREFIX))
.map(
e ->
new AbstractMap.SimpleEntry<>(
e.getKey().substring(MAINTENANCE_PROPERTY_PREFIX.length()), e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private List<DataLayoutStrategy> getDataLayoutStrategies(GetTableResponseBody tableResponseBody) {
Map<String, String> tableProps = tableResponseBody.getTableProperties();
if (tableProps == null
|| !tableProps.containsKey(StrategiesDaoTableProps.DATA_LAYOUT_STRATEGIES_PROPERTY_KEY)) {
return Collections.emptyList();
}
return StrategiesDaoTableProps.deserialize(
return StrategiesDaoTableProps.deserializeList(
tableProps.get(StrategiesDaoTableProps.DATA_LAYOUT_STRATEGIES_PROPERTY_KEY));
}

private String mapTableResponseToTableDirectoryName(GetTableResponseBody responseBody) {
TableMetadata metadata =
TableMetadata.builder()
.dbName(responseBody.getDatabaseId())
.tableName(responseBody.getTableId())
.dbName(Objects.requireNonNull(responseBody.getDatabaseId()))
.tableName(Objects.requireNonNull(responseBody.getTableId()))
.build();
String location = getTable(metadata).getTableLocation();
return new Path(location).getParent().getName();
return new Path(Objects.requireNonNull(location)).getParent().getName();
}
}
JobsScheduler.java
@@ -133,7 +133,7 @@ protected void run(

log.info("Fetching task list based on the job type: {}", jobType);
List<OperationTask<?>> taskList =
new OperationTasksBuilder(taskFactory, tablesClient).buildOperationTaskList(jobType);
new OperationTasksBuilder(taskFactory, tablesClient).buildOperationTaskList(jobType, METER);
if (isDryRun && jobType.equals(JobConf.JobTypeEnum.ORPHAN_DIRECTORY_DELETION)) {
log.info("Dry running {} jobs based on the job type: {}", taskList.size(), jobType);
for (OperationTask<?> operationTask : taskList) {
@@ -284,7 +284,7 @@ protected static CommandLine parseArgs(String[] args) {
Option.builder(null)
.required(false)
.hasArg()
.longOpt("cutoffHours")
.longOpt("tableMinAgeThresholdHours")
.desc("Time in hour for filtering older tables, defaults to 72")
.build());
options.addOption(
OperationTasksBuilder.java
@@ -1,12 +1,21 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.datalayout.ranker.DataLayoutCandidateSelector;
import com.linkedin.openhouse.datalayout.ranker.DataLayoutStrategyScorer;
import com.linkedin.openhouse.datalayout.ranker.GreedyMaxBudgetCandidateSelector;
import com.linkedin.openhouse.datalayout.ranker.SimpleWeightedSumDataLayoutStrategyScorer;
import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.datalayout.strategy.ScoredDataLayoutStrategy;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.DirectoryMetadata;
import com.linkedin.openhouse.jobs.util.Metadata;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import io.opentelemetry.api.metrics.Meter;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
@@ -19,6 +28,11 @@
@Slf4j
@AllArgsConstructor
public class OperationTasksBuilder {
private static final double COMPUTE_COST_WEIGHT_DEFAULT = 0.3;
private static final double COMPACTION_GAIN_WEIGHT_DEFAULT = 0.7;
private static final double MAX_COST_BUDGET_GB_HOURS_DEFAULT = 1000.0;
private static final int MAX_STRATEGIES_COUNT_DEFAULT = 10;

@Getter(AccessLevel.NONE)
private final OperationTaskFactory<? extends OperationTask<?>> taskFactory;

@@ -37,6 +51,51 @@ private List<OperationTask<?>> prepareTableDirectoryOperationTaskList(
return processMetadataList(directoryMetadataList, jobType);
}

private List<OperationTask<?>> prepareDataLayoutOperationTaskList(
JobConf.JobTypeEnum jobType, Meter meter) {
List<TableDataLayoutMetadata> tableDataLayoutMetadataList =
tablesClient.getTableDataLayoutMetadataList();
log.info("Fetched metadata for {} data layout strategies", tableDataLayoutMetadataList.size());
List<DataLayoutStrategy> strategies =
tableDataLayoutMetadataList.stream()
.map(TableDataLayoutMetadata::getDataLayoutStrategy)
.collect(Collectors.toList());
DataLayoutStrategyScorer scorer =
new SimpleWeightedSumDataLayoutStrategyScorer(
COMPACTION_GAIN_WEIGHT_DEFAULT, COMPUTE_COST_WEIGHT_DEFAULT);
List<ScoredDataLayoutStrategy> scoredStrategies = scorer.scoreDataLayoutStrategies(strategies);
DataLayoutCandidateSelector candidateSelector =
new GreedyMaxBudgetCandidateSelector(
MAX_COST_BUDGET_GB_HOURS_DEFAULT, MAX_STRATEGIES_COUNT_DEFAULT);
List<Integer> selectedStrategyIndices = candidateSelector.select(scoredStrategies);
log.info("Selected {} strategies", selectedStrategyIndices.size());
List<TableDataLayoutMetadata> selectedTableDataLayoutMetadataList =
selectedStrategyIndices.stream()
.map(tableDataLayoutMetadataList::get)
.collect(Collectors.toList());
double totalComputeCost =
selectedTableDataLayoutMetadataList.stream()
.map(m -> m.getDataLayoutStrategy().getCost())
.reduce(0.0, Double::sum);
double totalReducedFileCount =
selectedTableDataLayoutMetadataList.stream()
.map(m -> m.getDataLayoutStrategy().getGain())
.reduce(0.0, Double::sum);
log.info(
"Total estimated compute cost: {}, total estimated reduced file count: {}",
totalComputeCost,
totalReducedFileCount);
meter
.counterBuilder("data_layout_optimization_estimated_compute_cost")
.build()
.add((long) totalComputeCost);
meter
.counterBuilder("data_layout_optimization_estimated_reduced_file_count")
.build()
.add((long) totalReducedFileCount);
return processMetadataList(selectedTableDataLayoutMetadataList, jobType);
}

private List<OperationTask<?>> processMetadataList(
List<? extends Metadata> metadataList, JobConf.JobTypeEnum jobType) {
List<OperationTask<?>> taskList = new ArrayList<>();
@@ -62,7 +121,7 @@ private List<OperationTask<?>> processMetadataList(
/**
* Fetches tables and associated metadata from Tables Service, and builds the operation task list.
*/
public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType) {
public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType, Meter meter) {
switch (jobType) {
case DATA_COMPACTION:
case NO_OP:
@@ -74,6 +133,8 @@ public List<OperationTask<?>> buildOperationTaskList(JobConf.JobTypeEnum jobType
case STAGED_FILES_DELETION:
case DATA_LAYOUT_STRATEGY_GENERATION:
return prepareTableOperationTaskList(jobType);
case DATA_LAYOUT_STRATEGY_EXECUTION:
return prepareDataLayoutOperationTaskList(jobType, meter);
case ORPHAN_DIRECTORY_DELETION:
return prepareTableDirectoryOperationTaskList(jobType);
default:
TableDataCompactionTask.java
@@ -14,7 +14,7 @@
* @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#compact-data-files">Compact
* data files</a>
*/
public class TableDataCompactionTask extends TableOperationTask {
public class TableDataCompactionTask extends TableOperationTask<TableMetadata> {
public static final JobConf.JobTypeEnum OPERATION_TYPE = JobConf.JobTypeEnum.DATA_COMPACTION;

protected TableDataCompactionTask(
TableDataLayoutStrategyExecutionTask.java
@@ -0,0 +1,39 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoTableProps;
import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata;
import java.util.Arrays;
import java.util.List;

public class TableDataLayoutStrategyExecutionTask
extends TableOperationTask<TableDataLayoutMetadata> {
public static final JobConf.JobTypeEnum OPERATION_TYPE =
JobConf.JobTypeEnum.DATA_LAYOUT_STRATEGY_EXECUTION;

protected TableDataLayoutStrategyExecutionTask(
JobsClient jobsClient, TablesClient tablesClient, TableDataLayoutMetadata metadata) {
super(jobsClient, tablesClient, metadata);
}

@Override
public JobConf.JobTypeEnum getType() {
return OPERATION_TYPE;
}

@Override
protected List<String> getArgs() {
return Arrays.asList(
"--tableName",
metadata.fqtn(),
"--strategy",
StrategiesDaoTableProps.serialize(metadata.getDataLayoutStrategy()));
}

@Override
protected boolean shouldRun() {
return metadata.isPrimary() && (metadata.isTimePartitioned() || metadata.isClustered());
}
}
TableDataLayoutStrategyGenerationTask.java
@@ -8,7 +8,7 @@
import java.util.List;

/** A task to generate data layout strategies for a table. */
public class TableDataLayoutStrategyGenerationTask extends TableOperationTask {
public class TableDataLayoutStrategyGenerationTask extends TableOperationTask<TableMetadata> {
public static final JobConf.JobTypeEnum OPERATION_TYPE =
JobConf.JobTypeEnum.DATA_LAYOUT_STRATEGY_GENERATION;
