Workers stuck, increased memory usage while processing large CSV from S3. #1467

Closed
konrad-roze opened this issue Oct 11, 2017 · 32 comments

@konrad-roze

I'm processing a dataframe stored as a (relatively) large CSV on S3.
Using distributed scheduler with multiprocessing (1 thread per 1 worker process, --no-nanny).
Workers seem to accumulate data and get stuck; in some cases this also causes the whole job to fail.

I came up with a minimal reproducing example, shown below (it only reads and writes a CSV):

frame = df.read_csv(input_url,
                    collection=True,
                    blocksize=1024*1024,
                    compression=None,
                    lineterminator='\n',
                    dtype=str,
                    sep=',',
                    quotechar='"',
                    encoding='utf-8')

fun_list = frame.to_csv(output_url,
                        compute=False,
                        encoding='utf-8',
                        index=False,
                        index_label=False)

futures = client.compute(fun_list)
progress(futures)
client.gather(futures)

This would hang forever with progress at 0%.
In worker log:
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 7.16 GB -- Worker memory limit: 8.02 GB

The file itself is only 1.2GB though.
Using distributed 1.19.2 and dask 0.15.4

@mrocklin
Member

It's odd that the worker process would be observing a 7GB memory use in this situation. It is evaluating memory use with the following code:

psutil.Process().memory_info().rss

Do you have any thoughts about why that code would return 7GB in your situation?

You can disable this functionality if you want by setting the following in your config.yaml

worker-memory-pause: False  # fraction at which we pause worker threads
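
To make the check described above concrete, here is a minimal sketch of comparing a process's resident set size against a fraction of a memory limit. It is not the distributed implementation: the helper names `current_rss_bytes` and `should_pause` are hypothetical, the 0.8 threshold is an assumed value for `worker-memory-pause`, and the `resource` fallback (Unix only, and reporting peak rather than current RSS) is only there so the sketch runs without psutil.

```python
import sys


def current_rss_bytes():
    """Resident set size of this process, in bytes."""
    try:
        import psutil  # same call as quoted in the comment above
        return psutil.Process().memory_info().rss
    except ImportError:
        # Fallback for the sketch only: peak RSS from the stdlib (Unix only).
        import resource
        peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        # Linux reports kilobytes, macOS reports bytes.
        return peak if sys.platform == "darwin" else peak * 1024


def should_pause(rss, memory_limit, pause_fraction=0.8):
    """True when rss exceeds the given fraction of the limit.

    pause_fraction=0.8 is an assumption, not a value taken from this thread.
    """
    return rss > pause_fraction * memory_limit


GB = 1e9
# Figures from the worker warning in the first comment: 7.16 GB used, 8.02 GB limit.
print(should_pause(7.16 * GB, 8.02 * GB))
# The same check against this process's own memory use.
print(should_pause(current_rss_bytes(), 8.02 * GB))
```

With the numbers from the warning, 7.16 GB is about 89% of the 8.02 GB limit, so any pause fraction below that would keep the worker paused.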

@konrad-roze
Author

No clear idea.
Before starting the job, worker memory is below 100MB. It jumps only after the job is started. If I increase --memory-limit to 16GB then the job progresses, but memory consumption is around 8GB. That is unexpected, as we're doing a simple read-write operation and the input data is 1.25GB (we hope to process much bigger inputs than that).

I will be looking further into this, anything in particular that I could check?

PS. For debugging purposes I'm running the scheduler and a single worker locally on my development host.
Worker is started as
dask-worker --nthreads 1 --no-nanny 0.0.0.0:8786

@konrad-roze
Author

With the default 8GB limit it seems to get stuck on read_block
image
My current idea is that it gets stuck when reading the data and looking for record terminators.
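
For context on what "looking for record terminators" involves, here is a simplified sketch of a delimiter-aware block read. It is an illustration written for this thread, not dask's `read_block_from_file`; the helper names `seek_past_delimiter` and `read_block` and the sample data are assumptions.

```python
import io


def seek_past_delimiter(f, delimiter, chunk=2 ** 16):
    """Move f to just after the next delimiter, or leave it at EOF."""
    buf = b""
    while True:
        data = f.read(chunk)
        if not data:
            return  # reached EOF without finding a delimiter
        buf += data
        i = buf.find(delimiter)
        if i >= 0:
            # Rewind over the bytes that were read past the delimiter.
            f.seek(f.tell() - (len(buf) - i - len(delimiter)))
            return
        # Keep a short tail in case the delimiter straddles two chunks.
        buf = buf[-(len(delimiter) - 1):] if len(delimiter) > 1 else b""


def read_block(f, offset, length, delimiter=b"\n"):
    """Read roughly [offset, offset + length), aligned to whole records."""
    if offset == 0:
        start = 0
    else:
        f.seek(offset)
        seek_past_delimiter(f, delimiter)  # skip the partial first record
        start = f.tell()
    f.seek(offset + length)
    seek_past_delimiter(f, delimiter)      # extend to the end of the last record
    end = f.tell()
    f.seek(start)
    return f.read(end - start)


# Made-up sample data: 1000 short CSV rows.
data = b"".join(b"row-%d,value-%d\n" % (i, i) for i in range(1000))
blocksize = 256
blocks = [read_block(io.BytesIO(data), off, blocksize)
          for off in range(0, len(data), blocksize)]
print(len(blocks), "blocks;", "lossless" if b"".join(blocks) == data else "mismatch")
```

Each block starts right after a terminator and ends on one, so concatenating the blocks reproduces the file without splitting a record.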

@mrocklin
Member

You might consider running your computation locally and tracking memory use. You might do this with the single-machine scheduler diagnostics or, if you can run your functions independently, with a memory line-profiler.

@konrad-roze
Author

Thanks, I will try to do that. Currently I'm running locally in the sense that all processes run on the same host. If I switch to the default local scheduler the job does not block and memory consumption is also significantly lower. I tried to tweak the read_csv() parameters and figure out what might be wrong with our input data, but I haven't found anything yet.

@konrad-roze
Author

I have tried guppy, memory-profiler and cProfile, but these didn't tell me much about the nature of the problem. We seem to have a single read_block task that hangs and consumes all memory. Unfortunately I've had no success running the dask diagnostics so far; I'm getting warnings:
(NO_DATA_RENDERERS): Plot has no data renderers
which result in an empty plot.

@konrad-roze
Author

update:
As mentioned earlier, this problem occurs only when using the distributed scheduler.
After reviewing other posts I tried other versions of cloudpickle (0.2.2 and 0.4.0), but this doesn't help.

Next, I have generated test input data with the same file size, but shorter records, no unicode, quotes, escaping etc. - to check the possibility of funny data being the cause, but it doesn't seem to be the problem.

I have also taken a look at computation graph visualizations, but to me all looks as expected (I'm attaching example graph from a smaller dataset)
graph1.zip

Might that be caused by the huge size of the task graph? (For development I currently have only 1 worker running.)

@konrad-roze
Author

Further observation - problem seems to be related to blocksize.
Input data is around 1.2GB.
With blocksize of 1MB the worker eats up almost 8GB of memory.
With blocksize set to 2MB worker takes "only" around 4GB of memory.
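
A quick back-of-the-envelope check of these numbers suggests that memory grows with the number of blocks rather than with the amount of data. The sketch below only reuses the figures reported above (1.2GB input, roughly 8GB at 1MB blocks, roughly 4GB at 2MB blocks); the helper `n_blocks` is hypothetical and the per-block figure is an estimate, not a measurement.

```python
import math


def n_blocks(file_size, blocksize):
    """Number of fixed-size blocks needed to cover the file."""
    return math.ceil(file_size / blocksize)


file_size = 1200000000   # ~1.2GB input, as reported
MB = 1024 * 1024

# (blocksize, approximate worker memory observed at that blocksize)
observations = [(1 * MB, 8e9), (2 * MB, 4e9)]

per_block_estimates = []
for blocksize, observed_bytes in observations:
    blocks = n_blocks(file_size, blocksize)
    per_block = observed_bytes / blocks
    per_block_estimates.append(per_block)
    print(blocks, "blocks,", round(per_block / 1e6, 1), "MB per block")
```

Both cases work out to roughly 7MB held per block (1145 and 573 blocks respectively), which would be consistent with a fixed cost per task, such as a buffer kept per opened file, rather than with the data volume itself.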

@konrad-roze
Author

Moreover, after the job is killed, the worker does not release the memory. Running the job again does not increase the initial memory consumption, though: it still starts at around 4GB.

@pitrou
Member

pitrou commented Oct 16, 2017

Do you see abnormal memory consumption if you try with a subset of the file (say 1/10th the size, but with a similar structure)? Also, can you reproduce this if the file is stored locally?

@konrad-roze
Author

If I store the file locally while still using the distributed scheduler, I do not observe this problem: memory consumption is very low. If I use smaller input data the memory consumption is proportionally smaller, e.g. for 280MB (roughly 1/4) it would consume almost 2GB (roughly 1/4). In such cases the worker would not get stuck because it has enough memory, but the consumption is still unexpectedly high.

@pitrou
Member

pitrou commented Oct 17, 2017

Hmm... so there would be something in the S3 layer consuming undue amounts of memory? Is it possible for you to give us access to that file (preferably the smaller version) and post a standalone script showing how to reproduce?

@konrad-roze
Author

The code is in my initial comment at the top (minus the imports). Unfortunately I cannot share the file, but I have already confirmed that this is not a problem with a specific data pattern: I generated simple data with short records and it behaves the same. If I use the default local scheduler but consume data from S3, I can see that it launches multiple (8) processes and each of them consumes almost 1GB of memory, so this might be related to S3 access.

@pitrou
Member

pitrou commented Oct 17, 2017

Could you then share the generated data file?

@konrad-roze
Author

I used this small script to generate the data file:

header = "COL1_NAME,COL2_NAME\n"
desired_size = 1200000000
fname = "test_data_big.csv"
line_template = "Column1 description line %s autogenerated, column2 description line %s autogenerated\n"
with open(fname, "w+") as fout:
    fout.write(header)
    i = 1
    size = 0
    while size < desired_size:
        l = line_template.replace("%s", str(i))
        fout.write(l)
        size += len(l)
        i += 1

@konrad-roze
Author

I have updated s3fs to 0.1.2 and bokeh to 0.12.10, but there is no change. It looks like the first task eats up all the memory on the worker and nothing happens afterwards due to the memory limits imposed:
image
As a side note, while the task is being executed the worker dashboard completely hangs up.

@konrad-roze
Author

I have also removed bokeh from the installation just to verify whether the dashboards may be causing problems, but the problem still persists.

@konrad-roze
Author

Additional info for reproduction, I am running Python 2.7.13.

@pitrou
Member

pitrou commented Oct 17, 2017

(sorry, wrong issue)

@konrad-roze
Author

The problem does not seem to occur with the synchronous single-threaded scheduler.

@konrad-roze
Author

There is also no problem with the local multiprocessing (process pool) scheduler.

@konrad-roze
Author

I have run an IPython kernel on a worker in an attempt to inspect what's going on.
Setup is:

  • distributed scheduler
  • one single threaded worker process
  • all running on the same host
  • input file from S3 size ~1.2GB
  • code as on the top of this post
  • worker consumes ~8GB memory and gets stuck

What I got from IPython kernel:

worker
<Worker: tcp://127.0.0.1:38191, running, stored: 1, running: 0/1, ready: 1191, comm: 0, waiting: 0>

worker.monitor
<SystemMonitor: cpu: 0 memory: 7187 MB fds: 77>

worker.has_what
defaultdict(set,
            {'tcp://127.0.0.1:38191': {'read-block-177209344-0574dae3c961d89cc7b78882d3b95415'}})

worker.tasks['read-block-177209344-0574dae3c961d89cc7b78882d3b95415']
(<function dask.bytes.core.read_block_from_file>,
 (<dask.bytes.core.OpenFile at 0x7ff92c5318d0>, 177209344, 1048576, '\n'),
 {})

worker.task_state['read-block-177209344-0574dae3c961d89cc7b78882d3b95415']
'memory'

worker.suspicious_deps
defaultdict(<function distributed.worker.<lambda>>, {})

worker.stateof('read-block-177209344-0574dae3c961d89cc7b78882d3b95415')
{'data': True, 'executing': False, 'heap': False, 'waiting_for_data': False}

worker.status
'running'

worker.paused
True

worker.nbytes
{'read-block-177209344-0574dae3c961d89cc7b78882d3b95415': 1065590}

worker.long_running
set()

worker.keys()
['read-block-177209344-0574dae3c961d89cc7b78882d3b95415']

worker.in_flight_tasks
{}

worker.host_health()
{'cpu': 1.1,
 'disk-read': 0,
 'disk-write': 159744,
 'memory': 64163962880,
 'memory_percent': 17.6,
 'network-recv': 11956,
 'network-send': 14886,
 'time': 1508429967.029673}

worker.executing
set()

worker.exceptions
{}

worker.events
defaultdict(<function distributed.core.<lambda>>, {})

list(worker.data)
['read-block-177209344-0574dae3c961d89cc7b78882d3b95415']

worker.dep_state
{'read-block-177209344-0574dae3c961d89cc7b78882d3b95415': 'memory'}

worker.dependencies
...
'read-block-1129316352-0574dae3c961d89cc7b78882d3b95415': set(),
 'read-block-546308096-0574dae3c961d89cc7b78882d3b95415': set(),
 'read-block-670040064-0574dae3c961d89cc7b78882d3b95415': set(),
 'pandas_read_text-2f673e08dadd1811622d0cf31122ce77': {'read-block-177209344-0574dae3c961d89cc7b78882d3b95415'},
 'read-block-605028352-0574dae3c961d89cc7b78882d3b95415': set(),
 'read-block-556793856-0574dae3c961d89cc7b78882d3b95415': set(),
 'read-block-981467136-0574dae3c961d89cc7b78882d3b95415': set(),
...

@konrad-roze
Author

konrad-roze commented Oct 19, 2017

Attaching the worker.log
wokre_log.txt
the tail is

...
('read-block-399507456-0574dae3c961d89cc7b78882d3b95415', 'new'),
 ('read-block-399507456-0574dae3c961d89cc7b78882d3b95415', 'waiting', 'ready'), 
('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'new'), 
('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'waiting', 'ready'), 
('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'ready', 'executing'), 
('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'put-in-memory'), 
('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'executing', 'memory'), 
('pandas_read_text-2f673e08dadd1811622d0cf31122ce77', 'new'),
 ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'new-dep', 'memory'), 
('pandas_read_text-2f673e08dadd1811622d0cf31122ce77', 'waiting', 'ready'),
 ('handle-missing', ())]
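
One way to read this tail is to keep only the last recorded state per key. The sketch below does that for the entries shown above; the helper name `last_states` is made up for illustration, and treating the third tuple element as the resulting state is an assumption about the log format.

```python
def last_states(entries):
    """Map each key to the final element of its last three-element log entry."""
    states = {}
    for entry in entries:
        # Assumed shape: (key, previous_state, new_state); other shapes are skipped.
        if len(entry) == 3 and isinstance(entry[2], str):
            states[entry[0]] = entry[2]
    return states


log_tail = [
    ('read-block-399507456-0574dae3c961d89cc7b78882d3b95415', 'new'),
    ('read-block-399507456-0574dae3c961d89cc7b78882d3b95415', 'waiting', 'ready'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'new'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'waiting', 'ready'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'ready', 'executing'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'put-in-memory'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'executing', 'memory'),
    ('pandas_read_text-2f673e08dadd1811622d0cf31122ce77', 'new'),
    ('read-block-177209344-0574dae3c961d89cc7b78882d3b95415', 'new-dep', 'memory'),
    ('pandas_read_text-2f673e08dadd1811622d0cf31122ce77', 'waiting', 'ready'),
    ('handle-missing', ()),
]

states = last_states(log_tail)
for key, state in states.items():
    print(state, key)
```

For this tail, the one read-block task that ran ends in 'memory', while pandas_read_text stops at 'ready' and never reaches 'executing'. That would fit a worker that has paused execution (worker.paused was True in the inspection above).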

@jeffreyliu

I'm having a similar issue, with google cloud storage as the file service.
Everything works fine when running a local scheduler, but when running distributed, the worker hangs during pandas_read_text.
I'm reading in a csv that's ~380 MB in size. The file does seem to get downloaded, since the dashboard reports bytes-stored equal to the file size, but it hangs when trying to parse the file.
Here's the stderr debug output for the worker

distributed.worker - DEBUG - future state: read-block-0-612c3bae53a58b03ed1fd3278fe778fb - RUNNING
distributed.worker - DEBUG - Heartbeat: tls://{workerhost}:{port}
distributed.core - WARNING - Event loop was unresponsive for 82.71s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - DEBUG - Heartbeat: tls://{workerhost}:{port}
distributed.worker - DEBUG - future state: read-block-0-612c3bae53a58b03ed1fd3278fe778fb - FINISHED
distributed.worker - DEBUG - Send compute response to scheduler: read-block-0-612c3bae53a58b03ed1fd3278fe778fb, {'op': 'task-finished', 'status': 'OK', 'nbytes': 383309713, 'type': <class 'bytes'>, 'start': 1516905426.090314, 'stop': 1516905543.1502976, 'thread': 139770478372608, 'key': 'read-block-0-612c3bae53a58b03ed1fd3278fe778fb'}
distributed.worker - DEBUG - Heartbeat: tls://{workerhost}:{port}
distributed.worker - DEBUG - Execute key: pandas_read_text-767ee6f2c1ce2c75f9125adfb1c32975 worker: tls://{workerhost}:{port}

Not sure it's a memory issue on my end, since the memory usage reported by htop for the worker is <1% of total system RAM, and I'm running with --memory-limit=0 (no difference when setting it to 'auto' or any other value). The worker CPU usage is consistently at 100% though, and the file still has not loaded even after an hour. For reference, I'm able to load and compute on my laptop locally in approximately one minute.

@mrocklin
Member

mrocklin commented Jan 25, 2018 via email

@jeffreyliu

I'm able to get a call stack only during the read_block call. Once it moves onto pandas_read_text, client.call_stack() times out, and the web interface gives 500: internal error.

The calls I'm making look like:

gcs_loaded_dd = dd.read_csv('gs://bucket/file.csv', 
                            storage_options={'project':project_name, 'token':gcs_token}, 
                            encoding='utf-8', blocksize=None, delimiter=',')
out = client.compute(gcs_loaded_dd)

I thought maybe it was an issue with the worker in general, but testing with client.submit(lambda x: x+1, 1) executes and returns fine

I also tried loading a test dataframe filled with random numbers to see if it was something weird about the dataframe, but the same issue occurs.

The scheduler and worker are running on an Ubuntu 14.04 machine; the client is running on macOS High Sierra. Both machines are running dask 0.16.1, distributed 1.20.2, and gcsfs 0.0.4 on Python 3.6.

@mrocklin
Member

Once it moves onto pandas_read_text, client.call_stack() times out, and the web interface gives 500: internal error

This is a sign that this part of the pandas call doesn't release the GIL, which is why other parts of the system can't respond in a timely manner. Probably it's creating many string objects in your pandas dataframe.
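
The "Event loop was unresponsive" warning in the worker log earlier in this thread is the same symptom. As a rough illustration only, the sketch below measures how late a periodic asyncio wake-up fires while something blocks the loop's thread. It uses a plain `time.sleep` as a stand-in for a long call that never yields (such as a GIL-holding pandas parse), and the helper names `monitor_lag` and `blocking_work` are hypothetical; it is not how distributed implements its check.

```python
import asyncio
import time


async def monitor_lag(interval, samples):
    """Sleep for `interval` repeatedly and record how late each wake-up is."""
    lags = []
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval)
        lags.append(time.monotonic() - start - interval)
    return lags


async def blocking_work(seconds):
    """Stand-in for a long call that does not yield control back to the loop."""
    await asyncio.sleep(0.05)   # let the monitor take its first sample
    time.sleep(seconds)         # blocks the whole event loop thread


async def main():
    lags, _ = await asyncio.gather(monitor_lag(0.05, 5), blocking_work(0.3))
    return max(lags)


worst_lag = asyncio.run(main())
print("worst wake-up delay: %.2fs" % worst_lag)
```

While the blocking call runs, nothing else scheduled on the loop gets a turn, which is why heartbeats, call_stack() requests and the dashboard would all stall together.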

@jeffreyliu

Yes, that seemed to be the issue. This thread helped get it to work.

Turning off the pandas option: pd.options.mode.chained_assignment = None allows the dataframe to load in a reasonable amount of time.

@manugarri
Contributor

Just commenting: disabling the chained assignment pandas option made my ETL job go from running out of memory after 90 minutes to taking 17 minutes! I think we can close this issue since it's related to pandas (and thanks @jeffreyliu a year and a half later for your comment!)

@TomAugspurger
Member

Huh, that surprises me about chained_assignment. I'll look into the pandas side of things.

@TomAugspurger
Member

TomAugspurger commented Sep 10, 2019

It seems like this performance issue was fixed on the pandas side in pandas-dev/pandas#27031 (pandas 0.25). At least, I can't reproduce the slowdown reported in pandas-dev/pandas#18743 (comment) anymore.

@GenevieveBuckley
Contributor

I think we can close this issue since it's related to pandas (and thanks @jeffreyliu a year and a half later for your comment!)

Ok, closing
