Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new examples : a parallel pipeline for image processing(resize) #1755

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions examples/image-processing/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sys import stdout

from loguru import logger
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not worth it to use a third-party logger for a simple example? Let's stick to the standard library.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also replaced loguru with the standard logging library.


# Columns of every log record; loguru markup tags (<W>, <k>, <c>, ...) add
# colour, and the format specs pad each field to a fixed width.
log_format = [
    '<W><k>{time: YYYY-MM-DD hh:mm:ss}</k></W>',
    '<c>{file:^15}</c>',
    '<w>{function:^25}</w>',
    '<e>{line:03d}</e>',
    '<r>{level:^10}</r>',
    '<W><k>{message:<50}</k></W>',
]

log_separator = ' | '

# Drop loguru's default handler and install a single TRACE-level sink on
# stdout that renders the columns above joined by ' | '.
logger.remove()
logger.add(
    sink=stdout,
    level='TRACE',
    format=log_separator.join(log_format),
)
212 changes: 212 additions & 0 deletions examples/image-processing/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import multiprocessing as mp
import pickle
import time
from glob import glob
from os import path
from typing import List, Tuple

import click
import cv2
from loguru import logger
from worker import process_images

import zmq

"""
ZMQ client-server for parallel image processing (resize).
The server (router) creates n workers (dealers):
# each worker can ask the server for a job
# a job is an image to resize and a path where the resized image will be saved
# once the image has been resized, the worker can ask for a new job
# the server keeps sending jobs to workers until there are no images left or the user hits ctrl+c
NOTE: opencv, numpy, click, loguru must be installed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably makes sense to add a requirements.txt to the directory

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, as you suggest, i've added the README.md and a requirements.txt(click, pyzmq, pillow).

"""

"""
This program has two modes:
# sequential
python main.py sequential-processing --path2initial_images /path/to/source/images --path2resized_images /path/to/target/images --image_extension '*.jpg' --size 512 512
# parallel
python main.py parallel-processing --path2initial_images /path/to/source/images --path2resized_images /path/to/target/images --image_extension '*.jpg' --nb_workers 8 --size 512 512
parallel mode can be 10x faster than sequential mode
"""


@click.group(chain=False, invoke_without_command=True)
@click.pass_context
def cmd_group(clk: click.Context):
    """Root command group: log which subcommand was invoked, if any."""
    name = clk.invoked_subcommand
    if name is None:
        # called without a subcommand: point the user at --help
        logger.debug('use --help to see availables subcommands')
    else:
        logger.debug(f'{name} was called')


@cmd_group.command()
@click.option(
    '--path2initial_images', help='initial images location', type=click.Path(True)
)
@click.option(
    '--path2resized_images', help='resized images location', type=click.Path(True)
)
@click.option(
    '--image_extension', help='image file extension', default='*.jpg', type=str
)
@click.option(
    '--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512)
)
@click.pass_context
def sequential_processing(
    clk: click.Context,
    path2initial_images: str,
    path2resized_images: str,
    image_extension: str,
    size: Tuple[int, int],
):
    """Resize every matching image one after the other (baseline mode).

    Each file in `path2initial_images` matching `image_extension` is read,
    resized to `size` and written under `path2resized_images` with the same
    file name.
    """
    image_filepaths: List[str] = sorted(
        glob(path.join(path2initial_images, image_extension))
    )
    nb_images = len(image_filepaths)
    logger.debug(f'{nb_images:05d} were found at {path2initial_images}')

    start = time.time()
    # Initialise the counter so the summary below is valid even when the
    # directory contains no matching image (previously: NameError).
    cursor = 0
    # start=1 makes `cursor` the number of images processed so far
    # (previously the zero-based index under-reported the total by one).
    for cursor, path2source_image in enumerate(image_filepaths, start=1):
        bgr_image = cv2.imread(path2source_image)
        if bgr_image is None:
            # cv2.imread returns None for unreadable/corrupt files; skip
            # instead of crashing inside cv2.resize.
            logger.warning(f'unable to read {path2source_image}')
            continue
        resized_image = cv2.resize(bgr_image, dsize=size)
        # keep the original file name in the target directory
        _, filename = path.split(path2source_image)
        path2target_image = path.join(path2resized_images, filename)
        cv2.imwrite(path2target_image, resized_image)
        print(resized_image.shape, f'{cursor:04d} images')

    duration = int(round(time.time() - start))
    logger.success(
        f'server has processed {cursor:04d}/{nb_images} images in {duration:03d}s'
    )


