Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkov/common/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class ParallelizationType(str, Enum):
SPAWN = "spawn"
THREAD = "thread"
NONE = "none"
FORK_WITH_POOL = "fork_with_pool"

def __str__(self) -> str:
# needed, because of a Python 3.11 change
Expand Down
50 changes: 49 additions & 1 deletion checkov/common/parallelizer/parallel_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import platform
from collections.abc import Iterator, Iterable
from multiprocessing.pool import Pool
from typing import Any, List, Generator, Callable, Optional, TypeVar, TYPE_CHECKING
from typing import Any, List, Generator, Callable, Optional, TypeVar, TYPE_CHECKING, Tuple

from checkov.common.models.enums import ParallelizationType

Expand All @@ -23,6 +23,14 @@ def __init__(self, internal_exception: Exception) -> None:
super().__init__(internal_exception)


def _fork_pool_worker(args: Tuple[Callable[[Any], _T], Any]) -> _T:
"""Wrapper so we can handle both single args and tuple-args."""
func, item = args
if isinstance(item, tuple):
return func(*item)
return func(item)


class ParallelRunner:
def __init__(
self, workers_number: int | None = None,
Expand Down Expand Up @@ -68,6 +76,8 @@ def run_function(
return self._run_function_multiprocess_fork(func, items, group_size)
elif self.type == ParallelizationType.SPAWN:
return self._run_function_multiprocess_spawn(func, items, group_size)
elif self.type == ParallelizationType.FORK_WITH_POOL:
return self._run_function_multiprocess_fork_with_pool(func, items, group_size)
else:
return self._run_function_sequential(func, items)

Expand Down Expand Up @@ -121,6 +131,44 @@ def func_wrapper(original_func: Callable[[Any], _T], items_group: List[Any], con
except EOFError:
pass

def _run_function_multiprocess_fork_with_pool(
self,
func: Callable[[Any], _T],
items: list[Any],
group_size: Optional[int],
) -> Generator[_T, None, None]:
import multiprocessing as mp
import time

start = time.time()
print(f"in _run_function_multiprocess_fork with workers_number={self.workers_number}, items={len(items)}")

if not items:
return

ctx = mp.get_context("fork")

# Use at most len(items) workers
workers = max(1, min(self.workers_number, len(items)))

if not group_size:
# rule of thumb: a few chunks per worker
group_size = 8

print(f"using {workers} workers, chunksize={group_size}")

# Pool takes care of all the process / queue management for us
with ctx.Pool(processes=workers) as pool:
for result in pool.imap_unordered(
_fork_pool_worker,
((func, item) for item in items),
chunksize=group_size,
):
yield result

end = time.time()
print(f"Elapsed in fork (Pool): {end - start:.3f} seconds")

def _run_function_multiprocess_spawn(
self, func: Callable[[Any], _T], items: list[Any], group_size: int | None
) -> Iterable[_T]:
Expand Down
Loading