Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dedicated array for chrom/pos lookup? #21

Open
jeromekelleher opened this issue May 22, 2024 · 3 comments
Open

Dedicated array for chrom/pos lookup? #21

jeromekelleher opened this issue May 22, 2024 · 3 comments

Comments

@jeromekelleher
Copy link
Contributor

jeromekelleher commented May 22, 2024

A common task we want to support is to answer position based queries, where a user wants to find out something about a given range of genomic positions, and therefore needs to convert that into a range of array indexes. In principle this is easy, we just load the variant_contig and variant_position arrays into memory and do binary search. (Let's assume for the purposes of this discussion that we can just look at the position, but in general we do need to examine the contig array as well.)

In practise, this can can take a long time because we have split the variant_position into lots of chunks, and retrieving all these chunks adds a lot of latency with high-latency storage. For example, in the 2020 1000 Genomes chromosome 2 data, we have ~10 million variants. With the default variant chunk size of 10_000, we get 1061 chunks, each about 30k. To read in all of variant_position, we need to do 1061 separate file reads, which can be quite slow. On a server with a 12 disk RAID 5 (after taking pains to clear all cache levels), we get this:

root = zarr.open("1kg_chr2_2.zarr")
pos = root["variant_position"]

before_wall = time.perf_counter()
before_cpu = time.process_time()
x = pos[:]
duration_wall = time.perf_counter() - before_wall
duration_cpu = time.process_time() - before_cpu
print(f"Retrieved pos: wall = {duration_wall:.2g}; cpu = {duration_cpu: .2g}")

giving

Retrieved pos: wall = 23; cpu =  0.94

Thats twenty-three seconds. Now we could argue that spinning platters aren't particularly modern and this would go away using SSDs, but I think that this is a best-case scenario for HPC shared file system installations, and likewise object storage lookups (e.g., cloud storage) will be high-latency and have similar properties.

I explored the idea of creating a dedicated index for doing these lookups, that is as small as possible and in a single chunk to make retrieval fast:

pos = root["variant_position"][:]
chrom = root["variant_contig"][:]
index = root.zeros(
    "position_index",
    shape=(pos.shape[0], 2),
    chunks=(pos.shape[0],),
    dtype=np.int32,
    compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=1),
    filters=[numcodecs.Delta(dtype=np.int32)],
    overwrite=True,
)
index[:, 0] = chrom
index[:, 1] = pos

This stores in a single 13M file. For reference, the corresponding variant_position uses 34M and variant_contig uses 4.2M across the 1060 files each. (I decided against using Delta for variant_position in vcf2zarr by default because the overall size of the arrays are negligible anyway, and having 1060 slightly smaller files isn't going to make much difference. That could easily be changed, though.)

Retrieving this array is much, much faster (from the same cold cache):

position_index = root["position_index"]
before_wall = time.perf_counter()
before_cpu = time.process_time()
x = position_index[:]
duration_wall = time.perf_counter() - before_wall
duration_cpu = time.process_time() - before_cpu

print(f"Retrieved index: wall = {duration_wall:.2g}; cpu = {duration_cpu: .2g}")

giving

Retrieved index: wall = 0.21; cpu =  0.32

That's down to a wall time of 0.2 seconds, which is much more like it (the CPU time is greater than wall because I didn't turn off the numcodecs threading for Blosc)

This wouldn't really be necessary if we didn't have the hard requirement that chunking is uniform across dimensions, but that's a separate issue (which I'll follow up separately). I thought it was worth documenting the idea of having a dedicated chrom+pos array which should make position-based searches much more efficient. (Note we'd have to read the contig_id array as well, but I don't see how that can be helped.)

If we do decide that chunking along a given dimension has to be strictly uniform, we could leave this position_index as un-dimensioned as a pragmatic workaround.

@jeromekelleher
Copy link
Contributor Author

jeromekelleher commented Jun 2, 2024

