Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serde interfaces #5893

Merged
merged 6 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 32 additions & 41 deletions modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@

package nextflow.cli

import groovy.json.JsonOutput

import java.nio.file.Files

import nextflow.SysEnv
import nextflow.dag.MermaidHtmlRenderer
import nextflow.data.cid.CidHistoryRecord
import nextflow.data.cid.CidStoreFactory
import nextflow.data.cid.model.Checksum
import nextflow.data.cid.model.Parameter
import nextflow.data.cid.model.TaskOutput
import nextflow.data.cid.model.TaskRun
import nextflow.data.cid.model.WorkflowOutput
import nextflow.data.cid.serde.CidEncoder
import nextflow.plugin.Plugins

import org.junit.Rule
import spock.lang.Specification
import test.OutputCapture

/**
* CLI cid Tests
*
Expand Down Expand Up @@ -130,16 +132,12 @@ class CmdCidTest extends Specification {
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}

def recordEntry = JsonOutput.prettyPrint('{"type":"WorkflowOutput",' +
'"path":"/path/to/file",' +
'"checksum":"45372qe",' +
'"source":"cid://123987/file.bam",' +
'"size": 1234,' +
'"createdAt": 123456789,' +
'"modifiedAt": 123456789,' +
'"annotations":null}')
cidFile.text = recordEntry
def encoder = new CidEncoder().withPrettyPrint(true)
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
cidFile.text = jsonSer
when:
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid://12345"])
cidCmd.run()
Expand All @@ -151,8 +149,8 @@ class CmdCidTest extends Specification {
.findResults { line -> !line.contains('plugin') ? line : null }

then:
stdout.size() == recordEntry.readLines().size()
stdout.join('\n') == recordEntry
stdout.size() == expectedOutput.readLines().size()
stdout.join('\n') == expectedOutput

cleanup:
folder?.deleteDir()
Expand Down Expand Up @@ -204,31 +202,24 @@ class CmdCidTest extends Specification {
Files.createDirectories(cidFile3.parent)
Files.createDirectories(cidFile4.parent)
Files.createDirectories(cidFile5.parent)

def recordEntry = JsonOutput.prettyPrint('{"type":"WorkflowOutput",' +
'"path":"/path/to/file","checksum":"45372qe","source":"cid://123987/file.bam",' +
'"size": 1234,"createdAt": 123456789, "modifiedAt": 123456789,"annotations":null}')
cidFile.text = recordEntry
recordEntry = JsonOutput.prettyPrint('{"type":"TaskOutput",' +
'"path":"/path/to/file","checksum":"45372qe","source":"cid://123987",' +
'"size": 1234,"createdAt": 123456789,"modifiedAt": 123456789,"annotations":null}')
cidFile2.text = recordEntry
recordEntry = JsonOutput.prettyPrint('{"type":"TaskRun",' +
'"sessionId":"u345-2346-1stw2", "name":"foo","code":"abcde2345",' +
'"inputs": [{"type": "ValueInParam","name": "sample_id","value": "ggal_gut"},' +
'{"type": "FileInParam","name": "reads","value": ["cid://45678/output.txt"]}],' +
'"container": null,"conda": null,"spack": null,"architecture": null,' +
'"globalVars": {},"binEntries": [],"annotations":null}')
cidFile3.text = recordEntry
recordEntry = JsonOutput.prettyPrint('{"type":"TaskOutput",' +
'"path":"/path/to/file","checksum":"45372qe","source":"cid://45678",' +
'"size": 1234,"createdAt": 123456789,"modifiedAt": 123456789,"annotations":null}')
cidFile4.text = recordEntry
recordEntry = JsonOutput.prettyPrint('{"type":"TaskRun",' +
'"sessionId":"u345-2346-1stw2", "name":"bar","code":"abfs2556",' +
'"inputs": null,"container": null,"conda": null,"spack": null,"architecture": null,' +
'"globalVars": {},"binEntries": [],"annotations":null}')
cidFile5.text = recordEntry
def encoder = new CidEncoder()
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, 123456789, 123456789, null)
cidFile.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987", 1234, 123456789, 123456789, null)
cidFile2.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "foo", new Checksum("abcde2345","nextflow","standard"),
[new Parameter( "ValueInParam", "sample_id","ggal_gut"),
new Parameter("FileInParam","reads",["cid://45678/output.txt"])],
null, null, null, null, [:],[], null)
cidFile3.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://45678", 1234, 123456789, 123456789, null)
cidFile4.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "bar", new Checksum("abfs2556","nextflow","standard"),
null,null, null, null, null, [:],[], null)
cidFile5.text = encoder.encode(entry)
final network = """flowchart BT
cid://12345/file.bam@{shape: document, label: "cid://12345/file.bam"}
cid://123987/file.bam@{shape: document, label: "cid://123987/file.bam"}
Expand Down
81 changes: 37 additions & 44 deletions modules/nf-cid/src/main/nextflow/data/cid/CidObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,38 @@

package nextflow.data.cid

import static nextflow.data.cid.fs.CidPath.*

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.attribute.BasicFileAttributes

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.data.cid.model.Checksum
import nextflow.data.cid.model.DataPath
import nextflow.data.cid.model.Parameter
import nextflow.data.cid.model.WorkflowResults
import nextflow.data.cid.model.TaskOutput
import nextflow.data.cid.model.Workflow
import nextflow.data.cid.model.WorkflowOutput
import nextflow.data.cid.model.WorkflowResults
import nextflow.data.cid.model.WorkflowRun
import nextflow.data.cid.serde.CidEncoder
import nextflow.file.FileHelper
import nextflow.file.FileHolder
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.script.ScriptMeta
import nextflow.script.params.DefaultInParam
import nextflow.script.params.FileInParam
import nextflow.script.params.InParam
import nextflow.util.PathNormalizer
import nextflow.util.TestOnly

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.attribute.BasicFileAttributes

import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.data.cid.model.DataType
import nextflow.data.cid.model.Output
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
import nextflow.script.params.InParam
import nextflow.trace.TraceObserver
import nextflow.trace.TraceRecord
import nextflow.util.CacheHelper

import static nextflow.data.cid.fs.CidPath.CID_PROT

import nextflow.util.PathNormalizer
import nextflow.util.TestOnly
/**
* Observer to write the generated workflow metadata in a CID store.
*
Expand All @@ -65,6 +63,7 @@ class CidObserver implements TraceObserver {
private Session session
private WorkflowResults workflowResults
private Map<String,String> outputsStoreDirCid = new HashMap<String,String>(10)
private CidEncoder encoder = new CidEncoder()

CidObserver(Session session, CidStore store){
this.session = session
Expand All @@ -83,18 +82,17 @@ class CidObserver implements TraceObserver {
void onFlowBegin() {
this.executionHash = storeWorkflowRun()
workflowResults = new WorkflowResults(
DataType.WorkflowResults,
"$CID_PROT${executionHash}",
new ArrayList<Parameter>())
new ArrayList<String>())
this.store.getHistoryLog().updateRunCid(session.uniqueId, "${CID_PROT}${this.executionHash}")
}

@Override
void onFlowComplete(){
if (this.workflowResults){
final content = JsonOutput.prettyPrint(JsonOutput.toJson(workflowResults))
final wfResultsHash = CacheHelper.hasher(content).hash().toString()
this.store.save(wfResultsHash, content)
final json = encoder.encode(workflowResults)
final wfResultsHash = CacheHelper.hasher(json).hash().toString()
this.store.save(wfResultsHash, workflowResults)
this.store.getHistoryLog().updateResultsCid(session.uniqueId, "${CID_PROT}${wfResultsHash}")
}
}
Expand All @@ -121,23 +119,21 @@ class CidObserver implements TraceObserver {
}
}
final workflow = new Workflow(
DataType.Workflow,
mainScript,
otherScripts,
session.workflowMetadata.repository,
session.workflowMetadata.commitId
)
final value = new WorkflowRun(
DataType.WorkflowRun,
workflow,
session.uniqueId.toString(),
session.runName,
getNormalizedParams(session.params, normalizer)
)

final content = JsonOutput.prettyPrint(JsonOutput.toJson(value))
final executionHash = CacheHelper.hasher(content).hash().toString()
store.save(executionHash, content)
final json = encoder.encode(value)
final executionHash = CacheHelper.hasher(json).hash().toString()
store.save(executionHash, value)
return executionHash
}

Expand Down Expand Up @@ -184,7 +180,6 @@ class CidObserver implements TraceObserver {
final codeChecksum = new Checksum(CacheHelper.hasher(session.stubRun ? task.stubSource: task.source).hash().toString(),
"nextflow", CacheHelper.HashMode.DEFAULT().toString().toLowerCase())
final value = new nextflow.data.cid.model.TaskRun(
DataType.TaskRun,
session.uniqueId.toString(),
task.getName(),
codeChecksum,
Expand All @@ -203,7 +198,7 @@ class CidObserver implements TraceObserver {

// store in the underlying persistence
final key = task.hash.toString()
store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value)))
store.save(key, value)
return key
}

Expand All @@ -215,15 +210,14 @@ class CidObserver implements TraceObserver {
final key = cid.toString()
final checksum = new Checksum( CacheHelper.hasher(path).hash().toString(),
"nextflow", CacheHelper.HashMode.DEFAULT().toString().toLowerCase() )
final value = new Output(
DataType.TaskOutput,
final value = new TaskOutput(
path.toUriString(),
checksum,
"$CID_PROT$task.hash",
attrs.size(),
attrs.creationTime().toMillis(),
attrs.lastModifiedTime().toMillis())
store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value)))
store.save(key, value)
} catch (Throwable e) {
log.warn("Exception storing CID output $path for task ${task.name}. ${e.getLocalizedMessage()}")
}
Expand Down Expand Up @@ -273,20 +267,20 @@ class CidObserver implements TraceObserver {
CacheHelper.HashMode.DEFAULT().toString().toLowerCase()
)
final rel = getWorkflowRelative(destination)
final key = "$executionHash/${rel}"
final key = "$executionHash/${rel}" as String
final sourceReference = getSourceReference(source)
final attrs = readAttributes(destination)
final value = new Output(
DataType.WorkflowOutput,
final value = new WorkflowOutput(
destination.toUriString(),
checksum,
sourceReference,
attrs.size(),
attrs.creationTime().toMillis(),
attrs.lastModifiedTime().toMillis())
store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value)))
workflowResults.outputs.add("${CID_PROT}${key}")
} catch (Throwable e) {
store.save(key, value)
workflowResults.outputs.add("${CID_PROT}${key}".toString())
}
catch (Throwable e) {
log.warn("Exception storing CID output $destination for workflow ${executionHash}.", e)
}
}
Expand Down Expand Up @@ -315,16 +309,15 @@ class CidObserver implements TraceObserver {
final rel = getWorkflowRelative(destination)
final key = "$executionHash/${rel}"
final attrs = readAttributes(destination)
final value = new Output(
DataType.WorkflowOutput,
final value = new WorkflowOutput(
destination.toUriString(),
checksum,
"${CID_PROT}${executionHash}".toString(),
attrs.size(),
attrs.creationTime().toMillis(),
attrs.lastModifiedTime().toMillis())
store.save(key, JsonOutput.prettyPrint(JsonOutput.toJson(value)))
workflowResults.outputs.add("${CID_PROT}${key}")
store.save(key, value)
workflowResults.outputs.add("${CID_PROT}${key}" as String)
}catch (Throwable e) {
log.warn("Exception storing CID output $destination for workflow ${executionHash}. ${e.getLocalizedMessage()}")
}
Expand Down
6 changes: 3 additions & 3 deletions modules/nf-cid/src/main/nextflow/data/cid/CidStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package nextflow.data.cid


import groovy.transform.CompileStatic
import nextflow.data.cid.serde.CidSerializable
import nextflow.data.config.DataConfig
/**
* Interface for the CID store
Expand All @@ -38,14 +38,14 @@ interface CidStore extends Closeable {
* @param key Entry key.
* @param value Entry object.
*/
void save(String key, Object value)
void save(String key, CidSerializable value)

