Skip to content

Commit

Permalink
Improve the Parallel processing example in many ways
Browse files Browse the repository at this point in the history
  • Loading branch information
Tremeschin committed Aug 5, 2024
1 parent b5d8a4a commit f6dd1c1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 47 deletions.
92 changes: 49 additions & 43 deletions Examples/Parallel.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,71 @@
import math
import multiprocessing
import time
from multiprocessing import Process
from multiprocessing import Manager, Process
from pathlib import Path
from threading import Thread
from typing import List

from attr import Factory, define
from DepthFlow import DepthScene

from Broken.Externals.Depthmap import DepthAnythingV2
from Broken.Externals.Depthmap import DepthAnythingV2, DepthEstimator

# Sharing objects between processes
MANAGER = Manager()

class MultiDepth(DepthScene):
class YourScene(DepthScene):
def update(self):
self.state.offset_x = math.sin(2*self.cycle)
self.state.offset_x = math.sin(2 * self.cycle)
self.state.isometric = 1

@define
class Logic:

IMAGES = "/home/tremeschin/Documents/Wallpapers"
THREADS = list()
MAX_THREADS = 4
MANAGER = multiprocessing.Manager()
OUTPUTS = MANAGER.list()
# Process management
outputs: List[Path] = MANAGER.list()
workers: List[Process] = list()
max_workers: int = 4

class Main:
def __init__(self) -> None:
estimator = DepthAnythingV2()
# DepthFlow objects
estimator: DepthEstimator = Factory(DepthAnythingV2)

def worker(self, image: Path):
process = Process(target=self.render, args=(image,))
process.start()
process.join()

for image in Path(IMAGES).iterdir():
def run(self, image: Path):

# Cache depth map
estimator.estimate(image)
# Optimization: Cache depth map, so the worker doesn't have to load it;
# Note: The estimator must match the one used in the worker
self.estimator.estimate(image)

# Wait for free thread
while len(THREADS) >= MAX_THREADS:
for index, thread in enumerate(THREADS):
if thread.is_alive():
continue
THREADS.pop(index)
break
time.sleep(0.05)
# Limit the maximum concurrent processes
while len(self.workers) >= self.max_workers:
self.workers = list(filter(lambda x: x.is_alive(), self.workers))
time.sleep(0.05)

thread = Thread(target=worker, args=(self, image))
THREADS.append(thread)
thread.start()
# Create and add a new running process
process = Process(target=self._worker, args=(image,))
self.workers.append(process)
process.start()

for thread in THREADS:
thread.join()

for output in OUTPUTS:
print(f"• {output}")
def join(self):
for process in self.workers:
process.join()

def render(self, image: Path):
scene = MultiDepth()
def _worker(self, image: Path):
scene = YourScene()
scene.input(image=image)
video = scene.main(output=image.with_suffix(".mp4").name, time=5)[0]
video = scene.main(
output=image.with_suffix(".mp4").name,
time=5, fps=30
)[0]
scene.window.destroy()
OUTPUTS.append(video)
self.outputs.append(video)

if __name__ == "__main__":
Main()
IMAGES = Path("/home/tremeschin/Documents/Wallpapers")
logic = Logic()

for image in IMAGES.glob("*"):
logic.run(image)

logic.join()

for output in logic.outputs:
print(f"• {output}")
13 changes: 9 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
[project.urls]
issues = "https://github.com/BrokenSource/DepthFlow/issues"
repository = "https://github.com/BrokenSource/DepthFlow"
documentation = "https://brokensrc.dev/depthflow"
homepage = "https://brokensrc.dev"
Changelog = "https://brokensrc.dev/news/category/releases"
GitHub = "https://github.com/BrokenSource/DepthFlow"
Funding = "https://brokensrc.dev/about/sponsors"
Contact = "https://brokensrc.dev/about/contact"
Credits = "https://brokensrc.dev/about/credits"
License = "https://brokensrc.dev/about/license"
Download = "https://brokensrc.dev/get/releases"
Docs = "https://brokensrc.dev/depthflow"
Homepage = "https://brokensrc.dev"

[project]
name = "depthflow"
Expand Down

0 comments on commit f6dd1c1

Please sign in to comment.