Skip to content

Commit

Permalink
Update process_datasets workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
KaiWaldrant committed Sep 20, 2024
1 parent b262a55 commit e2ffe14
Showing 1 changed file with 2 additions and 119 deletions.
121 changes: 2 additions & 119 deletions src/workflows/process_datasets/main.nf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
include { findArgumentSchema } from "${meta.resources_dir}/helper.nf"

workflow auto {
findStatesTemp(params, meta.config)
findStates(params, meta.config)
| meta.workflow.run(
auto: [publish: "state"]
)
Expand All @@ -14,7 +14,7 @@ workflow run_wf {
main:
output_ch = input_ch

| check_dataset_schema.run(
| verify_data_structure.run(
fromState: { id, state ->
def schema = findArgumentSchema(meta.config, "input")
def schemaYaml = tempFile("schema.yaml")
Expand Down Expand Up @@ -52,120 +52,3 @@ workflow run_wf {
emit:
output_ch
}

// temp fix for rename_keys typo

def findStatesTemp(Map params, Map config) {
def auto_config = deepClone(config)
def auto_params = deepClone(params)

auto_config = auto_config.clone()
// override arguments
auto_config.argument_groups = []
auto_config.arguments = [
[
type: "string",
name: "--id",
description: "A dummy identifier",
required: false
],
[
type: "file",
name: "--input_states",
example: "/path/to/input/directory/**/state.yaml",
description: "Path to input directory containing the datasets to be integrated.",
required: true,
multiple: true,
multiple_sep: ";"
],
[
type: "string",
name: "--filter",
example: "foo/.*/state.yaml",
description: "Regex to filter state files by path.",
required: false
],
// to do: make this a yaml blob?
[
type: "string",
name: "--rename_keys",
example: ["newKey1:oldKey1", "newKey2:oldKey2"],
description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.",
required: false,
multiple: true,
multiple_sep: ";"
],
[
type: "string",
name: "--settings",
example: '{"output_dataset": "dataset.h5ad", "k": 10}',
description: "Global arguments as a JSON glob to be passed to all components.",
required: false
]
]
if (!(auto_params.containsKey("id"))) {
auto_params["id"] = "auto"
}

// run auto config through processConfig once more
auto_config = processConfig(auto_config)

workflow findStatesTempWf {
helpMessage(auto_config)

output_ch =
channelFromParams(auto_params, auto_config)
| flatMap { autoId, args ->

def globalSettings = args.settings ? readYamlBlob(args.settings) : [:]

// look for state files in input dir
def stateFiles = args.input_states

// filter state files by regex
if (args.filter) {
stateFiles = stateFiles.findAll{ stateFile ->
def stateFileStr = stateFile.toString()
def matcher = stateFileStr =~ args.filter
matcher.matches()}
}

// read in states
def states = stateFiles.collect { stateFile ->
def state_ = readTaggedYaml(stateFile)
[state_.id, state_]
}

// construct renameMap
if (args.rename_keys) {
def renameMap = args.rename_keys.collectEntries{renameString ->
def split = renameString.split(":")
assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey,newKey:oldKey'"
split
}

// rename keys in state, only let states through which have all keys
// also add global settings
states = states.collectMany{id, state ->
def newState = [:]

for (key in renameMap.keySet()) {
def origKey = renameMap[key]
if (!(state.containsKey(origKey))) {
return []
}
newState[key] = state[origKey]
}

[[id, globalSettings + newState]]
}
}

states
}
emit:
output_ch
}

return findStatesTempWf
}

0 comments on commit e2ffe14

Please sign in to comment.