[ray] Unable to remove local reference from object store in async actors #8743

Closed
@syonekura

Description

What is your question?

Ray version and other system information (Python version, TensorFlow version, OS):
Ray 0.8.5
Python 3.7.5
OS Kubuntu 18.04

Other dependencies:
opencv-python 4.2.0.34
imutils 0.5.3

Reposting from ray-dev

Hi, I'm trying to build an image processing pipeline using OpenCV and Ray. For this I've defined two actors, one to read video and one to write it (the idea is to plug other elements in between later):

import ray
import cv2
from imutils.video import FPS
import asyncio
from loguru import logger


@ray.remote(num_cpus=1)
class VideoReader:
    def __init__(self, cap=0):
        self.cap = cap
        self.capture = cv2.VideoCapture(cap)
        self.frames = []

    async def read(self):
        fps = FPS()
        fps.start()
        while True:
            ret, frame = self.capture.read()
            if not ret:
                break

            fps.update()
            self.frames.append(frame)
            fps.stop()
            if fps._numFrames % 100 == 0:
                logger.info(f'Reading at {fps.fps():0.2f} FPS')
            await asyncio.sleep(0.0)

    async def get_frames(self):
        result = self.frames
        self.frames = []
        return result


@ray.remote(num_cpus=1)
class VideoWriter:
    def __init__(self, actor_handle, filename='out.avi', codec='XVID',
                 fps=30.0):
        self.source = actor_handle
        self.filename = filename
        self.codec = cv2.VideoWriter_fourcc(*codec)
        self.fps = fps
        self.writer = cv2.VideoWriter(self.filename, self.codec, self.fps,
                                      (640, 480))

    async def save(self):
        fps = FPS()
        fps.start()
        count = 0
        while count < 5:
            frames = await self.source.get_frames.remote()
            if not frames:
                count += 1
                await asyncio.sleep(0.0)
            else:
                count = 0
            for frame in frames:
                fps.update()
                self.writer.write(frame)

            del frames

            fps.stop()
            if fps._numFrames % 100 == 0:
                logger.info(f'Writing at {fps.fps():0.2f} FPS')


def main():
    ray.init()
    webcam = VideoReader.remote()
    writer = VideoWriter.remote(webcam)
    ray.get([webcam.read.remote(), writer.save.remote()])


if __name__ == '__main__':
    main()

The problem is that, while this is running, Ray keeps local references to what I think are all the frames that have already been written to the video file, until the object store fills up and the script crashes. Running ray memory gives this:

-----------------------------------------------------------------------------------------------------
Object ID                                Reference Type       Object Size   Reference Creation Site
=====================================================================================================
; driver pid=11359
f66d17bae2b0e76545b95b1c010000c801000000  LOCAL_REFERENCE                ?   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:main:74
ffffffffffffffff45b95b1c010000c801000000  USED_BY_PENDING_TASK           ?   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:main:72
44ee453cd1e8e283ef0a6c22010000c801000000  LOCAL_REFERENCE                ?   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:main:74
ffffffffffffffffef0a6c22010000c801000000  USED_BY_PENDING_TASK           ?   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:main:73
; worker pid=11404
0d7cda845c5a77d945b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:save:52
2a8b9e2564dfb83745b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:save:52
e3ab2e8bd663349545b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:save:52
25d4b18b0e31cd3145b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:save:52
4bd2303a738d519f45b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call) /home/syonekura/PycharmProjects/async_video/async_video/process_actor.py:save:52
...

Entries like the last five keep accumulating indefinitely. Note that process_actor.py:save:52 corresponds to this line inside VideoWriter: frames = await self.source.get_frames.remote(). I tried deleting that variable (del frames) to drop the local reference, without success. How should I properly handle memory in this case?
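To put numbers on the listing above, here is a minimal sketch that tallies the entries per process and reference type, and estimates how many frames each pinned object holds. It only parses text laid out like the output shown (a 40-character hex ID, the reference type, then a size or "?"). The names tally, RECORD, SECTION, and FRAME_BYTES are made up for this illustration and are not part of Ray, and the frame size assumes 8-bit, 3-channel frames at the writer's 640x480, which the listing does not confirm.

```python
import re
from collections import defaultdict

# One record: 40-hex-char object ID, reference type, size in bytes or "?".
RECORD = re.compile(r"^([0-9a-f]{40})\s+(\S+)\s+(\?|\d+)")
# Section header such as "; worker pid=11404".
SECTION = re.compile(r"^;\s*(\w+)\s+pid=(\d+)")

# Assumption: 8-bit BGR frames at the 640x480 size passed to cv2.VideoWriter.
FRAME_BYTES = 640 * 480 * 3


def tally(listing):
    """Return {(process, pid, ref_type): [entry_count, known_bytes]}."""
    totals = defaultdict(lambda: [0, 0])
    section = ("unknown", 0)
    for line in listing.splitlines():
        line = line.strip()
        header = SECTION.match(line)
        if header:
            section = (header.group(1), int(header.group(2)))
            continue
        record = RECORD.match(line)
        if not record:
            continue  # separators, wrapped path lines, "..."
        _, ref_type, size = record.groups()
        entry = totals[section + (ref_type,)]
        entry[0] += 1
        if size != "?":  # sizes reported as "?" are not counted
            entry[1] += int(size)
    return dict(totals)


# A few entries copied from the listing, with the creation-site column trimmed.
sample = """\
; driver pid=11359
f66d17bae2b0e76545b95b1c010000c801000000  LOCAL_REFERENCE                ?   (actor call)
ffffffffffffffff45b95b1c010000c801000000  USED_BY_PENDING_TASK           ?   (actor call)
; worker pid=11404
0d7cda845c5a77d945b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call)
2a8b9e2564dfb83745b95b1c010000c801000000  LOCAL_REFERENCE          2765186   (actor call)
"""

for key, (count, known_bytes) in tally(sample).items():
    print(key, count, known_bytes)

# Whole frames that fit in one 2765186-byte object, and the leftover bytes.
frames_per_object, overhead = divmod(2765186, FRAME_BYTES)
print(frames_per_object, overhead)
```

Under that frame-size assumption, each 2765186-byte object would hold three frames plus a few hundred bytes of overhead, which suggests that every get_frames result stays pinned rather than individual frames.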
