toolchest_client/tools/shi7.py (2 changes: 1 addition & 1 deletion)

@@ -24,7 +24,7 @@ def __init__(self, tool_args, output_name, inputs, output_path):
             max_inputs=10,  # artificially constrained at 10 for now
             parallel_enabled=True,
             group_paired_ends=True,
-            max_input_bytes_per_node=1.5 * 1024 * 1024 * 1024,
+            max_input_bytes_per_file=1.5 * 1024 * 1024 * 1024,
         )
 
     def _merge_outputs(self, output_file_paths):
toolchest_client/tools/star.py (2 changes: 1 addition & 1 deletion)

@@ -31,7 +31,7 @@ def __init__(self, tool_args, output_name, inputs, input_prefix_mapping,
             database_name=database_name,
             database_version=database_version,
             parallel_enabled=True,
-            max_input_bytes_per_node=4.5 * 1024 * 1024 * 1024,
+            max_input_bytes_per_file=4.5 * 1024 * 1024 * 1024,
             output_type=OutputType.SAM_FILE,  # In many cases, this will change (e.g. a dangerous argument is passed)
         )
toolchest_client/tools/test.py (2 changes: 1 addition & 1 deletion)

@@ -21,5 +21,5 @@ def __init__(self, tool_args, output_name, inputs, output_path):
             inputs=inputs,
             min_inputs=1,
             max_inputs=100,  # this limit is completely arbitrary
-            max_input_bytes_per_node=256 * 1024 * 1024 * 1024,
+            max_input_bytes_per_file=256 * 1024 * 1024 * 1024,
         )
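For orientation, each of the three tool files above simply forwards this limit to the Tool base class. Below is a minimal, hypothetical sketch (not part of this PR) of what such a subclass looks like after the rename; the import path, tool name, and version string are assumptions for illustration only.

```python
# Hypothetical subclass sketch, assuming the Tool import path and the
# constructor signature shown in the tool.py hunk below. Values marked
# "assumed" are illustrative, not taken from the repository.
from toolchest_client.tools.tool import Tool


class ExampleTool(Tool):
    def __init__(self, tool_args, output_name, inputs, output_path):
        super().__init__(
            "example_tool",   # tool_name (assumed)
            "0.1.0",          # tool_version (assumed)
            tool_args=tool_args,
            output_name=output_name,
            output_path=output_path,
            inputs=inputs,
            min_inputs=1,
            max_inputs=10,
            parallel_enabled=True,
            # Renamed in this PR: previously max_input_bytes_per_node.
            max_input_bytes_per_file=1.5 * 1024 * 1024 * 1024,  # 1.5 GiB per input file
        )
```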
toolchest_client/tools/tool.py (14 changes: 7 additions & 7 deletions)

@@ -31,7 +31,7 @@ def __init__(self, tool_name, tool_version, tool_args, output_name,
                  output_path, inputs, min_inputs, max_inputs,
                  database_name=None, database_version=None,
                  input_prefix_mapping=None, parallel_enabled=False,
-                 max_input_bytes_per_node=FOUR_POINT_FIVE_GIGABYTES,
+                 max_input_bytes_per_file=FOUR_POINT_FIVE_GIGABYTES,
                  group_paired_ends=False, compress_inputs=False,
                  output_type=OutputType.FLAT_TEXT, output_is_directory=False):
         self.tool_name = tool_name
@@ -56,7 +56,7 @@ def __init__(self, tool_name, tool_version, tool_args, output_name,
         self.output_validation_enabled = True
         self.group_paired_ends = group_paired_ends
         self.compress_inputs = compress_inputs
-        self.max_input_bytes_per_node = max_input_bytes_per_node
+        self.max_input_bytes_per_file = max_input_bytes_per_file
         self.query_threads = []
         self.query_thread_statuses = dict()
         self.terminating = False
@@ -308,7 +308,7 @@ def _generate_jobs(self, should_run_in_parallel):
             # Arbitrary parallelization – assume only one input file which is to be split
             adjusted_input_file_paths = split_file_by_lines(
                 input_file_path=self.input_files[0],
-                max_bytes=self.max_input_bytes_per_node,
+                max_bytes=self.max_input_bytes_per_file,
             )
             for _, file_path in adjusted_input_file_paths:
                 # This is assuming only one input file per parallel run.
@@ -318,15 +318,15 @@
             # Grouped parallelization. Right now, this only supports grouping by R1/R2 for paired-end inputs
             input_file_paths_pairs = split_paired_files_by_lines(
                 input_file_paths=self.input_files,
-                max_bytes=self.max_input_bytes_per_node,
+                max_bytes=self.max_input_bytes_per_file,
             )
             for input_file_path_pair in input_file_paths_pairs:
                 yield input_file_path_pair
 
         else:
-            # Make sure we're below plan/multi-part limit for non-splittable files
+            # Make sure we're below tool limit for non-splittable files
             for file_path in self.input_files:
-                check_file_size(file_path, max_size_bytes=FOUR_POINT_FIVE_GIGABYTES)
+                check_file_size(file_path, max_size_bytes=self.max_input_bytes_per_file)
             # Note that for a tool like Unicycler, this would look like:
             # [["r1.fastq", "r2.fastq", "unassembled.fasta"]]
             # As there are multiple input files required for the job
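The splitting helpers used above, split_file_by_lines and split_paired_files_by_lines, are not part of this diff. As rough orientation only, here is a hedged, self-contained sketch of line-based splitting under a per-file byte budget; the function name and return shape are hypothetical and simplified (for example, it ignores FASTQ record boundaries), not the actual toolchest_client implementation.

```python
# Illustrative sketch only: a simplified line-based splitter that respects a
# per-file byte budget. Names and return shape are hypothetical; the real
# split_file_by_lines in toolchest_client may differ.
from typing import Iterator, Tuple


def split_by_lines(input_file_path: str, max_bytes: int) -> Iterator[Tuple[int, str]]:
    """Yield (chunk_index, chunk_path) pairs; each chunk holds whole lines and
    stays within max_bytes unless a single line is itself larger."""
    chunk_index = 0
    chunk_path = f"{input_file_path}.part{chunk_index}"
    out = open(chunk_path, "wb")
    bytes_written = 0
    with open(input_file_path, "rb") as src:
        for line in src:
            # Roll over to a new chunk if this line would exceed the budget.
            if bytes_written and bytes_written + len(line) > max_bytes:
                out.close()
                yield chunk_index, chunk_path
                chunk_index += 1
                chunk_path = f"{input_file_path}.part{chunk_index}"
                out = open(chunk_path, "wb")
                bytes_written = 0
            out.write(line)
            bytes_written += len(line)
    out.close()
    yield chunk_index, chunk_path
```

A paired-end variant would apply the same chunk boundaries to the R1 and R2 files in lockstep so that read pairs stay aligned across chunks.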
@@ -348,7 +348,7 @@ def run(self):
         should_run_in_parallel = self.parallel_enabled \
             and not any(inputs_are_in_s3(self.input_files)) \
             and (self.group_paired_ends or self.num_input_files == 1) \
-            and check_file_size(self.input_files[0]) > self.max_input_bytes_per_node \
+            and check_file_size(self.input_files[0]) > self.max_input_bytes_per_file \
             and self._system_supports_parallel_execution()
 
         jobs = self._generate_jobs(should_run_in_parallel)
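Net effect of the rename: the same per-file limit now both triggers splitting for oversized local inputs and caps non-splittable ones. Below is a hedged illustration of that gate using the attribute names from the diff; check_file_size here is a stand-in (the real helper's behavior may differ), and the S3 and system-support checks from run() are omitted.

```python
# Illustrative only: a trimmed stand-in for the gate in Tool.run() after this
# PR. The real check_file_size and the omitted S3/system-support checks may
# behave differently.
import os

FOUR_POINT_FIVE_GIGABYTES = int(4.5 * 1024 * 1024 * 1024)  # 4,831,838,208 bytes


def check_file_size(file_path, max_size_bytes=None):
    """Stand-in: return the file size, raising if it exceeds an optional cap."""
    size = os.path.getsize(file_path)
    if max_size_bytes is not None and size > max_size_bytes:
        raise ValueError(f"{file_path} is {size} bytes, over the {max_size_bytes}-byte limit")
    return size


def should_run_in_parallel(tool):
    # Split and parallelize only when the tool allows it, the inputs are
    # splittable (paired-end groups or a single file), and the first input
    # alone is larger than the per-file limit.
    return (
        tool.parallel_enabled
        and (tool.group_paired_ends or tool.num_input_files == 1)
        and check_file_size(tool.input_files[0]) > tool.max_input_bytes_per_file
    )
```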