|
10 | 10 | import numpy as np
|
11 | 11 | import pandas as pd
|
12 | 12 | from anndata import AnnData
|
| 13 | +from tqdm import tqdm |
13 | 14 |
|
14 | 15 | from grelu.data.utils import get_chromosomes
|
15 | 16 | from grelu.utils import get_aggfunc
|
@@ -723,3 +724,126 @@ def make_insertion_bigwig(
|
723 | 724 | os.remove(bedgraph_file)
|
724 | 725 |
|
725 | 726 | return bw_file
|
| 727 | + |
| 728 | + |
| 729 | +def write_tiledb( |
| 730 | + output_path: str, |
| 731 | + bw_files: Union[str, List[str]], |
| 732 | + chroms: Optional[Union[str, List[str]]], |
| 733 | + genome: str, |
| 734 | + tasks: Optional[Union[List[str], pd.DataFrame]] = None, |
| 735 | + num_threads=1, |
| 736 | +): |
| 737 | + """ |
| 738 | + Write BigWig files and genome sequences to TileDB. |
| 739 | +
|
| 740 | + Args: |
| 741 | + output_path: Directory where the output TileDB files should be stored. |
| 742 | + bw_files: List of paths to BigWig files. |
| 743 | + chroms: A list of chromosomes to read from the bigWig files. If not provided, |
| 744 | + all chromosomes will be read. |
| 745 | + genome: Name of the genome corresponding to the BigWig files |
| 746 | + tasks: A list of task names or a pandas dataframe containing task information. |
| 747 | + If a dataframe is supplied, the row indices should be the task names. |
| 748 | + num_threads: Number of threads. Defaults to 1. |
| 749 | + """ |
| 750 | + import multiprocessing |
| 751 | + from multiprocessing import Pool |
| 752 | + |
| 753 | + import pyBigWig |
| 754 | + import tiledb |
| 755 | + from genomicarrays import buildutils_tiledb_array as uta |
| 756 | + from natsort import natsorted |
| 757 | + |
| 758 | + from grelu.data.tdb_utils import _extract_chrom_cov, _extract_chrom_sequence |
| 759 | + from grelu.data.utils import _create_task_data, get_chromosomes |
| 760 | + from grelu.utils import make_list |
| 761 | + |
| 762 | + if not os.path.isdir(output_path): |
| 763 | + raise ValueError("'output_path' must be a directory.") |
| 764 | + |
| 765 | + bw_files = make_list(bw_files) |
| 766 | + tasks = tasks or [os.path.splitext(os.path.basename(f))[0] for f in bw_files] |
| 767 | + |
| 768 | + # Check and format the chromosomes |
| 769 | + lengths = [] |
| 770 | + chroms = get_chromosomes(chroms) |
| 771 | + chroms = natsorted(chroms) |
| 772 | + |
| 773 | + # Get chromosome lengths |
| 774 | + for chrom in chroms: |
| 775 | + length = None |
| 776 | + for bw_file in bw_files: |
| 777 | + with pyBigWig.open(bw_file) as f: |
| 778 | + if chrom not in f.chroms(): |
| 779 | + raise ValueError(f"Chromosome {chrom} not found in file {bw_file}") |
| 780 | + else: |
| 781 | + if length is None: |
| 782 | + length = f.chroms(chrom) |
| 783 | + else: |
| 784 | + if length != f.chroms(chrom): |
| 785 | + raise ValueError( |
| 786 | + f"Chromosome {chrom} does not have the same length in all bigWig files" |
| 787 | + ) |
| 788 | + lengths.append(length) |
| 789 | + |
| 790 | + # Create chromosome dataframe |
| 791 | + chroms = pd.DataFrame({"chrom": chroms, "start": 0, "end": lengths}) |
| 792 | + chroms["uri"] = [f"{output_path}/{x}" for x in chroms.chrom] |
| 793 | + |
| 794 | + # Write chromosome dataframe |
| 795 | + _chroms_uri = f"{output_path}/chroms" |
| 796 | + tiledb.from_pandas(_chroms_uri, chroms) |
| 797 | + |
| 798 | + # Create task dataframe |
| 799 | + tasks = tasks or [os.path.splitext(os.path.basename(f))[0] for f in bw_files] |
| 800 | + if isinstance(tasks, List): |
| 801 | + tasks = _create_task_data(tasks) |
| 802 | + |
| 803 | + tasks["task_idx"] = range(1, len(tasks) + 1) |
| 804 | + tasks["bigwig_path"] = bw_files |
| 805 | + |
| 806 | + # Write task dataframe |
| 807 | + _task_uri = f"{output_path}/tasks" |
| 808 | + tiledb.from_pandas(_task_uri, tasks) |
| 809 | + |
| 810 | + # Create empty array for each chromosome |
| 811 | + for row in chroms.itertuples(): |
| 812 | + uta.create_tiledb_array( |
| 813 | + row.uri, |
| 814 | + matrix_attr_name="data", |
| 815 | + matrix_dim_dtype=np.int8, |
| 816 | + x_dim_length=1 + len(tasks), |
| 817 | + y_dim_length=row.end, |
| 818 | + is_sparse=False, |
| 819 | + ) |
| 820 | + |
| 821 | + # Write sequences |
| 822 | + print("Writing genome sequence") |
| 823 | + chrom_options = [(genome, chroms.iloc[i]) for i in range(len(chroms))] |
| 824 | + if num_threads > 1: |
| 825 | + try: |
| 826 | + multiprocessing.set_start_method("spawn", force=True) |
| 827 | + except RuntimeError: |
| 828 | + pass |
| 829 | + |
| 830 | + with Pool(num_threads) as p: |
| 831 | + p.map(_extract_chrom_sequence, chrom_options) |
| 832 | + else: |
| 833 | + for opt in tqdm(chrom_options): |
| 834 | + _extract_chrom_sequence(opt) |
| 835 | + |
| 836 | + # Writing the coverage |
| 837 | + print("Writing coverage from BigWig files") |
| 838 | + for i in range(len(chroms)): |
| 839 | + print(chroms.chrom.iloc[i]) |
| 840 | + bw_options = [ |
| 841 | + (chroms.iloc[i], row.bigwig_path, row.task_idx) |
| 842 | + for row in tasks.itertuples() |
| 843 | + ] |
| 844 | + if num_threads > 1: |
| 845 | + with Pool(num_threads) as p: |
| 846 | + p.map(_extract_chrom_cov, bw_options) |
| 847 | + else: |
| 848 | + for opt in tqdm(bw_options): |
| 849 | + _extract_chrom_cov(opt) |
0 commit comments