
Support ranking and scheduling of DLO strategies #219

Merged
merged 9 commits into linkedin:main on Oct 7, 2024

Conversation

@teamurko (Collaborator) commented Oct 3, 2024

Summary

Support ranking and execution of DataLayout strategies.

  • Fetch strategies and add them to table metadata
  • Score and rank strategies, then execute them using the existing DataCompaction app

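The scoring and ranking step can be sketched as below. This is a hypothetical illustration, not the actual OpenHouse API: the class and method names are invented, and modeling a strategy's score as estimated gain divided by estimated cost is an inference from the logged values further down (gain=3.0, cost≈0.5, score≈6.0).

```java
import java.util.Comparator;
import java.util.List;

public class StrategyRanker {
    // Hypothetical model of a data layout strategy; the field names follow
    // the logged DataLayoutStrategy, but this record is illustrative only.
    public record Strategy(String table, double gain, double cost) {
        // Assumption: score is the estimated gain per unit of compute cost.
        public double score() {
            return gain / cost;
        }
    }

    // Rank strategies highest-score first so the scheduler can run the
    // most beneficial compactions before less valuable ones.
    public static List<Strategy> rank(List<Strategy> strategies) {
        return strategies.stream()
                .sorted(Comparator.comparingDouble(Strategy::score).reversed())
                .toList();
    }

    public static void main(String[] args) {
        List<Strategy> ranked = rank(List.of(
                new Strategy("db.test", 3.0, 0.5),
                new Strategy("db.other", 1.0, 0.5)));
        System.out.println(ranked.get(0).table()); // db.test
    }
}
```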
Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually tested on a local Docker setup. Please include commands run, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

Create test table

scala> spark.sql("create table openhouse.db.test (id int, data string, ts timestamp) partitioned by (days(ts))")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (0, '0', current_timestamp()), (1, '1', current_timestamp())")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (2, '2', current_timestamp()), (3, '3', current_timestamp())")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into openhouse.db.test values (4, '4', current_timestamp()), (5, '5', current_timestamp())")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select file_path from openhouse.db.test.data_files").show(200, false)
+------------------------------------------------------------------------------------------------------------------------------------------+
|file_path                                                                                                                                 |
+------------------------------------------------------------------------------------------------------------------------------------------+
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-1-7b099167-c0e8-405b-9fc7-2b0047647f5e-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-2-4668d138-dda6-4787-a734-7f9427124b79-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-2-e7f95684-0dd7-4af6-9d88-6214fc50593c-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-3-15597ffe-d676-4388-b1b9-40cb42d3bba7-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00000-0-32aed73f-21d4-4b2d-b7bc-1563ba7fd78d-00001.orc|
|/data/openhouse/db/test-c5afc2e3-13d7-48b6-9ad4-fd606ba67206/data/ts_day=2024-10-03/00001-1-26f2bca1-c70f-45a5-9215-3fdae39b3758-00001.orc|
+------------------------------------------------------------------------------------------------------------------------------------------+

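The three inserts above left six small ORC files in a single day partition, which is what makes the table a compaction candidate. A minimal sketch of the eligibility check follows; the parameter names mirror the DataCompactionConfig printed later in the logs, but the selection logic itself is an assumption for illustration:

```java
public class CompactionEligibility {
    // Parameter names taken from the logged DataCompactionConfig;
    // this record is a simplified, hypothetical stand-in.
    public record Config(long targetByteSize, int minInputFiles) {}

    // Assumption: a partition qualifies for rewrite once it accumulates
    // at least minInputFiles data files.
    public static boolean eligible(int fileCount, Config cfg) {
        return fileCount >= cfg.minInputFiles();
    }

    public static void main(String[] args) {
        Config cfg = new Config(526_385_152L, 5);
        // 6 small files in ts_day=2024-10-03 meets the minInputFiles=5 bar.
        System.out.println(eligible(6, cfg)); // true
    }
}
```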
Run OFD (orphan files deletion) and check that it starts the job

docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type ORPHAN_FILES_DELETION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0


2024-10-03 13:28:17 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 13:28:18 INFO  JobsScheduler:134 - Fetching task list based on the job type: ORPHAN_FILES_DELETION
2024-10-03 13:28:19 INFO  OperationTasksBuilder:37 - Fetched metadata for 1 tables
2024-10-03 13:28:19 INFO  OperationTasksBuilder:73 - Found metadata TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:28:19 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: ORPHAN_FILES_DELETION
2024-10-03 13:28:19 INFO  OperationTask:67 - Launching job for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:28:25 INFO  OperationTask:93 - Launched a job with id ORPHAN_FILES_DELETION_db_test_b00e4ff2-7d51-42bf-8059-7fba3db354f3 for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
...

Run DLO strategy generation and check that it starts the job

docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type DATA_LAYOUT_STRATEGY_GENERATION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0

2024-10-03 13:38:16 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 13:38:16 INFO  JobsScheduler:134 - Fetching task list based on the job type: DATA_LAYOUT_STRATEGY_GENERATION
2024-10-03 13:38:17 INFO  OperationTasksBuilder:37 - Fetched metadata for 1 tables
2024-10-03 13:38:17 INFO  OperationTasksBuilder:73 - Found metadata TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:38:17 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: DATA_LAYOUT_STRATEGY_GENERATION
2024-10-03 13:38:17 INFO  OperationTask:67 - Launching job for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
2024-10-03 13:38:18 INFO  OperationTask:93 - Launched a job with id DATA_LAYOUT_STRATEGY_GENERATION_db_test_4229af93-1dfb-40ff-af14-0e2c0483ffbe for TableMetadata(dbName=db, tableName=test, creationTimeMs=1727929659896, isPrimary=true, isTimePartitioned=true, isClustered=false, retentionConfig=null)
...

Show strategies

scala> spark.sql("show tblproperties openhouse.db.test ('write.data-layout.strategies')").show(200, false)
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|key                         |value                                                                                                                                                                                                                                                                                                                                          |
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|write.data-layout.strategies|[{"score":5.999994214381393,"entropy":2.77080849235781824E17,"cost":0.5000004821353489,"gain":3.0,"config":{"targetByteSize":526385152,"minByteSizeRatio":0.75,"maxByteSizeRatio":10.0,"minInputFiles":5,"maxConcurrentFileGroupRewrites":5,"partialProgressEnabled":true,"partialProgressMaxCommits":1,"maxFileGroupSizeBytes":107374182400}}]|
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

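As a sanity check on the published strategy, the gain and cost values stored in the table property are consistent with the score being gain divided by cost. This relationship is inferred from the logged numbers, not confirmed from the code:

```java
public class ScoreCheck {
    // gain and cost are taken verbatim from the logged DataLayoutStrategy;
    // treating score as gain / cost is an inference from those values.
    public static double score(double gain, double cost) {
        return gain / cost;
    }

    public static void main(String[] args) {
        double s = score(3.0, 0.5000004821353489);
        System.out.println(s); // ≈ 5.999994214381393, matching the logged score
    }
}
```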
Run DLO execution and check that the job completes

docker compose --profile with_jobs_scheduler run openhouse-jobs-scheduler - --type DATA_LAYOUT_STRATEGY_EXECUTION --cluster local --tablesURL http://openhouse-tables:8080 --jobsURL http://openhouse-jobs:8080 --tableMinAgeThresholdHours 0

2024-10-03 14:18:18 INFO  JobsScheduler:110 - Starting scheduler
2024-10-03 14:18:18 INFO  JobsScheduler:134 - Fetching task list based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-03 14:18:19 INFO  OperationTasksBuilder:51 - Fetched metadata for 1 data layout strategies
2024-10-03 14:18:19 INFO  OperationTasksBuilder:61 - Selected 1 strategies
2024-10-03 14:18:19 INFO  OperationTasksBuilder:73 - Found metadata TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-03 14:18:19 INFO  JobsScheduler:144 - Submitting and running 1 jobs based on the job type: DATA_LAYOUT_STRATEGY_EXECUTION
2024-10-03 14:18:19 INFO  OperationTask:67 - Launching job for TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
2024-10-03 14:18:20 INFO  OperationTask:93 - Launched a job with id DATA_LAYOUT_STRATEGY_EXECUTION_db_test_e0fe80e4-f0b7-4fac-a10f-f870dabf3937 for TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400)))
...
2024-10-03 14:23:20 INFO  OperationTask:139 - Finished job for entity TableDataLayoutMetadata(dataLayoutStrategy=DataLayoutStrategy(score=5.999994214381393, entropy=2.7708084923578182E17, cost=0.5000004821353489, gain=3.0, config=DataCompactionConfig(targetByteSize=526385152, minByteSizeRatio=0.75, maxByteSizeRatio=10.0, minInputFiles=5, maxConcurrentFileGroupRewrites=5, partialProgressEnabled=true, partialProgressMaxCommits=1, maxFileGroupSizeBytes=107374182400))): JobId DATA_LAYOUT_STRATEGY_EXECUTION_db_test_e0fe80e4-f0b7-4fac-a10f-f870dabf3937, executionId 3, runTime 16588, queuedTime 12461, state SUCCEEDED
2024-10-03 14:23:20 INFO  JobsScheduler:187 - Finishing scheduler for job type DATA_LAYOUT_STRATEGY_EXECUTION, tasks stats: 1 created, 1 succeeded, 0 cancelled (timeout), 0 failed, 0 skipped (no state)

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

anjagruenheid previously approved these changes Oct 7, 2024

@sumedhsakdeo (Collaborator) left a comment:

Thanks @teamurko for the PR. Overall this looks great. Few minor comments.

@sumedhsakdeo (Collaborator) left a comment:

minor nit.

@teamurko teamurko merged commit de704cd into linkedin:main Oct 7, 2024
1 check passed