@cmd_group.command()
@click.option(
    '--path2initial_images', help='initial images location', type=click.Path(True)
)
@click.option(
    '--path2resized_images', help='resized images location', type=click.Path(True)
)
@click.option(
    '--image_extension', help='image file extension', default='*.jpg', type=str
)
@click.option(
    '--nb_workers', help='number of workers to process images', default=2, type=int
)
@click.option(
    '--size', help='new image size', type=click.Tuple([int, int]), default=(512, 512)
)
@click.pass_context
def parallel_processing(
    clk: click.Context,
    path2initial_images: str,
    path2resized_images: str,
    image_extension: str,
    nb_workers: int,
    size: Tuple[int, int],
):
    """Resize images in parallel with `nb_workers` worker processes.

    The server binds a ROUTER socket (job distribution: one image path per
    b'req' request) and a PUB socket (broadcasts b'quit' at shutdown), then
    spawns `nb_workers` processes running `worker.process_images` and waits
    for all of them to report ready before serving jobs.
    """
    # Cleanup flags read by the `finally` clause: they record which
    # resources were actually created before an eventual failure.
    ZEROMQ_INIT = 0
    WORKER_INIT = 0
    try:
        # ipc:// endpoints — the server binds both, workers connect to them.
        router_address = 'ipc://router.ipc'
        publisher_address = 'ipc://publisher.ipc'

        ctx: zmq.Context = zmq.Context()
        router_socket: zmq.Socket = ctx.socket(zmq.ROUTER)  # job requests/responses
        publisher_socket: zmq.Socket = ctx.socket(zmq.PUB)  # shutdown broadcast

        router_socket.bind(router_address)
        publisher_socket.bind(publisher_address)
        ZEROMQ_INIT = 1

        image_filepaths: List[str] = sorted(
            glob(path.join(path2initial_images, image_extension))
        )
        nb_images = len(image_filepaths)
        logger.debug(f'{nb_images:05d} were found at {path2initial_images}')

        if nb_images == 0:
            raise Exception(f'{path2initial_images} is empty')

        # Synchronization primitives shared with the worker processes:
        #   server_liveness      — set once the server is ready to serve
        #   worker_arrival       — count of workers that have connected
        #   arrival_condition    — notified on each worker_arrival increment
        #   workers_synchronizer — barrier so all workers start together
        workers_acc = []
        server_liveness = mp.Event()
        worker_arrival = mp.Value('i', 0)
        arrival_condition = mp.Condition()
        workers_synchronizer = mp.Barrier(nb_workers)
        for worker_id in range(nb_workers):
            worker_ = mp.Process(
                target=process_images,
                kwargs={
                    'size': size,
                    'worker_id': worker_id,
                    'router_address': router_address,
                    'publisher_address': publisher_address,
                    'worker_arrival': worker_arrival,
                    'server_liveness': server_liveness,
                    'arrival_condition': arrival_condition,
                    'workers_synchronizer': workers_synchronizer,
                    'path2resized_images': path2resized_images,
                },
            )

            workers_acc.append(worker_)
            workers_acc[-1].start()

        WORKER_INIT = 1
        # NOTE(review): the condition is acquired here but never released.
        # Workers release their own hold before this point so no deadlock
        # occurs in practice, but a `with arrival_condition:` block would be
        # safer — confirm.
        arrival_condition.acquire()
        server_liveness.set()  # send signal to worker
        # Block (up to 10s) until every worker has incremented worker_arrival.
        arrival_condition.wait_for(
            predicate=lambda: worker_arrival.value == nb_workers, timeout=10
        )

        if worker_arrival.value != nb_workers:
            logger.error('server wait to long for worker to be ready')
            # SystemExit is not caught by `except Exception` below, but the
            # `finally` clause still runs and releases the zmq resources.
            exit(1)

        logger.success('all workers are up and ready to process images')
        cursor = 0  # number of image paths dispatched so far
        keep_loop = True
        start = time.time()
        while keep_loop:
            # ROUTER frames: [worker identity, empty delimiter, type, payload]
            socket_id, _, msgtype, message = router_socket.recv_multipart()
            if msgtype == b'req':
                # A worker asks for a job: send the next image path, if any.
                # When no images are left the request goes unanswered; the
                # worker is released later by the b'quit' broadcast.
                if cursor < nb_images:
                    path2source_image = image_filepaths[cursor]
                    router_socket.send_multipart(
                        [socket_id, b'', path2source_image.encode()]
                    )
                    cursor = cursor + 1
            if msgtype == b'rsp':
                # A worker reports a finished (status 1) or failed (status 0) job.
                content = pickle.loads(message)
                if content['status'] == 1:
                    print(f"{content['worker_id']:03d}", f"{cursor:04d} items")
                keep_loop = cursor < nb_images
        # end loop over images
        end = time.time()
        duration = int(round(end - start))
        logger.success(
            f'server has processed {cursor:04d}/{nb_images} images in {duration:03d}s'
        )
    except Exception as e:
        logger.error(e)
    finally:
        if WORKER_INIT:
            # Ask workers to leave their loop, then wait for each to exit.
            logger.debug('server is waiting for worker to quit the loop')
            publisher_socket.send_multipart([b'quit', b''])
            for worker_ in workers_acc:
                worker_.join()

        if ZEROMQ_INIT == 1:
            publisher_socket.close()
            router_socket.close()
            ctx.term()

        logger.success('server has released all zeromq ressources')