/**
* Load an entry for a given CID key.
* @param key CID key.
* @return entry value, or null if the key does not exist
*/
Object load(String key)
CidSerializable load(String key)

/**
* Get the {@link CidHistoryLog} object associated to the CidStore.
Expand Down
19 changes: 11 additions & 8 deletions modules/nf-cid/src/main/nextflow/data/cid/DefaultCidStore.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package nextflow.data.cid

import nextflow.file.FileHelper
import nextflow.util.TestOnly

import java.nio.file.Files
import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.data.cid.serde.CidEncoder
import nextflow.data.cid.serde.CidSerializable
import nextflow.data.config.DataConfig
import nextflow.exception.AbortOperationException

import nextflow.file.FileHelper
import nextflow.util.TestOnly
/**
* Default implementation of a CID store.
*
Expand All @@ -44,10 +44,13 @@ class DefaultCidStore implements CidStore {
private Path metaLocation
private Path location
private CidHistoryLog historyLog
private CidEncoder encoder


DefaultCidStore open(DataConfig config) {
location = toLocationPath(config.store.location)
metaLocation = location.resolve(METADATA_PATH)
encoder = new CidEncoder()
if( !Files.exists(metaLocation) && !Files.createDirectories(metaLocation) ) {
throw new AbortOperationException("Unable to create CID store directory: $metaLocation")
}
Expand All @@ -62,19 +65,19 @@ class DefaultCidStore implements CidStore {
}

@Override
void save(String key, Object value) {
void save(String key, CidSerializable value) {
final path = metaLocation.resolve("$key/$METADATA_FILE")
Files.createDirectories(path.parent)
log.debug "Save CID file path: $path"
path.text = value
path.text = encoder.encode(value)
}

@Override
Object load(String key) {
CidSerializable load(String key) {
final path = metaLocation.resolve("$key/$METADATA_FILE")
log.debug("Loading from path $path")
if (path.exists())
return path.text
return encoder.decode(path.text)
log.debug("File for key $key not found")
return null
}
Expand Down
Loading