Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC for CID store annotations and workflow outputs structure #5885

Open
wants to merge 13 commits into
base: cid-store
Choose a base branch
from
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ task compile {

def getRuntimeConfigs() {
def names = subprojects
.findAll { prj -> prj.name in ['nextflow','nf-cid','nf-commons','nf-httpfs','nf-lang'] }
.findAll { prj -> prj.name in ['nextflow','nf-commons','nf-httpfs','nf-lang', 'nf-cid'] }
.collect { it.name }

FileCollection result = null
Expand All @@ -263,7 +263,7 @@ task exportClasspath {
def home = System.getProperty('user.home')
def all = getRuntimeConfigs()
def libs = all.collect { File file -> /*println file.canonicalPath.replace(home, '$HOME');*/ file.canonicalPath; }
['nextflow','nf-cid','nf-commons','nf-httpfs','nf-lang'].each {libs << file("modules/$it/build/libs/${it}-${version}.jar").canonicalPath }
['nextflow','nf-commons','nf-httpfs','nf-lang', 'nf-cid'].each {libs << file("modules/$it/build/libs/${it}-${version}.jar").canonicalPath }
file('.launch.classpath').text = libs.unique().join(':')
}
}
Expand All @@ -276,7 +276,7 @@ ext.nexusEmail = project.findProperty('nexusEmail')
// `signing.keyId` property needs to be defined in the `gradle.properties` file
ext.enableSignArchives = project.findProperty('signing.keyId')

ext.coreProjects = projects( ':nextflow', ':nf-cid', ':nf-commons', ':nf-httpfs', ':nf-lang' )
ext.coreProjects = projects( ':nextflow', ':nf-commons', ':nf-httpfs', ':nf-lang', ':nf-cid' )

configure(coreProjects) {
group = 'io.nextflow'
Expand Down
8 changes: 4 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1126,22 +1126,22 @@ class Session implements ISession {
}
}

void notifyWorkflowPublish(Object value) {
void notifyWorkflowPublish(String name, Object value) {
for( final observer : observers ) {
try {
observer.onWorkflowPublish(value)
observer.onWorkflowPublish(name, value)
}
catch( Exception e ) {
log.error "Failed to invoke observer on workflow publish: $observer", e
}
}
}

void notifyFilePublish(Path destination, Path source=null) {
void notifyFilePublish(Path destination, Path source, Map annotations) {
def copy = new ArrayList<TraceObserver>(observers)
for( TraceObserver observer : copy ) {
try {
observer.onFilePublish(destination, source)
observer.onFilePublish(destination, source, annotations)
}
catch( Exception e ) {
log.error "Failed to invoke observer on file publish: $observer", e
Expand Down
30 changes: 30 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class CmdCid extends CmdBase implements UsageAware {
void log(ConfigMap config)
void show(ConfigMap config, List<String> args)
void lineage(ConfigMap config, List<String> args)
void diff(ConfigMap config, List<String> args)
}

interface SubCmd {
Expand All @@ -62,6 +63,7 @@ class CmdCid extends CmdBase implements UsageAware {
commands << new CmdLog()
commands << new CmdShow()
commands << new CmdLineage()
commands << new CmdDiff()
}

@Parameter(hidden = true)
Expand Down Expand Up @@ -228,4 +230,32 @@ class CmdCid extends CmdBase implements UsageAware {
}

}

class CmdDiff implements SubCmd {

@Override
String getName() { 'diff' }

@Override
String getDescription() {
return 'Show differences between two CID descriptions'
}

void apply(List<String> args) {
if (args.size() != 2) {
println("ERROR: Incorrect number of parameters")
usage()
return
}
operation.diff(config, args)
}

@Override
void usage() {
println description
println "Usage: nextflow $NAME $name <CID 1> <CID 2>"
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class PublishOp {

private Session session

private String name

private DataflowReadChannel source

private Map opts
Expand All @@ -52,8 +54,9 @@ class PublishOp {

private volatile boolean complete

PublishOp(Session session, DataflowReadChannel source, Map opts) {
PublishOp(Session session, String name, DataflowReadChannel source, Map opts) {
this.session = session
this.name = name
this.source = source
this.opts = opts
this.path = opts.path as String
Expand Down Expand Up @@ -89,13 +92,14 @@ class PublishOp {
if( targetResolver == null )
return

// emit workflow publish event
session.notifyWorkflowPublish(value)

// create publisher
final overrides = targetResolver instanceof Closure
? [saveAs: targetResolver]
: [path: targetResolver]
if (opts.annotations instanceof Closure){
final annotations = opts.annotations as Closure
overrides.annotations = annotations.call(value) as Map
}
final publisher = PublishDir.create(opts + overrides)

// publish files
Expand All @@ -106,13 +110,10 @@ class PublishOp {
publisher.apply(files, sourceDir)
}

// append record to index file
if( indexOpts ) {
final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value
final normalized = normalizePaths(record, targetResolver)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized
}
// append record to index
final normalized = normalizePaths(value, targetResolver)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized
}

/**
Expand Down Expand Up @@ -151,12 +152,21 @@ class PublishOp {
}

/**
* Once all values have been published, write the
* index file (if enabled).
* Once all values have been published, publish the index
* and write it to a file (if enabled).
*/
protected void onComplete(nope) {
if( indexOpts && indexRecords.size() > 0 ) {
log.trace "Saving records to index file: ${indexRecords}"
// publish individual record if source is a value channel
final index = CH.isValue(source)
? indexRecords.first()
: indexRecords

// publish workflow output
session.notifyWorkflowPublish(name, index)

// write index file
if( indexOpts && index ) {
log.trace "Saving records to index file: ${index}"
final indexPath = indexOpts.path
final ext = indexPath.getExtension()
indexPath.parent.mkdirs()
Expand All @@ -169,7 +179,7 @@ class PublishOp {
else {
log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be 'csv' or 'json'"
}
session.notifyFilePublish(indexPath)
session.notifyFilePublish(indexPath, null, opts.tags as Map)
}

log.trace "Publish operator complete"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ class PublishDir {
*/
private def tags

/**
* Annotations to be associated to the target file
*/
private Map annotations

/**
* The content type of the file. Currently only supported by AWS S3.
* This can be either a MIME type content type string or a Boolean value
Expand Down Expand Up @@ -211,6 +216,9 @@ class PublishDir {
if( params.tags != null )
result.tags = params.tags

if( params.annotations != null )
result.annotations = params.annotations as Map

if( params.contentType instanceof Boolean )
result.contentType = params.contentType
else if( params.contentType )
Expand Down Expand Up @@ -581,7 +589,7 @@ class PublishDir {
}

protected void notifyFilePublish(Path destination, Path source=null) {
session.notifyFilePublish(destination, source)
session.notifyFilePublish(destination, source, annotations)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class OutputDsl {
final opts = publishOptions(name, defaults, overrides)

if( opts.enabled == null || opts.enabled )
ops << new PublishOp(session, CH.getReadChannel(mixed), opts).apply()
ops << new PublishOp(session, name, CH.getReadChannel(mixed), opts).apply()
}
}

Expand Down Expand Up @@ -171,6 +171,14 @@ class OutputDsl {
setOption('tags', value)
}

void annotations(Map value) {
setOption('annotations', value)
}

void annotations(Closure value) {
setOption('annotations', value)
}
Comment on lines +174 to +180
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: decide whether to use tags or phase it out in favor of annotations


private void setOption(String name, Object value) {
if( opts.containsKey(name) )
throw new ScriptRuntimeException("Publish option `${name}` cannot be defined more than once for a given target")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,15 @@ trait TraceObserver {
void onFlowError(TaskHandler handler, TraceRecord trace){}

/**
* Method that is invoked when a value is published from a channel.
* Method that is invoked when a workflow output is published.
*
* @param name
* The name of the workflow output
* @param value
* A list if the published channel was a queue channel,
* otherwise an object if the channel was a value channel
*/
void onWorkflowPublish(Object value){}
void onWorkflowPublish(String name, Object value){}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the workflow run name? is it really necessary ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the name of the workflow output, of which there can be several

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of workflow output target, right? it would be better declare it as

onWorkflowPublish(Object value) {
  onWorkflowPublish(value, null) 
}

onWorkflowPublish(Object value, String name) { } 

to no break the API

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think that is needed because it is an undocumented preview feature (part of workflow outputs preview)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the name given to the output in the workflow. We use it as key for the outputs map, and it is also used to refer to an specific output by cid:///#output.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the API change, let's move the discussion to #5909 which is the source of truth for this change


/**
* Method that is invoke when an output file is published
Expand All @@ -150,4 +154,15 @@ trait TraceObserver {
void onFilePublish(Path destination, Path source){
onFilePublish(destination)
}
/**
* Method that is invoked when a output file is annotated
* @param destination
* The destination path at `publishDir` folder.
* @param annotations
* The annotations attached to this file
*/
void onFilePublish(Path destination, Path source, Map annotations){
onFilePublish(destination, source)
}

}
53 changes: 47 additions & 6 deletions modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package nextflow.cli

import nextflow.data.cid.serde.CidEncoder

import java.nio.file.Files

import nextflow.SysEnv
Expand All @@ -28,11 +30,13 @@ import nextflow.data.cid.model.Parameter
import nextflow.data.cid.model.TaskOutput
import nextflow.data.cid.model.TaskRun
import nextflow.data.cid.model.WorkflowOutput
import nextflow.data.cid.serde.CidEncoder
import nextflow.plugin.Plugins
import org.junit.Rule
import spock.lang.Specification
import test.OutputCapture

import java.time.Instant

/**
* CLI cid Tests
*
Expand Down Expand Up @@ -132,9 +136,10 @@ class CmdCidTest extends Specification {
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def time = Instant.ofEpochMilli(123456789).toString()
def encoder = new CidEncoder().withPrettyPrint(true)
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
"cid://123987/file.bam", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
cidFile.text = jsonSer
Expand Down Expand Up @@ -177,7 +182,7 @@ class CmdCidTest extends Specification {

then:
stdout.size() == 1
stdout[0] == "No entry found for cid://12345."
stdout[0] == "No entries found for cid://12345."

cleanup:
folder?.deleteDir()
Expand All @@ -203,19 +208,20 @@ class CmdCidTest extends Specification {
Files.createDirectories(cidFile4.parent)
Files.createDirectories(cidFile5.parent)
def encoder = new CidEncoder()
def time = Instant.ofEpochMilli(123456789).toString()
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
"cid://123987/file.bam", 1234, time, time, null)
cidFile.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987", 1234, 123456789, 123456789, null)
"cid://123987", 1234, time, time, null)
cidFile2.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "foo", new Checksum("abcde2345","nextflow","standard"),
[new Parameter( "ValueInParam", "sample_id","ggal_gut"),
new Parameter("FileInParam","reads",["cid://45678/output.txt"])],
null, null, null, null, [:],[], null)
cidFile3.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://45678", 1234, 123456789, 123456789, null)
"cid://45678", 1234, time, time, null)
cidFile4.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "bar", new Checksum("abfs2556","nextflow","standard"),
null,null, null, null, null, [:],[], null)
Expand Down Expand Up @@ -258,4 +264,39 @@ class CmdCidTest extends Specification {

}

def 'should show query results'(){
given:
def folder = Files.createTempDirectory('test').toAbsolutePath()
def configFile = folder.resolve('nextflow.config')
configFile.text = "workflow.data.enabled = true\nworkflow.data.store.location = '$folder'".toString()
def cidFile = folder.resolve(".meta/12345/.data.json")
Files.createDirectories(cidFile.parent)
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def encoder = new CidEncoder().withPrettyPrint(true)
def time = Instant.ofEpochMilli(123456789).toString()
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
cidFile.text = jsonSer
when:
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=WorkflowOutput"])
cidCmd.run()
def stdout = capture
.toString()
.readLines()// remove the log part
.findResults { line -> !line.contains('DEBUG') ? line : null }
.findResults { line -> !line.contains('INFO') ? line : null }
.findResults { line -> !line.contains('plugin') ? line : null }

then:
stdout.size() == expectedOutput.readLines().size()
stdout.join('\n') == expectedOutput

cleanup:
folder?.deleteDir()
}

}
Loading