if __name__ == '__main__':
    # Entry point: dispatch to sequential-processing or parallel-processing.
    cmd_group()
134 changes: 134 additions & 0 deletions examples/image-processing/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import multiprocessing as mp
from os import path
from typing import Tuple

import cv2
from log import logger

import zmq


def process_images(
    size: Tuple[int, int],
    worker_id: int,
    router_address: str,
    publisher_address: str,
    path2resized_images: str,
    worker_arrival: mp.Value,
    arrival_condition: mp.Condition,
    server_liveness: mp.Event,
    workers_synchronizer: mp.Barrier,
):
    """Worker loop: request image paths from the server, resize and save them.

    Runs in a child process. A DEALER socket connected to `router_address`
    requests jobs (b'req') and returns results (b'rsp' + pickled dict); a
    SUB socket on `publisher_address` listens for the server's b'quit'
    broadcast. Each received path is read with OpenCV, resized to `size` and
    written under `path2resized_images` with the same file name.
    """
    # Set once the zmq resources exist, so `finally` knows what to release.
    ZEROMQ_INIT = 0
    try:
        ctx: zmq.Context = zmq.Context()
        dealer_socket: zmq.Socket = ctx.socket(zmq.DEALER)
        subscriber_socket: zmq.Socket = ctx.socket(zmq.SUB)

        dealer_socket.connect(router_address)
        subscriber_socket.connect(publisher_address)
        subscriber_socket.set(zmq.SUBSCRIBE, b'')  # subscribe to all topics

        ZEROMQ_INIT = 1

        # Poll both sockets so a b'quit' broadcast is seen even while
        # waiting for the next job.
        poller = zmq.Poller()
        poller.register(dealer_socket, zmq.POLLIN)
        poller.register(subscriber_socket, zmq.POLLIN)

        liveness_value = server_liveness.wait(
            timeout=10
        )  # wait up to 10s for the server to be ready
        if not liveness_value:
            logger.error(f'worker {worker_id:03d} wait too long for server to be ready')
            exit(1)

        # Report arrival: increment the shared counter and wake the server,
        # which is blocked in arrival_condition.wait_for.
        arrival_condition.acquire()
        with worker_arrival.get_lock():
            worker_arrival.value += 1
            logger.debug(
                f'worker {worker_id:03d} has established connection with {router_address} and {publisher_address}'
            )
        arrival_condition.notify_all()
        arrival_condition.release()

        logger.debug(f'worker {worker_id:03d} is waiting at the barrier')
        workers_synchronizer.wait(timeout=5)  # wait at the barrier

        worker_status = 0  # 0 => free | 1 => busy
        keep_loop = True
        while keep_loop:
            if not server_liveness.is_set():
                logger.warning(f'server is down...! worker {worker_id:03d} will stop')
                break

            if worker_status == 0:
                dealer_socket.send_multipart([b'', b'req', b''])  # ask a job
                worker_status = 1  # worker is busy

            # 100ms poll so the liveness flag above is re-checked regularly.
            incoming_events = dict(poller.poll(100))
            dealer_poller_status = incoming_events.get(dealer_socket, None)
            subscriber_poller_status = incoming_events.get(subscriber_socket, None)
            if dealer_poller_status is not None:
                if dealer_poller_status == zmq.POLLIN:
                    try:
                        # Job payload: [empty delimiter, encoded image path].
                        _, encoded_path2image = dealer_socket.recv_multipart()
                        path2source_image = encoded_path2image.decode()
                        bgr_image = cv2.imread(path2source_image)
                        resized_image = cv2.resize(bgr_image, dsize=size)
                        _, filename = path.split(path2source_image)
                        path2target_image = path.join(path2resized_images, filename)
                        cv2.imwrite(path2target_image, resized_image)
                        # Two-part reply: b'rsp' header then a pickled report.
                        dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE)
                        dealer_socket.send_pyobj(
                            {
                                'incoming_image': path2source_image,
                                'worker_id': worker_id,
                                'status': 1,
                            }
                        )
                    except Exception as e:
                        # Processing failed (e.g. unreadable image): report
                        # status 0 so the server's bookkeeping stays correct.
                        logger.error(e)
                        logger.error(
                            f'worker {worker_id:03d} was not able to process : {path2source_image}'
                        )
                        dealer_socket.send_multipart([b'', b'rsp'], flags=zmq.SNDMORE)
                        dealer_socket.send_pyobj(
                            {
                                'incoming_image': path2source_image,
                                'worker_id': worker_id,
                                'status': 0,
                            }
                        )
                    worker_status = 0  # worker is free => can ask a new job

            if subscriber_poller_status is not None:
                if subscriber_poller_status == zmq.POLLIN:
                    topic, _ = subscriber_socket.recv_multipart()
                    if topic == b'quit':
                        logger.debug(
                            f'worker {worker_id:03d} got the quit signal from server'
                        )
                        keep_loop = False
        # end while loop ...!

    except KeyboardInterrupt:
        pass
    except Exception as e:
        if workers_synchronizer.broken:
            logger.warning(
                f'worker {worker_id:03d} will stop. the barrier was broken by some workers'
            )
        else:
            logger.error(e)
    finally:
        # NOTE(review): `poller` is only defined after ZEROMQ_INIT is set; if
        # zmq.Poller() itself failed this would raise NameError — confirm.
        if ZEROMQ_INIT == 1:
            poller.unregister(subscriber_socket)
            poller.unregister(dealer_socket)

            subscriber_socket.close()
            dealer_socket.close()

            ctx.term()

        logger.success(f'worker {worker_id:03d} has released all zeromq ressources')