Skip to content

Use standard thread library #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions executorlib/base/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from concurrent.futures import (
Future,
)
from threading import Thread
from typing import Callable, Optional, Union

from executorlib.standalone.inputcheck import check_resource_dict
from executorlib.standalone.queue import cancel_items_in_queue
from executorlib.standalone.serialize import cloudpickle_register
from executorlib.standalone.thread import RaisingThread


class ExecutorBase(FutureExecutor):
Expand All @@ -29,7 +29,7 @@ def __init__(self, max_cores: Optional[int] = None):
cloudpickle_register(ind=3)
self._max_cores = max_cores
self._future_queue: Optional[queue.Queue] = queue.Queue()
self._process: Optional[Union[RaisingThread, list[RaisingThread]]] = None
self._process: Optional[Union[Thread, list[Thread]]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify exception handling behavior with standard Thread.

The switch from RaisingThread to standard Thread might affect how exceptions are propagated from worker threads to the main thread. The standard Thread class doesn't automatically propagate exceptions to the main thread.


🏁 Script executed:

#!/bin/bash
# Search for exception handling patterns in the codebase
rg -A 5 "try:|except:|finally:" --type py

Length of output: 11463


🏁 Script executed:

#!/bin/bash
# Search for any remaining references to RaisingThread in the repository
rg "RaisingThread"

Length of output: 477


Attention: Inconsistent Thread Type References May Affect Exception Propagation

The update in executorlib/base/executor.py correctly changes the _process annotation to use the standard Thread. However, several parts of the codebase still reference RaisingThread (e.g., docstrings and method signatures in executorlib/base/executor.py and executorlib/interactive/shared.py). This inconsistency could cause confusion about exception propagation, since the standard Thread does not automatically propagate exceptions from a worker thread to the main thread the way RaisingThread did.

  • Action Items:
    • Ensure that exception handling with the standard Thread is explicitly managed if propagating errors is required.
    • Update all references (docstrings, type annotations, and method signatures) from RaisingThread to Thread (or alternatively, reintroduce RaisingThread functionality where needed) for consistency.


@property
def info(self) -> Optional[dict]:
Expand All @@ -40,13 +40,13 @@ def info(self) -> Optional[dict]:
Optional[dict]: Information about the executor.
"""
if self._process is not None and isinstance(self._process, list):
meta_data_dict = self._process[0].get_kwargs().copy()
meta_data_dict = self._process[0]._kwargs.copy() # type: ignore
if "future_queue" in meta_data_dict:
del meta_data_dict["future_queue"]
meta_data_dict["max_workers"] = len(self._process)
return meta_data_dict
elif self._process is not None:
meta_data_dict = self._process.get_kwargs().copy()
meta_data_dict = self._process._kwargs.copy() # type: ignore
if "future_queue" in meta_data_dict:
del meta_data_dict["future_queue"]
return meta_data_dict
Expand Down Expand Up @@ -138,13 +138,13 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
cancel_items_in_queue(que=self._future_queue)
if self._process is not None and self._future_queue is not None:
self._future_queue.put({"shutdown": True, "wait": wait})
if wait and isinstance(self._process, RaisingThread):
if wait and isinstance(self._process, Thread):
self._process.join()
self._future_queue.join()
self._process = None
self._future_queue = None

def _set_process(self, process: RaisingThread):
def _set_process(self, process: Thread):
"""
Set the process for the executor.

Expand Down
4 changes: 2 additions & 2 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from threading import Thread
from typing import Callable, Optional

from executorlib.base.executor import ExecutorBase
Expand All @@ -15,7 +16,6 @@
check_max_workers_and_cores,
check_nested_flux_executor,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.queue_spawner import execute_with_pysqa
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(
cache_directory_path = os.path.abspath(cache_directory)
os.makedirs(cache_directory_path, exist_ok=True)
self._set_process(
RaisingThread(
Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": self._future_queue,
Expand Down
6 changes: 4 additions & 2 deletions executorlib/cache/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ def execute_tasks_h5(
]
else:
if len(future_wait_key_lst) > 0:
raise ValueError(
"Future objects are not supported as input if disable_dependencies=True."
task_dict["future"].set_exception(
ValueError(
"Future objects are not supported as input if disable_dependencies=True."
)
)
task_dependent_lst = []
process_dict[task_key] = execute_function(
Expand Down
4 changes: 2 additions & 2 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from concurrent.futures import Future
from threading import Thread
from typing import Any, Callable, Optional

from executorlib.base.executor import ExecutorBase
Expand All @@ -8,7 +9,6 @@
generate_nodes_and_edges,
generate_task_hash,
)
from executorlib.standalone.thread import RaisingThread


class ExecutorWithDependencies(ExecutorBase):
Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(
) -> None:
super().__init__(max_cores=max_cores)
self._set_process(
RaisingThread(
Thread(
target=execute_tasks_with_dependencies,
kwargs={
# Executor Arguments
Expand Down
36 changes: 18 additions & 18 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from asyncio.exceptions import CancelledError
from concurrent.futures import Future, TimeoutError
from threading import Thread
from time import sleep
from typing import Any, Callable, Optional, Union

Expand All @@ -20,7 +21,6 @@
)
from executorlib.standalone.interactive.spawner import BaseSpawner, MpiExecSpawner
from executorlib.standalone.serialize import serialize_funct_h5
from executorlib.standalone.thread import RaisingThread


class ExecutorBroker(ExecutorBase):
Expand Down Expand Up @@ -89,7 +89,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False):
self._process = None
self._future_queue = None

def _set_process(self, process: list[RaisingThread]): # type: ignore
def _set_process(self, process: list[Thread]): # type: ignore
"""
Set the process for the executor.

Expand Down Expand Up @@ -149,7 +149,7 @@ def __init__(
executor_kwargs["queue_join_on_shutdown"] = False
self._set_process(
process=[
RaisingThread(
Thread(
target=execute_parallel_tasks,
kwargs=executor_kwargs,
)
Expand Down Expand Up @@ -205,7 +205,7 @@ def __init__(
executor_kwargs["max_cores"] = max_cores
executor_kwargs["max_workers"] = max_workers
self._set_process(
RaisingThread(
Thread(
target=execute_separate_tasks,
kwargs=executor_kwargs,
)
Expand Down Expand Up @@ -363,17 +363,18 @@ def execute_tasks_with_dependencies(
):
future_lst, ready_flag = _get_future_objects_from_input(task_dict=task_dict)
exception_lst = _get_exception_lst(future_lst=future_lst)
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
if not _get_exception(future_obj=task_dict["future"]):
if len(exception_lst) > 0:
task_dict["future"].set_exception(exception_lst[0])
elif len(future_lst) == 0 or ready_flag:
# No future objects are used in the input or all future objects are already done
task_dict["args"], task_dict["kwargs"] = _update_futures_in_input(
args=task_dict["args"], kwargs=task_dict["kwargs"]
)
executor_queue.put(task_dict)
else: # Otherwise add the function to the wait list
task_dict["future_lst"] = future_lst
wait_lst.append(task_dict)
future_queue.task_done()
elif len(wait_lst) > 0:
number_waiting = len(wait_lst)
Expand Down Expand Up @@ -589,7 +590,7 @@ def _submit_function_to_separate_process(
"init_function": None,
}
)
process = RaisingThread(
process = Thread(
target=execute_parallel_tasks,
kwargs=task_kwargs,
)
Expand All @@ -610,14 +611,13 @@ def _execute_task(
future_queue (Queue): Queue for receiving new tasks.
"""
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
if not f.done() and f.set_running_or_notify_cancel():
try:
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
except Exception as thread_exception:
interface.shutdown(wait=True)
future_queue.task_done()
f.set_exception(exception=thread_exception)
raise thread_exception
else:
future_queue.task_done()

Expand Down
2 changes: 0 additions & 2 deletions executorlib/standalone/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
interface_shutdown,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.thread import RaisingThread

__all__ = [
"SocketInterface",
Expand All @@ -16,6 +15,5 @@
"interface_send",
"interface_shutdown",
"interface_receive",
"RaisingThread",
"MpiExecSpawner",
]
42 changes: 0 additions & 42 deletions executorlib/standalone/thread.py

This file was deleted.

11 changes: 6 additions & 5 deletions tests/test_cache_executor_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from queue import Queue
import shutil
import unittest
from threading import Thread

from executorlib.cache.subprocess_spawner import (
execute_in_subprocess,
terminate_subprocess,
)
from executorlib.standalone.thread import RaisingThread

try:
from executorlib.cache.executor import FileExecutor, create_file_executor
Expand Down Expand Up @@ -57,7 +57,8 @@ def test_executor_dependence_error(self):
with FileExecutor(
execute_function=execute_in_subprocess, disable_dependencies=True
) as exe:
exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))
fs = exe.submit(my_funct, 1, b=exe.submit(my_funct, 1, b=2))
fs.result()

def test_executor_working_directory(self):
cwd = os.path.join(os.path.dirname(__file__), "executables")
Expand All @@ -81,7 +82,7 @@ def test_executor_function(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down Expand Up @@ -122,7 +123,7 @@ def test_executor_function_dependence_kwargs(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down Expand Up @@ -163,7 +164,7 @@ def test_executor_function_dependence_args(self):
)
cache_dir = os.path.abspath("cache")
os.makedirs(cache_dir, exist_ok=True)
process = RaisingThread(
process = Thread(
target=execute_tasks_h5,
kwargs={
"future_queue": q,
Expand Down
Loading
Loading