Description
What is your question?
Ray version and other system information (Python version, TensorFlow version, OS):
Ray 0.8.5
Python 3.7.5
OS Kubuntu 18.04
Other dependencies:
opencv-python 4.2.0.34
imutils 0.5.3
Reposting from ray-dev
Hi, I'm trying to build an image processing pipeline using OpenCV and Ray. For this I've defined a couple of actors to read and write video, respectively (the idea is to plug other elements in between afterwards):
import ray
import cv2
from imutils.video import FPS
import asyncio
from loguru import logger
@ray.remote(num_cpus=1)
class VideoReader:
    """Ray actor that reads frames from an OpenCV capture into a buffer.

    ``read`` appends every captured frame to an in-memory list, and
    ``get_frames`` hands the buffered frames to the caller and starts a
    fresh, empty buffer.
    """

    def __init__(self, cap=0):
        """Open the capture source.

        Args:
            cap: Value passed unchanged to ``cv2.VideoCapture``.
        """
        self.cap = cap
        self.capture = cv2.VideoCapture(cap)
        # Frames captured so far that have not been collected yet.
        self.frames = []

    async def read(self):
        """Capture frames until the source stops delivering, then release it.

        Every frame is appended to ``self.frames``. The read rate is logged
        each time the frame counter reaches a multiple of 100. The capture
        is released when the loop ends, whether normally or through an
        exception, so the underlying device/file is not held open for the
        rest of the actor's lifetime.
        """
        fps = FPS()
        fps.start()
        try:
            while True:
                ret, frame = self.capture.read()
                if not ret:
                    # No frame was delivered: stop reading.
                    break
                fps.update()
                self.frames.append(frame)
                # Refresh the end mark before fps() is queried below
                # (presumably required by imutils' FPS helper -- confirm).
                fps.stop()
                if fps._numFrames % 100 == 0:
                    logger.info(f'Reading at {fps.fps():0.2f} FPS')
                # Yield control so other coroutines of this actor
                # (e.g. get_frames) get a chance to run between frames.
                await asyncio.sleep(0.0)
        finally:
            self.capture.release()

    async def get_frames(self):
        """Return the frames buffered so far and reset the buffer.

        Returns:
            The list of frames accumulated since the previous call; it is
            empty when nothing new was captured.
        """
        result = self.frames
        self.frames = []
        return result
@ray.remote(num_cpus=1)
class VideoWriter:
    """Ray actor that drains frames from a source actor into a video file.

    The source is polled through its ``get_frames`` remote method, and every
    returned frame is written with ``cv2.VideoWriter``.
    """

    def __init__(self, actor_handle, filename='out.avi', codec='XVID',
                 fps=30.0, frame_size=(640, 480)):
        """Create the output writer.

        Args:
            actor_handle: Handle of the actor whose ``get_frames`` remote
                method supplies the frames.
            filename: Path of the output video file.
            codec: Four-character codec code, expanded into
                ``cv2.VideoWriter_fourcc``.
            fps: Frame rate passed to ``cv2.VideoWriter``.
            frame_size: ``(width, height)`` passed to ``cv2.VideoWriter``.
                Defaults to the previously hard-coded ``(640, 480)``.
        """
        self.source = actor_handle
        self.filename = filename
        self.codec = cv2.VideoWriter_fourcc(*codec)
        self.fps = fps
        self.frame_size = tuple(frame_size)
        self.writer = cv2.VideoWriter(self.filename, self.codec, self.fps,
                                      self.frame_size)

    async def save(self):
        """Write frames from the source until it stays empty for 5 polls.

        The counter of consecutive empty polls is reset whenever a non-empty
        batch arrives. The write rate is logged after a batch when the total
        number of written frames is a multiple of 100. The writer is
        released when the loop ends, whether normally or through an
        exception, so the output file is finalized.
        """
        fps = FPS()
        fps.start()
        count = 0  # consecutive polls that returned no frames
        try:
            while count < 5:
                frames = await self.source.get_frames.remote()
                if not frames:
                    count += 1
                    # Let other coroutines run before polling again.
                    await asyncio.sleep(0.0)
                    continue
                count = 0
                for frame in frames:
                    fps.update()
                    self.writer.write(frame)
                # Drop the local reference to the batch once it is written.
                del frames
                # Refresh the end mark before fps() is queried below
                # (presumably required by imutils' FPS helper -- confirm).
                fps.stop()
                if fps._numFrames % 100 == 0:
                    logger.info(f'Writing at {fps.fps():0.2f} FPS')
        finally:
            self.writer.release()
def main():
    """Start Ray, wire a reader actor to a writer actor and run both.

    The writer receives the reader's handle. Both the ``read`` and the
    ``save`` remote calls are submitted before blocking, and ``ray.get``
    then waits for the two of them to finish.
    """
    ray.init()
    reader = VideoReader.remote()
    sink = VideoWriter.remote(reader)
    pending = [reader.read.remote(), sink.save.remote()]
    ray.get(pending)


# Run the pipeline only when this file is executed as a script.
if __name__ == '__main__':
    main()
The problem is that, while this is running, it keeps local references to what I think are all the frames that have been written to the video file, until the object store fills up and the script crashes. Running `ray memory` led to this:
-----------------------------------------------------------------------------------------------------
Object ID Reference Type Object Size Reference Creation Site
=====================================================================================================
; driver pid=11359
f66d17bae2b0e76545b95b1c010000c801000000 LOCAL_REFERENCE ? (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:main:74
ffffffffffffffff45b95b1c010000c801000000 USED_BY_PENDING_TASK ? (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:main:72
44ee453cd1e8e283ef0a6c22010000c801000000 LOCAL_REFERENCE ? (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:main:74
ffffffffffffffffef0a6c22010000c801000000 USED_BY_PENDING_TASK ? (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:main:73
; worker pid=11404
0d7cda845c5a77d945b95b1c010000c801000000 LOCAL_REFERENCE 2765186 (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:save:52
2a8b9e2564dfb83745b95b1c010000c801000000 LOCAL_REFERENCE 2765186 (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:save:52
e3ab2e8bd663349545b95b1c010000c801000000 LOCAL_REFERENCE 2765186 (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:save:52
25d4b18b0e31cd3145b95b1c010000c801000000 LOCAL_REFERENCE 2765186 (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:save:52
4bd2303a738d519f45b95b1c010000c801000000 LOCAL_REFERENCE 2765186 (actor call) /home/syonekura/PycharmProjects/async_video/
async_video/process_actor.py:save:52
...
With the last 5 lines repeated indefinitely. Note that `process_actor.py:save:52` corresponds to this line inside `VideoWriter`: `frames = await self.source.get_frames.remote()`.
I tried deleting that variable to remove the local reference, without success. How should I properly handle the memory in this case?