Commit

simplify s3 configuration
gmauro committed Sep 9, 2024
1 parent b62fdbd commit 20f8e3f
Showing 6 changed files with 89 additions and 150 deletions.
68 changes: 15 additions & 53 deletions gwasstudio/cli/export.py
@@ -15,75 +15,51 @@
@cloup.command("export", no_args_is_help=True, help=help_doc)
@cloup.option_group(
"Options for Locusbreaker",
cloup.option("--locus-breaker", is_flag=True, default=False, help="Flag to use locus breaker"),
cloup.option("--pvalue_sig", default=5, help="pvalue threshold to use for filtering the data"),
cloup.option("--pvalue_limit", default=5, help="pvalue threshold to use for filtering the data"),
cloup.option(
"--hole_size",
default=250000,
help="Minimum pair-base distance between SNPs in different loci (default: 250000)",
),
cloup.option("--locus-breaker", is_flag=True, default=False, help="Flag to use locus breaker"),
cloup.option("--pvalue_sig", default=5, help="pvalue threshold to use for filtering the data"),
cloup.option("--pvalue_limit", default=5, help="pvalue threshold to use for filtering the data"),
)
@cloup.option_group(
"Options for filtering using a list od SNPs ids",
cloup.option(
"-s", "--snp_list", is_flag=True, default=False, help="A txt file with a column containing the SNP ids"
),
cloup.option("--snp_list", is_flag=True, default=False, help="A txt file with a column containing the SNP ids"),
)
@cloup.option_group(
"TileDBVCF options",
cloup.option(
"-c", "--columns", default=False, help="List of columns to keep, provided as a single string comma separated"
),
cloup.option("-s", "--samples", default=False, help="A path of a txt file containing 1 sample name per line"),
cloup.option("-g", "--genome-version", help="Genome version to be used (either hg19 or hg38)", default="hg19"),
# Not yet implemented
cloup.option(
"-c",
"--chromosome",
help="Chromosomes list to use during processing. This can be a list of chromosomes separated by comma (Example: 1,2,3,4)",
default="1",
),
cloup.option("--window-size", default=50000000, help="Widnow size used by tiledbvcf for later queries"),
cloup.option(
"-s",
"--sample-partitions",
help="how many partition to divide the sample list with for computation (default is 1)",
default=1,
),
cloup.option("-g", "--genome-version", help="Genome version to be used (either hg19 or hg38)", default="hg19"),
cloup.option("-o", "--output-path", default="output", help="The name of the output file"),
cloup.option(
"-r",
"--region-partitions",
help="how many partition to divide the region list with for computation (default is 20)",
default=20,
),
cloup.option("-u", "--uri", default=False, help="TileDB-VCF dataset URI"),
cloup.option("-o", "--output-path", default="output", help="The name of the output file"),
)
@cloup.option_group(
"TileDB configurations",
cloup.option(
"-b",
"--mem-budget-mb",
default=20480,
help="The memory budget in MB when ingesting the data",
),
cloup.option("--aws-access-key-id", default=None, help="aws access key id"),
cloup.option("--aws-secret-access-key", default=None, help="aws access key"),
cloup.option("-s", "--samples", default=False, help="A path of a txt file containing 1 sample name per line"),
cloup.option(
"--aws-endpoint-override",
default="https://storage.fht.org:9021",
help="endpoint where to connect",
"--sample-partitions",
help="how many partition to divide the sample list with for computation (default is 1)",
default=1,
),
cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"),
cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"),
cloup.option("--aws-region", default="", help="region where the s3 bucket is located"),
cloup.option("--aws-verify-ssl", default="false", help="if ssl verfication is needed"),
cloup.option("-u", "--uri", default=False, help="TileDB-VCF dataset URI"),
cloup.option("--window-size", default=50000000, help="Window size used by tiledbvcf for later queries"),
)
@click.pass_context
def export(
ctx,
columns,
mem_budget_mb,
pvalue_limit,
pvalue_sig,
hole_size,
@@ -96,23 +72,9 @@ def export(
region_partitions,
samples,
uri,
aws_access_key_id,
aws_secret_access_key,
aws_endpoint_override,
aws_use_virtual_addressing,
aws_scheme,
aws_region,
aws_verify_ssl,
):
cfg = {}
cfg["vfs.s3.aws_access_key_id"] = aws_access_key_id
cfg["vfs.s3.aws_secret_access_key"] = aws_secret_access_key
cfg["vfs.s3.endpoint_override"] = aws_endpoint_override
cfg["vfs.s3.use_virtual_addressing"] = aws_use_virtual_addressing
cfg["vfs.s3.scheme"] = aws_scheme
cfg["vfs.s3.region"] = aws_region
cfg["vfs.s3.verify_ssl"] = aws_verify_ssl
cfg["memory_budget_mb"] = mem_budget_mb
cfg = ctx.obj["cfg"]

ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg)
logger.info("TileDBVCF dataset loaded")

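The net effect in export.py, shown here as a minimal standalone sketch rather than the full command: all --aws-* options are gone and the subcommand simply reuses the "vfs.s3.*" dict that the root group stores in the click context. The help text and default URI value below are illustrative, not taken from the repository.

import click
import cloup
import tiledbvcf
from gwasstudio import logger

@cloup.command("export", no_args_is_help=True)
@cloup.option("-u", "--uri", default=None, help="TileDB-VCF dataset URI")
@click.pass_context
def export(ctx, uri):
    # Reuse the shared S3/TileDB settings assembled once in cli_init
    cfg = ctx.obj["cfg"]
    ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg)
    logger.info("TileDBVCF dataset loaded")
    # ... locus-breaker / SNP-list filtering of the full command elided ...

Everything S3-related is now owned by the root command, so adding a new subcommand no longer means duplicating seven credential options.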
4 changes: 2 additions & 2 deletions gwasstudio/cli/info.py
@@ -11,8 +11,8 @@
def info():
print("{}, version {}\n".format(__appname__.capitalize(), __version__))

omnia_paths = {"log file": log_file}
paths = {"log file": log_file}
print("Paths: ")
for k, v in omnia_paths.items():
for k, v in paths.items():
print(" {}: {}".format(k, v))
print("\n")
47 changes: 7 additions & 40 deletions gwasstudio/cli/ingest.py
@@ -2,7 +2,6 @@
import cloup
import pathlib
import tiledbvcf
import tiledb

help_doc = """
Ingest GWAS-VCF data in a TileDB-VCF dataset.
@@ -15,7 +14,7 @@
cloup.option(
"--gwas-vcf-path",
default=None,
help="Path to a folder where all the vcf.gz files to ingest are present",
help="Path to a folder where all the vcf.gz files to ingest are stored",
),
cloup.option(
"-a",
@@ -24,62 +23,30 @@
),
cloup.option(
"-to",
"--tiledb-path-out",
"--tiledb-out-path",
default=None,
help="s3 path where the tiledb dataset is created",
help="s3 path for the tiledb dataset",
),
)
@cloup.option_group(
"TileDB configurations",
cloup.option("-b", "--mem-budget-mb", default=20480, help="The memory budget in MiB when ingesting the data"),
cloup.option("-tr", "--threads", default=16, help="The number fo threads used for ingestion"),
cloup.option("--aws-access-key-id", default=None, help="aws access key id"),
cloup.option("--aws-secret-access-key", default=None, help="aws access key"),
cloup.option("--aws-endpoint-override", default="https://storage.fht.org:9021", help="endpoint where to connect"),
cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"),
cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"),
cloup.option("--aws-region", default="", help="region where the s3 bucket is located"),
cloup.option("--aws-verify-ssl", default="false", help="if ssl verification is needed"),
)
@click.pass_context
def ingest(
ctx,
gwas_vcf_path,
attrs,
tiledb_path_out,
mem_budget_mb,
threads,
aws_access_key_id,
aws_secret_access_key,
aws_endpoint_override,
aws_use_virtual_addressing,
aws_scheme,
aws_region,
aws_verify_ssl,
):
def ingest(ctx, gwas_vcf_path, attrs, tiledb_out_path, mem_budget_mb, threads):
if ctx.obj["DISTRIBUTE"]:
pass
else:
_attrs = [a.strip() for a in attrs.split(",")]
cfg = tiledb.Config(
{
"vfs.s3.aws_access_key_id": aws_access_key_id,
"vfs.s3.aws_secret_access_key": aws_secret_access_key,
"vfs.s3.endpoint_override": aws_endpoint_override,
"vfs.s3.use_virtual_addressing": aws_use_virtual_addressing,
"vfs.s3.scheme": aws_scheme,
"vfs.s3.region": aws_region,
"vfs.s3.verify_ssl": aws_verify_ssl,
}
)
read_cfg = tiledbvcf.ReadConfig(tiledb_config=cfg)
cfg = ctx.obj["cfg"]
# vfs = tiledb.VFS(config=cfg)
# if (vfs.is_dir(tiledb_path_out)):
# print(f"Deleting existing array '{tiledb_path_out}'")
# vfs.remove_dir(tiledb_path_out)
# print("Done.")
ds = tiledbvcf.Dataset(tiledb_path_out, mode="w", cfg=read_cfg)
ds.create_dataset(extra_attrs=[attrs])
ds = tiledbvcf.Dataset(tiledb_out_path, mode="w", tiledb_config=cfg)
ds.create_dataset(extra_attrs=[_attrs])
p = pathlib.Path(gwas_vcf_path)
ds.ingest_samples(
total_memory_budget_mb=mem_budget_mb,
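A rough, self-contained sketch of the simplified ingest path, assuming tiledbvcf's ingest_samples accepts a threads keyword alongside total_memory_budget_mb (only the latter is visible in the hunk above); the folder, attribute names, and bucket path are placeholders.

import pathlib
import tiledbvcf

def ingest_sketch(ctx, gwas_vcf_path="vcfs", attrs="fmt_ES,fmt_SE",
                  tiledb_out_path="s3://bucket/gwas-dataset",
                  mem_budget_mb=20480, threads=16):
    _attrs = [a.strip() for a in attrs.split(",")]          # placeholder attribute names
    cfg = ctx.obj["cfg"]                                    # shared "vfs.s3.*" settings from cli_init
    ds = tiledbvcf.Dataset(tiledb_out_path, mode="w", tiledb_config=cfg)
    ds.create_dataset(extra_attrs=_attrs)                   # materialise the extra VCF fields as attributes
    vcf_files = [str(p) for p in pathlib.Path(gwas_vcf_path).glob("*.vcf.gz")]
    ds.ingest_samples(sample_uris=vcf_files,
                      total_memory_budget_mb=mem_budget_mb,
                      threads=threads)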
42 changes: 2 additions & 40 deletions gwasstudio/cli/query.py
@@ -1,6 +1,5 @@
import click
import cloup
import tiledb
import tiledbvcf

help_doc = """
@@ -11,12 +10,6 @@
@cloup.command("query", no_args_is_help=True, help=help_doc)
@cloup.option_group(
"TileDB options",
cloup.option(
"-b",
"--mem-budget-mb",
default=20480,
help="The memory budget in MB when query a TileDB dataset",
),
cloup.option(
"-i",
"--information",
@@ -26,45 +19,14 @@
),
cloup.option("-u", "--uri", help="TileDB-VCF dataset URI"),
)
@cloup.option_group(
"TileDB configurations",
cloup.option("-b", "--mem-budget-mb", default=20480, help="The memory budget in MB when ingesting the data"),
cloup.option("-t", "--threads", default=16, help="The number fo threwads used for ingestion"),
cloup.option("--aws-access-key-id", default=None, help="aws access key id"),
cloup.option("--aws-secret-access-key", default=None, help="aws access key"),
cloup.option("--aws-endpoint-override", default="https://storage.fht.org:9021", help="endpoint where to connect"),
cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"),
cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"),
cloup.option("--aws-region", default="", help="region where the s3 bucket is located"),
cloup.option("--aws-verify-ssl", default="false", help="if ssl verification is needed"),
)
@click.pass_context
def query(
ctx,
mem_budget_mb,
aws_access_key_id,
aws_secret_access_key,
aws_endpoint_override,
aws_use_virtual_addressing,
aws_scheme,
aws_region,
aws_verify_ssl,
information,
uri,
):
cfg = tiledbvcf.ReadConfig(memory_budget_mb=mem_budget_mb)
cfg = tiledb.Config(
{
"vfs.s3.aws_access_key_id": aws_access_key_id,
"vfs.s3.aws_secret_access_key": aws_secret_access_key,
"vfs.s3.endpoint_override": aws_endpoint_override,
"vfs.s3.use_virtual_addressing": aws_use_virtual_addressing,
"vfs.s3.scheme": aws_scheme,
"vfs.s3.region": aws_region,
"vfs.s3.verify_ssl": aws_verify_ssl,
}
)
ds = tiledbvcf.Dataset(uri, mode="r", cfg=cfg)
cfg = ctx.obj["cfg"]
ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg)
if information == "samples":
for s in ds.samples():
print(s)
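query.py follows the same pattern as export.py. For reference, a small hedged sketch of its "samples" branch, with a placeholder URI:

import tiledbvcf

def list_samples(ctx, uri="s3://bucket/gwas-dataset"):   # placeholder URI
    cfg = ctx.obj["cfg"]                                 # dict assembled by cli_init
    ds = tiledbvcf.Dataset(uri, mode="r", tiledb_config=cfg)
    for s in ds.samples():                               # sample names stored in the dataset
        print(s)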
14 changes: 9 additions & 5 deletions gwasstudio/dask_client.py
@@ -1,4 +1,3 @@
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster as Cluster
from gwasstudio import logger
@@ -11,15 +10,20 @@ def __init__(self, **kwargs):
_max = kwargs.get("maximum_workers")
_mem = kwargs.get("memory_workers")
_cpu = kwargs.get("cpu_workers")
logger.info(f"Dask cluster: starting from {_min} to {_max} workers, {_mem} of memory and {_cpu} cpus per worker")
cluster = Cluster(memory = _mem, cores = _cpu)
logger.info(
f"Dask cluster: starting from {_min} to {_max} workers, {_mem} of memory and {_cpu} cpus per worker"
)
cluster = Cluster(memory=_mem, cores=_cpu)

cluster.adapt(minimum=_min, maximum=_max)
client = Client(cluster) # Connect to that cluster
self.client = Client(cluster) # Connect to that cluster

self.dashboard = "" # client.dashboard_link

# dask.config.set({"dataframe.convert-string": False})

def get_client(self):
return self.client

def get_dashboard(self):
return self.dashboard
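A brief usage sketch of the adjusted wrapper, matching how main.py now calls it; the worker counts are illustrative and a reachable SLURM scheduler is assumed.

from gwasstudio.dask_client import DaskClient

dask_wrapper = DaskClient(
    minimum_workers=1,        # illustrative values; cli_init forwards the CLI options
    maximum_workers=10,
    memory_workers="12G",
    cpu_workers=6,
)
client = dask_wrapper.get_client()      # dask.distributed.Client bound to the SLURMCluster
print(dask_wrapper.get_dashboard())     # currently an empty string (dashboard link commented out)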
64 changes: 54 additions & 10 deletions gwasstudio/main.py
@@ -4,8 +4,9 @@
from gwasstudio import __appname__, __version__, context_settings, log_file, logger
from gwasstudio.cli.export import export
from gwasstudio.cli.info import info
from gwasstudio.cli.query import query
from gwasstudio.cli.ingest import ingest
from gwasstudio.cli.query import query
from gwasstudio.dask_client import DaskClient as Client


@cloup.group(name="main", help="GWASStudio", no_args_is_help=True, context_settings=context_settings)
@@ -19,24 +20,67 @@
cloup.option("--memory_workers", help="Memory amount per worker", default="12G"),
cloup.option("--cpu_workers", help="CPU numbers per worker", default=6),
)
@cloup.option_group(
"TileDB configurations",
cloup.option("--aws-access-key-id", default=None, help="aws access key id"),
cloup.option("--aws-secret-access-key", default=None, help="aws access key"),
cloup.option(
"--aws-endpoint-override",
default="https://storage.fht.org:9021",
help="endpoint where to connect",
),
cloup.option("--aws-use-virtual-addressing", default="false", help="virtual address option"),
cloup.option("--aws-scheme", default="https", help="type of scheme used at the endpoint"),
cloup.option("--aws-region", default="", help="region where the s3 bucket is located"),
cloup.option("--aws-verify-ssl", default="false", help="if ssl verfication is needed"),
)
@click.pass_context
def cli_init(ctx, distribute, minimum_workers, maximum_workers, memory_workers, cpu_workers, quiet):
def cli_init(
ctx,
aws_access_key_id,
aws_secret_access_key,
aws_endpoint_override,
aws_use_virtual_addressing,
aws_scheme,
aws_region,
aws_verify_ssl,
distribute,
minimum_workers,
maximum_workers,
memory_workers,
cpu_workers,
quiet,
):
if quiet:
logger.add(log_file, level="INFO", retention="30 days")
else:
logger.add(log_file, level="DEBUG", retention="30 days")
logger.info("{} started".format(__appname__.capitalize()))
# if distribute:
# client = Client(
# minimum_workers=minimum_workers,
# maximum_workers=maximum_workers,
# memory_workers=memory_workers,
# cpu_workers=cpu_workers,
# )
# logger.info("Dask dashboard available at {}".format(client.get_dashboard()))

cfg = {
"vfs.s3.aws_access_key_id": aws_access_key_id,
"vfs.s3.aws_secret_access_key": aws_secret_access_key,
"vfs.s3.endpoint_override": aws_endpoint_override,
"vfs.s3.use_virtual_addressing": aws_use_virtual_addressing,
"vfs.s3.scheme": aws_scheme,
"vfs.s3.region": aws_region,
"vfs.s3.verify_ssl": aws_verify_ssl,
}

ctx.ensure_object(dict)
ctx.obj["cfg"] = cfg
ctx.obj["DISTRIBUTE"] = distribute

if distribute:
client = Client(
minimum_workers=minimum_workers,
maximum_workers=maximum_workers,
memory_workers=memory_workers,
cpu_workers=cpu_workers,
).get_client()
ctx.obj["client"] = client
logger.info("Dask dashboard available at {}".format(client.get_dashboard()))


def main():
cli_init.add_command(info)
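Because the keys follow TileDB's own naming, the dict stored in ctx.obj["cfg"] can also be handed straight to tiledb.Config, for example to run the VFS existence check that is still commented out in ingest.py. A minimal sketch with placeholder credentials and bucket:

import tiledb

cfg = {
    "vfs.s3.aws_access_key_id": "PLACEHOLDER_KEY",        # placeholder credentials
    "vfs.s3.aws_secret_access_key": "PLACEHOLDER_SECRET",
    "vfs.s3.endpoint_override": "https://storage.fht.org:9021",
    "vfs.s3.use_virtual_addressing": "false",
    "vfs.s3.scheme": "https",
    "vfs.s3.region": "",
    "vfs.s3.verify_ssl": "false",
}
vfs = tiledb.VFS(config=tiledb.Config(cfg))               # plain dicts are accepted as settings
print(vfs.is_dir("s3://some-bucket/gwas-dataset"))        # placeholder dataset URI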
