Reduce metadata memory usage in augur filter #699

Closed
huddlej opened this issue Mar 17, 2021 · 5 comments · Fixed by #750
Labels: bug (Something isn't working), enhancement (New feature or request), priority: high (To be resolved before other issues)

Comments

@huddlej
Contributor

huddlej commented Mar 17, 2021

Current Behavior

While profiling memory usage for Nextstrain's SARS-CoV-2 workflow, I was surprised to find that augur filter rules consumed >2 GB of memory per job. Through memory profiling with the memory_profiler package, I found that the majority of this memory usage was caused by reading the ~165 MB metadata input with the read_metadata function in utils.py. This puts the in-memory size of metadata at ~13 times the on-disk size.

In comparison, workflow scripts that read the same metadata file directly into a pandas DataFrame (e.g., scripts/assign-colors.py) only consumed ~400 MB of memory per job or 2-3 times the on-disk size of the data. These results indicate that the pandas DataFrame representation is considerably more memory efficient than the alternate Python dict representation returned by the read_metadata function.
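The gap between the two representations can be illustrated with a small synthetic experiment (hypothetical data and column names; the real metadata has many more columns): measure the deep memory of a DataFrame versus the equivalent dict-of-dicts that a read_metadata-style function returns.

```python
# Compare the footprint of a pandas DataFrame against an equivalent
# dict-of-dicts representation (one dict per metadata record).
import io
import sys
import pandas as pd

# Synthetic TSV metadata with 10,000 records.
tsv = "strain\tdate\tcountry\n" + "\n".join(
    f"strain_{i}\t2021-03-{i % 28 + 1:02d}\tcountry_{i % 50}"
    for i in range(10000)
)

# DataFrame representation.
df = pd.read_csv(io.StringIO(tsv), sep="\t")
df_bytes = df.memory_usage(deep=True).sum()

# Dict-of-dicts representation; each record pays per-dict overhead.
records = df.to_dict(orient="index")
dict_bytes = sum(
    sys.getsizeof(record) + sum(sys.getsizeof(v) for v in record.values())
    for record in records.values()
)

print(f"DataFrame: {df_bytes / 1e6:.2f} MB, dicts: {dict_bytes / 1e6:.2f} MB")
```

The per-record dict overhead dominates as the number of records grows, which is consistent with the profiling numbers above.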

Expected behavior

Augur should not use this much memory to represent metadata, especially when an alternate representation (a pandas DataFrame) is considerably more memory efficient.

How to reproduce

Run augur filter with metadata from a recent ncov workflow run.

Possible solution

Since Augur already loads metadata into a DataFrame before finally converting it to a dict, my recommendation is to always use a DataFrame to represent metadata internally. This solution would cause a breaking change to the current API, such that all consumers of the read_metadata function would need to be updated to expect a different data structure.

However, this data structure provides a more flexible interface (most of the operations we apply in filter.py could be rewritten as simple DataFrame operations) and more potential to scale with larger datasets in the future through more efficient data types, chunking, or replacement with an alternate framework like Dask that implements a similar API.
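As a sketch of what "simple DataFrame operations" could look like (hypothetical column names and filter conditions, not the actual filter.py logic), per-record loops over a dict become vectorized boolean masks:

```python
# Filters expressed as DataFrame operations instead of loops over records.
import io
import pandas as pd

metadata = pd.read_csv(io.StringIO(
    "strain\tdate\tcountry\n"
    "A\t2021-01-05\tUSA\n"
    "B\t2020-12-31\tUK\n"
    "C\t2021-02-10\tUSA\n"
), sep="\t")

exclude = {"B"}

# Equivalent of per-record exclusion and minimum-date checks, vectorized.
# ISO 8601 date strings compare correctly as plain strings.
kept = metadata[
    ~metadata["strain"].isin(exclude)
    & (metadata["date"] >= "2021-01-01")
]
print(sorted(kept["strain"]))  # ['A', 'C']
```

The same code would work largely unchanged on a Dask DataFrame, which is what makes the chunking/Dask migration path plausible.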

@huddlej huddlej added the bug (Something isn't working), enhancement (New feature or request), hard problem (Requires more work than most issues), and priority: moderate (To be resolved after high priority issues) labels and removed the hard problem label on Mar 17, 2021
@jameshadfield
Member

jameshadfield commented Mar 17, 2021

This solution would cause a breaking change to the current API, such that all consumers of the read_metadata function would need to be updated to expect a different data structure.

Given that read_metadata is in augur.utils we could keep it around, with a deprecation warning, whilst building a new memory-efficient function in augur.io and then transitioning our augur code to the latter. Anecdotally, a lot of scripts use this function so it would be nice to preserve it for a while at least!

@huddlej
Contributor Author

huddlej commented Mar 17, 2021

That sounds like a great plan. I'm happy to see how widespread read_metadata is (in our own scripts, at least).

@huddlej huddlej added the priority: high (To be resolved before other issues) label and removed the priority: moderate (To be resolved after high priority issues) label on Jun 2, 2021
@huddlej
Contributor Author

huddlej commented Jul 3, 2021

After some more reflection, the main issue with reading all metadata into memory occurs when users apply augur filter to the full GISAID or GenBank SARS-CoV-2 datasets. These datasets contain 1.5+ million records and cannot reasonably fit into memory. However, for most other use cases, users work with the subset of data emitted by this first augur filter step (it isn't reasonable/useful to build a tree with 1.5 million sequences).

Given this main issue, the scope of the current issue can be reduced to improving the memory usage of metadata in augur filter specifically. Initial discussion within our team about how to reduce the overall size of metadata produced a couple of possible solutions including:

  • switching from Python dict to a pandas DataFrame
  • switching from Python dict to a Dask DataFrame
  • streaming metadata into a sqlite database

Given the new scope of the issue, though, we do not necessarily need to implement a new interface for accessing metadata across augur; we only need to support more efficient processing in augur filter. I think we can accomplish this goal by refactoring augur filter to iterate over chunks of metadata records (N=1 or more records per chunk). This iteration needs to account for three major use cases:

  1. In the best case (filtering only without grouping), augur filter only needs one pass through the metadata to determine which records should be kept. This case does not require additional effort to produce the final output, because it can stream accepted records to disk and/or track the names of these strains for writing to --output-strains or for streaming sequences to --output-sequences.

  2. In an intermediate case (filtering and grouping by a fixed number of sequences per group), augur filter needs one pass through the metadata to filter records and also needs to maintain a list of possible sequences to keep per group. It then needs to loop through the possible sequences per group and accept a fixed number of these based on random or user-defined priorities. Note that this case is identical to the special case when users request a maximum number of sequences and do not provide any columns to group by. In the special case, there is only one group (“_dummy”) and the number of sequences per group is the maximum number of sequences requested.

Several technical solutions could work. One could write valid strains and their priorities out to a temporary file for each group and process these files after the first pass through the metadata. This solution can effectively require two passes through the metadata if filters do not remove many records. Alternately, one could write records to a database (e.g., sqlite) indexed by group and strain and select the top records from each group ordered by priority in the second pass.

Another option would be to maintain a fixed-size min heap per group in memory such that each newly accepted strain is added to the corresponding heap if it has a higher priority than any previously accepted strain. After the first pass, each heap will contain no more than N (highest priority or random) strains per group and augur filter can emit these records to --output-metadata, etc. This approach requires more memory than the file-based approach, but if user-requested group sizes are reasonable, the total memory used should be acceptable. This approach should be faster by avoiding disk I/O and a second pass through the metadata (in the worst case). Interestingly, a min heap could still be an efficient data structure to identify the highest priority strains from a file on disk (e.g., with heapq.nlargest).

  3. In the worst case (filtering with groups and max number of sequences), augur filter needs two passes through the metadata. The first pass filters strains and counts the number of groups and sequences per group. This information is required to calculate a “sequences per group” to emit. The second pass would apply the same method used to address the intermediate case above (a file-based intermediate or heap) to select strains with the highest priorities.
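The per-group fixed-size min heap described above can be sketched with the standard library's heapq (hypothetical record and priority values; the real implementation would carry full metadata records):

```python
# Keep at most max_size highest-priority strains per group by evicting the
# current minimum whenever a higher-priority strain arrives.
import heapq
from collections import defaultdict

def add_to_heap(heap, priority, strain, max_size):
    """Maintain the max_size highest-priority (priority, strain) pairs."""
    if len(heap) < max_size:
        heapq.heappush(heap, (priority, strain))
    elif priority > heap[0][0]:
        # New strain beats the lowest-priority strain currently kept.
        heapq.heappushpop(heap, (priority, strain))

heaps_by_group = defaultdict(list)
records = [
    ("USA", "A", 0.9), ("USA", "B", 0.1), ("USA", "C", 0.5),
    ("UK", "D", 0.7), ("UK", "E", 0.2),
]
for group, strain, priority in records:
    add_to_heap(heaps_by_group[group], priority, strain, max_size=2)

kept = {strain for heap in heaps_by_group.values() for _, strain in heap}
print(sorted(kept))  # ['A', 'C', 'D', 'E']
```

Each insertion is O(log N) for a heap of size N, so the memory cost is bounded by (number of groups) × (sequences per group) regardless of the total metadata size.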

One possible pseudo-code version of the proposed algorithm looks like this:

sequence_index = load_or_build_sequence_index(
  sequence_index_file,
  sequences_file
)
records = load_metadata(metadata_file)
priorities = load_priorities(priorities_file)

output_report = open("filter_report.tsv", "w")
output_strains = open("filtered_strains.txt", "w")
output_metadata = open("filtered_metadata.tsv", "w")
output_sequences = open("filtered_sequences.fasta", "w")

# Build up a list of functions to apply to each
# record in the metadata.
filters = construct_filters(user_args, sequence_index)
records_to_keep = set()

# First pass. Only pass in the best case.
for record in records:
  # Apply current filter logic per record or batch of records.
  is_filtered, reason = apply_filters(record, filters)

  if is_filtered:
    output_report.write(record, reason)
  else:
    records_to_keep.add(record)

    if group_by and (sequences_per_group or (max_sequences and group == "_dummy")):
      heaps_by_group[group].add(
        record,
        priorities[record],
        sequences_per_group
      )
    elif group_by and max_sequences:
      records_per_group[group] += 1
    else:
      output_strains.write(record)
      output_metadata.write(record)

# Worst case: need to calculate sequences per group from group counts.
if group_by and max_sequences:
  sequences_per_group = calculate_sequences_per_group(records_per_group)

  # Second pass to add records to heaps in priority order.
  for record in records:
    if record in records_to_keep:
      heaps_by_group[group].add(
        record,
        priorities[record],
        sequences_per_group
      )

# In the intermediate and worst cases, process heaps.
if heaps_by_group:
  for group, heap in heaps_by_group.items():
    for record in heap:
      output_strains.write(record)
      output_metadata.write(record)

# Write out sequences, if needed.
for sequence in sequences:
  if sequence.id in records_to_keep:
    output_sequences.write(sequence)

Implicit in the pseudo-code implementation above is that the current multiple lines of filter logic in filter.py need to be refactored into individual functions that can filter one or more records (depending on implementation details).
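One hypothetical shape for that refactoring (function names and filter conditions are illustrative, not the actual filter.py API): each filter becomes a small function that returns a reason string when a record fails, and a driver applies them in order.

```python
# Filter logic refactored into individual, composable functions.
from functools import partial

def filter_by_exclusion(record, excluded):
    """Reject strains that appear in an exclusion set."""
    if record["strain"] in excluded:
        return "excluded strain"

def filter_by_min_date(record, min_date):
    """Reject records sampled before min_date (ISO 8601 strings)."""
    if record["date"] < min_date:
        return "too early"

def construct_filters(excluded, min_date):
    """Bind user arguments into a list of single-argument filters."""
    return [
        partial(filter_by_exclusion, excluded=excluded),
        partial(filter_by_min_date, min_date=min_date),
    ]

def apply_filters(record, filters):
    """Return (is_filtered, reason) for the first failing filter, if any."""
    for f in filters:
        reason = f(record)
        if reason:
            return True, reason
    return False, None

filters = construct_filters(excluded={"B"}, min_date="2021-01-01")
print(apply_filters({"strain": "B", "date": "2021-02-01"}, filters))
print(apply_filters({"strain": "A", "date": "2021-02-01"}, filters))
```

Returning a reason per filter also makes the filter report in the pseudo-code straightforward: the driver already knows which filter rejected each record.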

Also implicit is that the probabilistic subsampling logic could be refactored into its own function and potentially removed from the user interface (i.e., we could only use probabilistic sampling when it is necessary).
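A minimal sketch of probabilistic sampling, assuming (per later discussion on this issue) that fractional targets are handled by drawing each group's size from a Poisson distribution with the target as its mean:

```python
# Draw per-group sample sizes from a Poisson distribution so that a
# fractional "sequences per group" target is honored in expectation.
import numpy as np

def probabilistic_group_sizes(mean_sequences_per_group, n_groups, seed=None):
    """Return one non-negative sample size per group."""
    rng = np.random.default_rng(seed)
    return rng.poisson(mean_sequences_per_group, size=n_groups)

# E.g., a target of 0.5 sequences per group across 10 groups.
sizes = probabilistic_group_sizes(0.5, n_groups=10, seed=42)
print(sizes)
```

Note the caveat that later surfaced in the implementation: seeding must be done once for the generator, not per group, or every group would draw the same size.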

@huddlej huddlej changed the title from "Reduce memory usage by metadata" to "Reduce metadata memory usage in augur filter" on Jul 6, 2021
@huddlej
Contributor Author

huddlej commented Jul 6, 2021

We can split the bigger issue of memory used by augur filter into the following smaller tasks:

@tsibley
Member

tsibley commented Jul 8, 2021

@huddlej Overall impression is that this plan sounds excellent to me. 👍 for focusing the scope here and breaking down to incremental steps.

In terms of implementations, I think I'm partial to the in-memory heap or on-disk SQLite approach. Not sure which between the two but would be happy to discuss more in person!

huddlej added a commit that referenced this issue Jul 20, 2021
This is nearly to the point described by the corresponding pseudo-code
[1], but it is still broken. The major remaining work includes the
second pass through the metadata, processing of heaps, and updating the
filter reporting to use more programmatic output.

[1] #699 (comment)
huddlej added a commit that referenced this issue Jul 29, 2021
Instead of loading all metadata into memory at once, iterate through
fixed-size chunks of data, applying filters to these data and either
grouping data into heaps or streaming output to disk, as needed. This
approach generally follows the original pseudo-code solution [1].

One side-effect of this implementation is that it allows us to log the
reason why each strain was filtered or force-included in a new
`--output-log` argument. One of the output columns of the log file is a
kwargs column that tracks the argument passed to a given filter. This
column is structured text in JSON format which allows for more
sophisticated reporting by specific keys.

Along with this change, we apply the include/exclude logic from files
per file so we can track which specific file was responsible for
including or filtering each strain.

Fixes #424

[1] #699 (comment)
huddlej added a commit that referenced this issue Jul 31, 2021
Instead of loading all metadata into memory at once, iterate through
fixed-size chunks of data, applying filters to these data and either
grouping data into priority queues or streaming output to disk, as
needed. This approach generally follows the original pseudo-code
solution [1]. Users can override the default chunk size with the new
`--metadata-chunksize` argument to tune the amount of memory used by a
given execution of the filter command. A larger chunk size uses more
memory but may also run slightly faster.

One side-effect of this implementation is that it allows us to log the
reason why each strain was filtered or force-included in a new
`--output-log` argument. One of the output columns of the log file is a
kwargs column that tracks the argument passed to a given filter. This
column is structured text in JSON format which allows for more
sophisticated reporting by specific keys.

Along with this change, we apply the include/exclude logic from files
per file so we can track which specific file was responsible for
including or filtering each strain.

Note that we don't use context manager for CSV reading here. In version
1.2, pandas.read_csv was updated to act as a context manager when
`chunksize` is passed but this same version increased the minimum Python
version supported to 3.7. As a result, pandas for Python 3.6 does not
support the context manager `with` usage. Here, we always iterate
through the `TextFileReader` object instead of using the context
manager, an approach that works in all Python versions.
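The version-compatible chunked reading described above can be sketched as follows (synthetic data; the real code reads the metadata TSV from disk):

```python
# Iterate a pandas TextFileReader directly instead of using it as a
# context manager, which only became supported in pandas 1.2.
import io
import pandas as pd

tsv = io.StringIO(
    "strain\tcountry\n" + "\n".join(f"s{i}\tX" for i in range(10))
)
reader = pd.read_csv(tsv, sep="\t", chunksize=4)

total = 0
for chunk in reader:  # chunks of up to 4 records each
    total += len(chunk)
print(total)  # 10
```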

Finally, this commit changes the subsample seed argument type to an
`int` instead of string or int to match numpy's requirement for its
random generator seed [2]. We do not pass a seed value to numpy random
generator prior to Poisson sampling or the generator will always sample
the same values for a given mean (i.e., all queues will have the same
size). Use the random seed for generating random priorities when none
are provided by the user.

Fixes #424

[1] #699 (comment)
[2] https://numpy.org/doc/stable/reference/random/generator.html