Aha - it's actually pretty silly storing the entire chrom/pos array in one chunk, as all we actually need are the minimum values in each chunk. This would allow us to binary search to the relevant chunks, and we could then just load the needed chunks from there.

Something like this would do the trick:

pos = root["variant_position"]
chrom = root["variant_contig"]

assert pos.cdata_shape == chrom.cdata_shape

index = []

for v_chunk in range(pos.cdata_shape[0]):
    p = pos.blocks[v_chunk]
    c = chrom.blocks[v_chunk]
    # Note in general we'd need to deal with having multiple contig values per chunk. That's why we've
    # got 3 columns in the array.
    assert np.all(c[1:] == c[0])
    index.append((c[0], p[0], v_chunk))

index = np.array(index, dtype=np.int32)
# print(index)
index_array = root.array(
    "position_index",
    data=index,
    compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0),
    overwrite=True,
)

Doing this for the chr2 example above gives a 5.3K file, which is as good as you'll do.

If we were to do this for the whole human genome (assuming the number of variants for the UKB 500K WGS data) we've have 1,037,556,156 variants, and with a chunk size of 10K, would give us 103K chunks. That's about 100X bigger than this example, which would give a roughly 500K index file to download. This could probably be made smaller, though, with some thought.

There's no real reason we should be splitting the Zarrs by chromosome, right?

@tomwhite
Copy link
Collaborator

tomwhite commented Jun 3, 2024

To read in all of variant_position, we need to do 1061 separate file reads, which can be quite slow.

It should be possible to use async calls to make this a lot faster. Fsspec does this under the hood, but it might be possible to tune it https://filesystem-spec.readthedocs.io/en/latest/async.html#coroutine-batching. The Zarr V3 API will provide a way to do this directly, but in the meantime it might be an interesting experiment to try with Zarrita which will work with Zarr V2 today.

In the same vein, this is an interesting project to keep an eye on: https://github.com/jackKelly/light-speed-io/

all we actually need are the minimum values in each chunk

This is something that Zarr could provide one day, see "Partition Statistics" in zarr-developers/zarr-specs#154 for example. I'm not sure if there's a Zarr 3 proposal for it.

There's no real reason we should be splitting the Zarrs by chromosome, right?

I don't think there is.

@tomwhite
Copy link
Collaborator

If we extend the index to include the maximum end position in each chunk then we can do overlap queries for VCF regions (-r) and targets (-t).

The idea is that we can do an initial query to find the chunks that overlap the regions specified in the query. Then we can do a second query over just those chunks to find the variants that overlap. This saves us from reading every chunk in the variant_contig and variant_position Zarr arrays, which is the thing we are trying to avoid for small region queries (e.g. a few variants).

To support this we need to add the end position to the vcz file, which I've done here: https://github.com/tomwhite/bio2zarr/tree/variant-end

I've implemented the logic including indexing and regions/targets in this PR: sgkit-dev/vcztools#37

I tested it on 1000 genomes chr20, and got a modest speed up compared to main. (The speed up would be more significant for a larger chromosome, or for the whole genome, as discussed earlier.)

# main branch
time vcztools view -r chr20:30000397 ~/workspace/1kg/vcf/CCDG_13607_B01_GRM_WGS_2019-02-19_chr20.recalibrated_variants.vcz | wc -l
       1
vcztools view -r chr20:30000397   8.27s user 3.89s system 128% cpu 9.455 total
# regions-index branch
time vcztools view -r chr20:30000397 ~/workspace/1kg/vcf/CCDG_13607_B01_GRM_WGS_2019-02-19_chr20.recalibrated_variants.vcz | wc -l
       1
vcztools view -r chr20:30000397   6.12s user 3.78s system 133% cpu 7.405 total

(To improve this more we could use asyncio to fetch the Zarr field chunks, using Zarr v3 or TensorStore, but that's a separate issue.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants