From 20f8e3f30287557e310f9f7b8799bfeb04ead603 Mon Sep 17 00:00:00 2001 From: Gianmauro Cuccuru Date: Tue, 3 Sep 2024 17:21:50 +0100 Subject: [PATCH] simplify s3 configuration --- gwasstudio/cli/export.py | 68 +++++++++------------------------------ gwasstudio/cli/info.py | 4 +-- gwasstudio/cli/ingest.py | 47 ++++----------------------- gwasstudio/cli/query.py | 42 ++---------------------- gwasstudio/dask_client.py | 14 +++++--- gwasstudio/main.py | 64 ++++++++++++++++++++++++++++++------ 6 files changed, 89 insertions(+), 150 deletions(-) diff --git a/gwasstudio/cli/export.py b/gwasstudio/cli/export.py index cc23000..1d15d43 100755 --- a/gwasstudio/cli/export.py +++ b/gwasstudio/cli/export.py @@ -15,75 +15,51 @@ @cloup.command("export", no_args_is_help=True, help=help_doc) @cloup.option_group( "Options for Locusbreaker", - cloup.option("--locus-breaker", is_flag=True, default=False, help="Flag to use locus breaker"), - cloup.option("--pvalue_sig", default=5, help="pvalue threshold to use for filtering the data"), - cloup.option("--pvalue_limit", default=5, help="pvalue threshold to use for filtering the data"), cloup.option( "--hole_size", default=250000, help="Minimum pair-base distance between SNPs in different loci (default: 250000)", ), + cloup.option("--locus-breaker", is_flag=True, default=False, help="Flag to use locus breaker"), + cloup.option("--pvalue_sig", default=5, help="pvalue threshold to use for filtering the data"), + cloup.option("--pvalue_limit", default=5, help="pvalue threshold to use for filtering the data"), ) @cloup.option_group( "Options for filtering using a list od SNPs ids", - cloup.option( - "-s", "--snp_list", is_flag=True, default=False, help="A txt file with a column containing the SNP ids" - ), + cloup.option("--snp_list", is_flag=True, default=False, help="A txt file with a column containing the SNP ids"), ) @cloup.option_group( "TileDBVCF options", cloup.option( "-c", "--columns", default=False, help="List of columns to keep, 
provided as a single string comma separated" ), - cloup.option("-s", "--samples", default=False, help="A path of a txt file containing 1 sample name per line"), - cloup.option("-g", "--genome-version", help="Genome version to be used (either hg19 or hg38)", default="hg19"), # Not yet implemented cloup.option( - "-c", "--chromosome", help="Chromosomes list to use during processing. This can be a list of chromosomes separated by comma (Example: 1,2,3,4)", default="1", ), - cloup.option("--window-size", default=50000000, help="Widnow size used by tiledbvcf for later queries"), - cloup.option( - "-s", - "--sample-partitions", - help="how many partition to divide the sample list with for computation (default is 1)", - default=1, - ), + cloup.option("-g", "--genome-version", help="Genome version to be used (either hg19 or hg38)", default="hg19"), + cloup.option("-o", "--output-path", default="output", help="The name of the output file"), cloup.option( "-r", "--region-partitions", help="how many partition to divide the region list with for computation (default is 20)", default=20, ), - cloup.option("-u", "--uri", default=False, help="TileDB-VCF dataset URI"), - cloup.option("-o", "--output-path", default="output", help="The name of the output file"), -) -@cloup.option_group( - "TileDB configurations", - cloup.option( - "-b", - "--mem-budget-mb", - default=20480, - help="The memory budget in MB when ingesting the data", - ), - cloup.option("--aws-access-key-id", default=None, help="aws access key id"), - cloup.option("--aws-secret-access-key", default=None, help="aws access key"), + cloup.option("-s", "--samples", default=False, help="A path of a txt file containing 1 sample name per line"), cloup.option( - "--aws-endpoint-override", - default="https://storage.fht.org:9021", - help="endpoint where to connect", + "--sample-partitions", + help="how many partition to divide the sample list with for computation (default is 1)", + default=1, ), - 
cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"), - cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"), - cloup.option("--aws-region", default="", help="region where the s3 bucket is located"), - cloup.option("--aws-verify-ssl", default="false", help="if ssl verfication is needed"), + cloup.option("-u", "--uri", default=False, help="TileDB-VCF dataset URI"), + cloup.option("--window-size", default=50000000, help="Window size used by tiledbvcf for later queries"), ) @click.pass_context def export( + ctx, columns, - mem_budget_mb, pvalue_limit, pvalue_sig, hole_size, @@ -96,23 +72,9 @@ def export( region_partitions, samples, uri, - aws_access_key_id, - aws_secret_access_key, - aws_endpoint_override, - aws_use_virtual_addressing, - aws_scheme, - aws_region, - aws_verify_ssl, ): - cfg = {} - cfg["vfs.s3.aws_access_key_id"] = aws_access_key_id - cfg["vfs.s3.aws_secret_access_key"] = aws_secret_access_key - cfg["vfs.s3.endpoint_override"] = aws_endpoint_override - cfg["vfs.s3.use_virtual_addressing"] = aws_use_virtual_addressing - cfg["vfs.s3.scheme"] = aws_scheme - cfg["vfs.s3.region"] = aws_region - cfg["vfs.s3.verify_ssl"] = aws_verify_ssl - cfg["memory_budget_mb"] = mem_budget_mb + cfg = ctx.obj["cfg"] + ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg) logger.info("TileDBVCF dataset loaded") diff --git a/gwasstudio/cli/info.py b/gwasstudio/cli/info.py index 343f160..841ddd6 100644 --- a/gwasstudio/cli/info.py +++ b/gwasstudio/cli/info.py @@ -11,8 +11,8 @@ def info(): print("{}, version {}\n".format(__appname__.capitalize(), __version__)) - omnia_paths = {"log file": log_file} + paths = {"log file": log_file} print("Paths: ") - for k, v in omnia_paths.items(): + for k, v in paths.items(): print(" {}: {}".format(k, v)) print("\n") diff --git a/gwasstudio/cli/ingest.py b/gwasstudio/cli/ingest.py index 19f3659..5bfda4e 100755 --- a/gwasstudio/cli/ingest.py +++ 
b/gwasstudio/cli/ingest.py @@ -2,7 +2,6 @@ import cloup import pathlib import tiledbvcf -import tiledb help_doc = """ Ingest GWAS-VCF data in a TileDB-VCF dataset. @@ -15,7 +14,7 @@ cloup.option( "--gwas-vcf-path", default=None, - help="Path to a folder where all the vcf.gz files to ingest are present", + help="Path to a folder where all the vcf.gz files to ingest are stored", ), cloup.option( "-a", @@ -24,62 +23,30 @@ ), cloup.option( "-to", - "--tiledb-path-out", + "--tiledb-out-path", default=None, - help="s3 path where the tiledb dataset is created", + help="s3 path for the tiledb dataset", ), ) @cloup.option_group( "TileDB configurations", cloup.option("-b", "--mem-budget-mb", default=20480, help="The memory budget in MiB when ingesting the data"), cloup.option("-tr", "--threads", default=16, help="The number fo threads used for ingestion"), - cloup.option("--aws-access-key-id", default=None, help="aws access key id"), - cloup.option("--aws-secret-access-key", default=None, help="aws access key"), - cloup.option("--aws-endpoint-override", default="https://storage.fht.org:9021", help="endpoint where to connect"), - cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"), - cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"), - cloup.option("--aws-region", default="", help="region where the s3 bucket is located"), - cloup.option("--aws-verify-ssl", default="false", help="if ssl verification is needed"), ) @click.pass_context -def ingest( - ctx, - gwas_vcf_path, - attrs, - tiledb_path_out, - mem_budget_mb, - threads, - aws_access_key_id, - aws_secret_access_key, - aws_endpoint_override, - aws_use_virtual_addressing, - aws_scheme, - aws_region, - aws_verify_ssl, -): +def ingest(ctx, gwas_vcf_path, attrs, tiledb_out_path, mem_budget_mb, threads): if ctx.obj["DISTRIBUTE"]: pass else: _attrs = [a.strip() for a in attrs.split(",")] - cfg = tiledb.Config( - { - "vfs.s3.aws_access_key_id": 
aws_access_key_id, - "vfs.s3.aws_secret_access_key": aws_secret_access_key, - "vfs.s3.endpoint_override": aws_endpoint_override, - "vfs.s3.use_virtual_addressing": aws_use_virtual_addressing, - "vfs.s3.scheme": aws_scheme, - "vfs.s3.region": aws_region, - "vfs.s3.verify_ssl": aws_verify_ssl, - } - ) - read_cfg = tiledbvcf.ReadConfig(tiledb_config=cfg) + cfg = ctx.obj["cfg"] # vfs = tiledb.VFS(config=cfg) # if (vfs.is_dir(tiledb_path_out)): # print(f"Deleting existing array '{tiledb_path_out}'") # vfs.remove_dir(tiledb_path_out) # print("Done.") - ds = tiledbvcf.Dataset(tiledb_path_out, mode="w", cfg=read_cfg) - ds.create_dataset(extra_attrs=[attrs]) + ds = tiledbvcf.Dataset(tiledb_out_path, mode="w", tiledb_config=cfg) + ds.create_dataset(extra_attrs=_attrs) p = pathlib.Path(gwas_vcf_path) ds.ingest_samples( total_memory_budget_mb=mem_budget_mb, diff --git a/gwasstudio/cli/query.py b/gwasstudio/cli/query.py index a27f956..d194b83 100755 --- a/gwasstudio/cli/query.py +++ b/gwasstudio/cli/query.py @@ -1,6 +1,5 @@ import click import cloup -import tiledb import tiledbvcf help_doc = """ @@ -11,12 +10,6 @@ @cloup.command("query", no_args_is_help=True, help=help_doc) @cloup.option_group( "TileDB options", - cloup.option( - "-b", - "--mem-budget-mb", - default=20480, - help="The memory budget in MB when query a TileDB dataset", - ), cloup.option( "-i", "--information", @@ -26,45 +19,14 @@ ), cloup.option("-u", "--uri", help="TileDB-VCF dataset URI"), ) -@cloup.option_group( - "TileDB configurations", - cloup.option("-b", "--mem-budget-mb", default=20480, help="The memory budget in MB when ingesting the data"), - cloup.option("-t", "--threads", default=16, help="The number fo threwads used for ingestion"), - cloup.option("--aws-access-key-id", default=None, help="aws access key id"), - cloup.option("--aws-secret-access-key", default=None, help="aws access key"), - cloup.option("--aws-endpoint-override", default="https://storage.fht.org:9021", help="endpoint where to 
connect"), - cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"), - cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"), - cloup.option("--aws-region", default="", help="region where the s3 bucket is located"), - cloup.option("--aws-verify-ssl", default="false", help="if ssl verification is needed"), -) @click.pass_context def query( ctx, - mem_budget_mb, - aws_access_key_id, - aws_secret_access_key, - aws_endpoint_override, - aws_use_virtual_addressing, - aws_scheme, - aws_region, - aws_verify_ssl, information, uri, ): - cfg = tiledbvcf.ReadConfig(memory_budget_mb=mem_budget_mb) - cfg = tiledb.Config( - { - "vfs.s3.aws_access_key_id": aws_access_key_id, - "vfs.s3.aws_secret_access_key": aws_secret_access_key, - "vfs.s3.endpoint_override": aws_endpoint_override, - "vfs.s3.use_virtual_addressing": aws_use_virtual_addressing, - "vfs.s3.scheme": aws_scheme, - "vfs.s3.region": aws_region, - "vfs.s3.verify_ssl": aws_verify_ssl, - } - ) - ds = tiledbvcf.Dataset(uri, mode="r", cfg=cfg) + cfg = ctx.obj["cfg"] + ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg) if information == "samples": for s in ds.samples(): print(s) diff --git a/gwasstudio/dask_client.py b/gwasstudio/dask_client.py index f7fcc29..429690b 100644 --- a/gwasstudio/dask_client.py +++ b/gwasstudio/dask_client.py @@ -1,4 +1,3 @@ -import dask from dask.distributed import Client from dask_jobqueue import SLURMCluster as Cluster from gwasstudio import logger @@ -11,15 +10,20 @@ def __init__(self, **kwargs): _max = kwargs.get("maximum_workers") _mem = kwargs.get("memory_workers") _cpu = kwargs.get("cpu_workers") - logger.info(f"Dask cluster: starting from {_min} to {_max} workers, {_mem} of memory and {_cpu} cpus per worker") - cluster = Cluster(memory = _mem, cores = _cpu) + logger.info( + f"Dask cluster: starting from {_min} to {_max} workers, {_mem} of memory and {_cpu} cpus per worker" + ) + cluster = Cluster(memory=_mem, 
cores=_cpu) cluster.adapt(minimum=_min, maximum=_max) - client = Client(cluster) # Connect to that cluster - + self.client = Client(cluster) # Connect to that cluster + self.dashboard = "" # client.dashboard_link # dask.config.set({"dataframe.convert-string": False}) + def get_client(self): + return self.client + def get_dashboard(self): return self.dashboard diff --git a/gwasstudio/main.py b/gwasstudio/main.py index 7bbb274..6675d50 100644 --- a/gwasstudio/main.py +++ b/gwasstudio/main.py @@ -4,8 +4,9 @@ from gwasstudio import __appname__, __version__, context_settings, log_file, logger from gwasstudio.cli.export import export from gwasstudio.cli.info import info -from gwasstudio.cli.query import query from gwasstudio.cli.ingest import ingest +from gwasstudio.cli.query import query +from gwasstudio.dask_client import DaskClient as Client @cloup.group(name="main", help="GWASStudio", no_args_is_help=True, context_settings=context_settings) @@ -19,24 +20,67 @@ cloup.option("--memory_workers", help="Memory amount per worker", default="12G"), cloup.option("--cpu_workers", help="CPU numbers per worker", default=6), ) +@cloup.option_group( + "TileDB configurations", + cloup.option("--aws-access-key-id", default=None, help="aws access key id"), + cloup.option("--aws-secret-access-key", default=None, help="aws access key"), + cloup.option( + "--aws-endpoint-override", + default="https://storage.fht.org:9021", + help="endpoint where to connect", + ), + cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"), + cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"), + cloup.option("--aws-region", default="", help="region where the s3 bucket is located"), + cloup.option("--aws-verify-ssl", default="false", help="if ssl verification is needed"), +) @click.pass_context -def cli_init(ctx, distribute, minimum_workers, maximum_workers, memory_workers, cpu_workers, quiet): +def cli_init( + ctx, + 
aws_access_key_id, + aws_secret_access_key, + aws_endpoint_override, + aws_use_virtual_addressing, + aws_scheme, + aws_region, + aws_verify_ssl, + distribute, + minimum_workers, + maximum_workers, + memory_workers, + cpu_workers, + quiet, +): if quiet: logger.add(log_file, level="INFO", retention="30 days") else: logger.add(log_file, level="DEBUG", retention="30 days") logger.info("{} started".format(__appname__.capitalize())) - # if distribute: - # client = Client( - # minimum_workers=minimum_workers, - # maximum_workers=maximum_workers, - # memory_workers=memory_workers, - # cpu_workers=cpu_workers, - # ) - # logger.info("Dask dashboard available at {}".format(client.get_dashboard())) + + cfg = { + "vfs.s3.aws_access_key_id": aws_access_key_id, + "vfs.s3.aws_secret_access_key": aws_secret_access_key, + "vfs.s3.endpoint_override": aws_endpoint_override, + "vfs.s3.use_virtual_addressing": aws_use_virtual_addressing, + "vfs.s3.scheme": aws_scheme, + "vfs.s3.region": aws_region, + "vfs.s3.verify_ssl": aws_verify_ssl, + } + ctx.ensure_object(dict) + ctx.obj["cfg"] = cfg ctx.obj["DISTRIBUTE"] = distribute + if distribute: + dask_client = Client( + minimum_workers=minimum_workers, + maximum_workers=maximum_workers, + memory_workers=memory_workers, + cpu_workers=cpu_workers, + ) + ctx.obj["client"] = dask_client.get_client() + logger.info("Dask dashboard available at {}".format(dask_client.get_dashboard())) + def main(): cli_init.add_command(info)