[Data] Ray Data logs are garbled when using concurrent datasets #48633
What happened + What you expected to happen
When multiple datasets execute concurrently (as in the repro below), their streaming executors log to the same place, so lines from the different executors come out interleaved and apparently duplicated, and there is no way to tell which dataset emitted which line. Expected: log output that can be attributed to a specific dataset/executor.

One solution is to write each streaming executor's logs to a separate file, preferably one named after the executor or dataset; a rough sketch of that idea follows.
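For illustration only, here is a minimal sketch of per-executor log files built on the standard logging module. The directory path, the attach_executor_log_file helper, and the thread-based filter are assumptions made for this sketch, not Ray Data's actual logging setup; it assumes each StreamingExecutor emits its records from its own thread (as suggested by the thread objects in the shutdown messages in the log output).

# Minimal sketch (not Ray Data's implementation): give each streaming executor
# its own log file, named after the executor, and route only records emitted
# from that executor's thread into it.
import logging
import os
import threading


class ExecutorThreadFilter(logging.Filter):
    """Pass only records logged from the given thread (the executor's loop)."""

    def __init__(self, thread_id: int):
        super().__init__()
        self._thread_id = thread_id

    def filter(self, record: logging.LogRecord) -> bool:
        return record.thread == self._thread_id


def attach_executor_log_file(logger, executor_name, log_dir):
    """Write this executor's records to <log_dir>/<executor_name>.log.

    Must be called from the executor's own thread so the filter captures
    that thread's id.
    """
    os.makedirs(log_dir, exist_ok=True)
    handler = logging.FileHandler(os.path.join(log_dir, f"{executor_name}.log"))
    handler.addFilter(ExecutorThreadFilter(threading.get_ident()))
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(filename)s:%(lineno)d -- %(message)s"))
    logger.addHandler(handler)
    return handler


# Hypothetical usage: attach on executor start, detach on shutdown, so two
# concurrent datasets never share a log file. The directory name is made up
# for this example.
logger = logging.getLogger("ray.data")
logger.setLevel(logging.DEBUG)
handler = attach_executor_log_file(
    logger,
    "StreamingExecutor-ae0ae63c8283473598b424e3f4c21af5",
    "/tmp/ray-data-executor-logs",
)
try:
    logger.debug("Starting execution of Dataset.")  # lands only in this executor's file
finally:
    logger.removeHandler(handler)
    handler.close()

In an actual fix, the handler would presumably be installed in the executor's start path and removed in its shutdown path, keyed on the executor/dataset name that already appears in the shutdown log line.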
Versions / Dependencies
Reproduction script
import ray

@ray.remote
def f():
    ray.data.range(1).materialize()

ray.get([f.remote() for _ in range(2)])

Output (log lines from the two concurrent executors are interleaved):
2024-11-07 10:35:27,233 DEBUG util.py:203 -- Autodetected parallelism=20 based on estimated_available_cpus=10 and estimated_data_size=8.
2024-11-07 10:35:27,233 DEBUG util.py:203 -- Autodetected parallelism=20 based on estimated_available_cpus=10 and estimated_data_size=8.
2024-11-07 10:35:27,520 DEBUG util.py:203 -- Autodetected parallelism=20 based on estimated_available_cpus=10 and estimated_data_size=8.
2024-11-07 10:35:27,520 DEBUG util.py:203 -- Autodetected parallelism=20 based on estimated_available_cpus=10 and estimated_data_size=8.
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:34 -- Expected in-memory size 8, block size 8.0
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:42 -- Size based split factor 1
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:44 -- Blocks after size splits 1
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:34 -- Expected in-memory size 8, block size 8.0
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:42 -- Size based split factor 1
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:44 -- Blocks after size splits 1
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:117 -- Using autodetected parallelism=20 for operator ReadRange to satisfy parallelism at least twice the available number of CPUs (10).
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:117 -- Using autodetected parallelism=20 for operator ReadRange to satisfy parallelism at least twice the available number of CPUs (10).
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:124 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:124 -- To satisfy the requested parallelism of 20, each read task output is split into 20 smaller blocks.
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:132 -- Estimated num output blocks 20
2024-11-07 10:35:27,520 DEBUG set_read_parallelism.py:132 -- Estimated num output blocks 20
2024-11-07 10:35:27,520 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-07_10-35-25_574208_73812/logs/ray-data
2024-11-07 10:35:27,520 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-07_10-35-25_574208_73812/logs/ray-data
2024-11-07 10:35:27,520 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange]
2024-11-07 10:35:27,520 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange]
2024-11-07 10:35:27,521 DEBUG streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf), exclude_resources=ExecutionResources(cpu=0.0, gpu=0.0, object_store_memory=0.0B), locality_with_output=False, preserve_order=False, actor_locality_enabled=False, verbose_progress=True)
2024-11-07 10:35:27,521 DEBUG streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf), exclude_resources=ExecutionResources(cpu=0.0, gpu=0.0, object_store_memory=0.0B), locality_with_output=False, preserve_order=False, actor_locality_enabled=False, verbose_progress=True)
2024-11-07 10:35:27,523 DEBUG concurrency_cap_backpressure_policy.py:37 -- ConcurrencyCapBackpressurePolicy initialized with: {InputDataBuffer[Input]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange]: inf}
2024-11-07 10:35:27,523 DEBUG concurrency_cap_backpressure_policy.py:37 -- ConcurrencyCapBackpressurePolicy initialized with: {InputDataBuffer[Input]: inf, InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange]: inf}
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:496 -- Operator Metrics:
Input: {'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_outputs_taken': 1, 'bytes_outputs_taken': 2740, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
ReadRange->SplitBlocks(20): {'average_num_outputs_per_task': None, 'average_bytes_per_output': None, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 2740, 'average_bytes_inputs_per_task': 2740.0, 'average_bytes_outputs_per_task': None, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 2740, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 1, 'num_tasks_running': 1, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 268435456, 'cpu_usage': 1, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:496 -- Operator Metrics:
Input: {'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_outputs_taken': 1, 'bytes_outputs_taken': 2740, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
ReadRange->SplitBlocks(20): {'average_num_outputs_per_task': None, 'average_bytes_per_output': None, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 2740, 'average_bytes_inputs_per_task': 2740.0, 'average_bytes_outputs_per_task': None, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 2740, 'num_task_outputs_generated': 0, 'bytes_task_outputs_generated': 0, 'rows_task_outputs_generated': 0, 'num_outputs_taken': 0, 'bytes_outputs_taken': 0, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 1, 'num_tasks_running': 1, 'num_tasks_have_outputs': 0, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 0, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 268435456, 'cpu_usage': 1, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:479 -- Execution Progress:
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:481 -- 0: - Input: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store, Blocks Outputted: 1/1
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:481 -- 1: - ReadRange->SplitBlocks(20): Tasks: 1; Queued blocks: 0; Resources: 1.0 CPU, 256.0MB object store, Blocks Outputted: 0/None
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:479 -- Execution Progress:
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] completed. Operator Metrics:
{'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_outputs_taken': 1, 'bytes_outputs_taken': 2740, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
2024-11-07 10:35:27,526 DEBUG streaming_executor.py:481 -- 0: - Input: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store, Blocks Outputted: 1/1
2024-11-07 10:35:27,527 DEBUG streaming_executor.py:481 -- 1: - ReadRange->SplitBlocks(20): Tasks: 1; Queued blocks: 0; Resources: 1.0 CPU, 256.0MB object store, Blocks Outputted: 0/None
2024-11-07 10:35:27,527 DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] completed. Operator Metrics:
{'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_outputs_taken': 1, 'bytes_outputs_taken': 2740, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0}
2024-11-07 10:35:27,797 DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] completed. Operator Metrics:
{'average_num_outputs_per_task': 20.0, 'average_bytes_per_output': 0.4, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'average_bytes_inputs_per_task': 2740.0, 'average_bytes_outputs_per_task': 8.0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 2740, 'bytes_inputs_of_submitted_tasks': 2740, 'num_task_outputs_generated': 20, 'bytes_task_outputs_generated': 8, 'rows_task_outputs_generated': 1, 'num_outputs_taken': 20, 'bytes_outputs_taken': 8, 'num_outputs_of_finished_tasks': 20, 'bytes_outputs_of_finished_tasks': 8, 'num_tasks_submitted': 1, 'num_tasks_running': 0, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'block_generation_time': 0.0036488329999997404, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_freed': 2740, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 8, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
2024-11-07 10:35:27,800 DEBUG streaming_executor.py:182 -- Shutting down <StreamingExecutor(StreamingExecutor-ae0ae63c8283473598b424e3f4c21af5, stopped daemon 12930002944)>.
2024-11-07 10:35:27,801 DEBUG streaming_executor.py:338 -- Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] completed. Operator Metrics:
{'average_num_outputs_per_task': 20.0, 'average_bytes_per_output': 0.4, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'average_bytes_inputs_per_task': 2740.0, 'average_bytes_outputs_per_task': 8.0, 'num_inputs_received': 1, 'bytes_inputs_received': 2740, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 2740, 'bytes_inputs_of_submitted_tasks': 2740, 'num_task_outputs_generated': 20, 'bytes_task_outputs_generated': 8, 'rows_task_outputs_generated': 1, 'num_outputs_taken': 20, 'bytes_outputs_taken': 8, 'num_outputs_of_finished_tasks': 20, 'bytes_outputs_of_finished_tasks': 8, 'num_tasks_submitted': 1, 'num_tasks_running': 0, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'block_generation_time': 0.003505416999999289, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_freed': 2740, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 0, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
2024-11-07 10:35:27,808 DEBUG streaming_executor.py:182 -- Shutting down <StreamingExecutor(StreamingExecutor-c5306ff2dc5048a3b965c6a33c4be927, stopped daemon 6241726464)>.
Issue Severity
None