Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple CID FileSystem from Local file system and other fixes #5866

Merged
merged 6 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdCid.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.config.ConfigBuilder
import nextflow.dag.MermaidHtmlRenderer
import nextflow.data.cid.CidHistoryFile
import nextflow.data.cid.CidHistoryRecord
import nextflow.data.cid.CidStore
import nextflow.data.cid.CidStoreFactory
import nextflow.data.cid.model.DataType
Expand All @@ -37,7 +37,6 @@ import java.nio.file.Path
import java.nio.file.Paths

import static nextflow.data.cid.fs.CidPath.CID_PROT
import static nextflow.data.cid.fs.CidPath.METADATA_FILE

/**
*
Expand Down Expand Up @@ -165,14 +164,17 @@ class CmdCid extends CmdBase implements UsageAware{
}

private void printHistory(CidStore store) {
final historyFile = store.getHistoryFile()
if (historyFile.exists()) {
final records = store.historyLog?.records
if( records ) {
def table = new TableBuilder(cellSeparator: '\t')
.head('TIMESTAMP')
.head('RUN NAME')
.head('SESSION ID')
.head('RUN CID')
historyFile.eachLine { table.append(CidHistoryFile.CidRecord.parse(it).toList()) }
.head('RESULT CID')
for( CidHistoryRecord record: records ){
table.append(record.toList())
}
println table.toString()
} else {
println("No workflow runs CIDs found.")
Expand Down Expand Up @@ -207,15 +209,19 @@ class CmdCid extends CmdBase implements UsageAware{
}
if (!args[0].startsWith(CID_PROT))
throw new Exception("Identifier is not a CID URL")
final key = args[0].substring(CID_PROT.size()) + "/$METADATA_FILE"
final key = args[0].substring(CID_PROT.size())
final config = new ConfigBuilder()
.setOptions(getLauncher().getOptions())
.setBaseDir(Paths.get('.'))
.build()
final store = CidStoreFactory.getOrCreate(new Session(config))
if (store) {
try {
println store.load(key).toString()
final entry = store.load(key)
if( entry )
println entry.toString()
else
println "No entry found for ${args[0]}."
} catch (Throwable e) {
println "Error loading ${args[0]}."
}
Expand Down Expand Up @@ -292,7 +298,7 @@ class CmdCid extends CmdBase implements UsageAware{
if (!nodeToRender.startsWith(CID_PROT))
throw new Exception("Identifier is not a CID URL")
final slurper = new JsonSlurper()
final key = nodeToRender.substring(CID_PROT.size()) + "/$METADATA_FILE"
final key = nodeToRender.substring(CID_PROT.size())
final cidObject = slurper.parse(store.load(key).toString().toCharArray()) as Map
switch (DataType.valueOf(cidObject.type as String)) {
case DataType.TaskOutput:
Expand Down Expand Up @@ -357,10 +363,17 @@ class CmdCid extends CmdBase implements UsageAware{
}
if (value instanceof Map) {
if (value.path) {
final label = convertToLabel(value.path.toString())
lines << " ${value.path}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(value.path.toString(), nodeToRender))
return
final path = value.path.toString()
if (path.startsWith(CID_PROT)) {
nodes.add(path)
edges.add(new Edge(path, nodeToRender))
return
} else {
final label = convertToLabel(path)
lines << " ${path}@{shape: document, label: \"${label}\"}".toString();
edges.add(new Edge(path, nodeToRender))
return
}
}
}
final label = convertToLabel(value.toString())
Expand Down
179 changes: 119 additions & 60 deletions modules/nextflow/src/main/groovy/nextflow/data/cid/CidHistoryFile.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,129 +16,188 @@
*/
package nextflow.data.cid

import groovy.transform.EqualsAndHashCode
import groovy.util.logging.Slf4j
import nextflow.util.WithLockFile

import java.nio.channels.FileChannel
import java.nio.channels.FileLock
import java.nio.file.Files
import java.nio.file.Path
import java.text.DateFormat
import java.text.SimpleDateFormat
import java.nio.file.StandardOpenOption

/**
* File to store a history of the workflow executions and their corresponding CIDs
*
* @author Jorge Ejarque <jorge.ejarque@seqera.io>
*/
@Slf4j
class CidHistoryFile extends WithLockFile {
private static final DateFormat TIMESTAMP_FMT = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss')
class CidHistoryFile implements CidHistoryLog {

Path path

CidHistoryFile(Path file) {
super(file.toString())
this.path = file
}

void write(String name, UUID key, String runCid, Date date = null) {
void write(String name, UUID key, String runCid, String resultsCid, Date date = null) {
assert key

withFileLock {
def timestamp = date ?: new Date()
log.debug("Writting record for $key in CID history file $this")
this << new CidRecord(timestamp: timestamp, runName: name, sessionId: key, runCid: runCid).toString() << '\n'
log.trace("Writting record for $key in CID history file $this")
path << new CidHistoryRecord(timestamp, name, key, runCid, resultsCid).toString() << '\n'
}
}

void update(UUID sessionId, String runCid) {
void updateRunCid(UUID sessionId, String runCid) {
assert sessionId

try {
withFileLock { update0(sessionId, runCid) }
withFileLock { updateRunCid0(sessionId, runCid) }
}
catch (Throwable e) {
log.warn "Can't update cid history file: $this", e
log.warn "Can't update CID history file: $this", e.message
}
}

String getRunCid(UUID id){
/**
 * Set the results CID of the history record that matches the given session.
 * The rewrite is delegated to {@code updateResultsCid0} and executed through
 * {@code withFileLock}.
 *
 * Best-effort: any failure is logged as a warning and is not propagated.
 *
 * @param sessionId The session whose record is updated; must not be null
 * @param resultsCid The results CID to store in that record
 */
void updateResultsCid(UUID sessionId, String resultsCid) {
    assert sessionId

    try {
        withFileLock { updateResultsCid0(sessionId, resultsCid) }
    }
    catch (Throwable e) {
        // SLF4J discards a trailing String argument when the message has no `{}`
        // placeholder, so the failure reason is embedded in the message itself.
        // The path is printed explicitly because this class defines no toString().
        log.warn "Can't update CID history file: ${this.path} -- ${e.message}"
    }
}

/**
 * Read and parse every line of the history file, executed through
 * {@code withFileLock}.
 *
 * Best-effort: if reading or parsing fails, a warning is logged and the
 * records collected up to that point are returned.
 *
 * @return The parsed records in file order; empty when nothing could be read
 */
List<CidHistoryRecord> getRecords(){
    List<CidHistoryRecord> list = new LinkedList<CidHistoryRecord>()
    try {
        withFileLock { this.path.eachLine { list.add(CidHistoryRecord.parse(it)) } }
    }
    catch (Throwable e) {
        // SLF4J discards a trailing String argument when the message has no `{}`
        // placeholder, so the failure reason is embedded in the message itself.
        // The path is printed explicitly because this class defines no toString().
        log.warn "Can't read records from CID history file: ${this.path} -- ${e.message}"
    }
    return list
}


CidHistoryRecord getRecord(UUID id) {
assert id

for (String line: this.readLines()){
def current = line ? CidRecord.parse(line) : null
for (String line : this.path.readLines()) {
def current = line ? CidHistoryRecord.parse(line) : null
if (current.sessionId == id) {
return current.runCid
return current
}
}
log.warn("Can't find session $id in CID history file $this")
return null
}

private void update0(UUID id, String runCid) {

private void updateRunCid0(UUID id, String runCid) {
assert id
def newHistory = new StringBuilder()

this.readLines().each { line ->
this.path.readLines().each { line ->
try {
def current = line ? CidRecord.parse(line) : null
def current = line ? CidHistoryRecord.parse(line) : null
if (current.sessionId == id) {
log.debug("Updating record for $id in CID history file $this")
current.runCid = runCid
newHistory << current.toString() << '\n'
log.trace("Updating record for $id in CID history file $this")
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, runCid, current.resultsCid)
newHistory << newRecord.toString() << '\n'
} else {
newHistory << line << '\n'
}
}
catch (IllegalArgumentException e) {
log.warn("Can't read CID history file: $this", e)
log.warn("Can't read CID history file: $this", e.message)
}
}

// rewrite the history content
this.setText(newHistory.toString())
this.path.setText(newHistory.toString())
}

@EqualsAndHashCode(includes = 'runName,sessionId')
static class CidRecord {
Date timestamp
String runName
UUID sessionId
String runCid
private void updateResultsCid0(UUID id, String resultsCid) {
assert id
def newHistory = new StringBuilder()

CidRecord(UUID sessionId, String name = null) {
this.runName = name
this.sessionId = sessionId
this.path.readLines().each { line ->
try {
def current = line ? CidHistoryRecord.parse(line) : null
if (current.sessionId == id) {
log.trace("Updating record for $id in CID history file $this")
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, current.runCid, resultsCid)
newHistory << newRecord.toString() << '\n'
} else {
newHistory << line << '\n'
}
}
catch (IllegalArgumentException e) {
log.warn("Can't read CID history file: $this", e.message)
}
}

protected CidRecord() {}
// rewrite the history content
this.path.setText(newHistory.toString())
}

List<String> toList() {
def line = new ArrayList<String>(4)
line << (timestamp ? TIMESTAMP_FMT.format(timestamp) : '-')
line << (runName ?: '-')
line << (sessionId.toString())
line << (runCid ?: '-')
/**
* Apply the given action by using a file lock
*
* @param action The closure implementing the action to be executed with a file lock
* @return The value returned by the action closure
*/
protected withFileLock(Closure action) {

def rnd = new Random()
long ts = System.currentTimeMillis()
final parent = this.path.parent ?: Path.of('.').toAbsolutePath()
Files.createDirectories(parent)
def file = parent.resolve("${this.path.name}.lock".toString())
FileChannel fos
try {
fos = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
} catch (UnsupportedOperationException e){
log.warn("File System Provider for ${this.path} do not support file locking. Continuing without lock...")
return action.call()
}

@Override
String toString() {
toList().join('\t')
if (!fos){
throw new IllegalStateException("Can't create a file channel for ${this.path.toAbsolutePath()}")
}
try {
Throwable error
FileLock lock = null

static CidRecord parse(String line) {
def cols = line.tokenize('\t')
if (cols.size() == 2)
return new CidRecord(UUID.fromString(cols[0]))

if (cols.size() == 4) {

return new CidRecord(
timestamp: TIMESTAMP_FMT.parse(cols[0]),
runName: cols[1],
sessionId: UUID.fromString(cols[2]),
runCid: cols[3]
)
try {
while (true) {
lock = fos.tryLock()
if (lock) break
if (System.currentTimeMillis() - ts < 1_000)
sleep rnd.nextInt(75)
else {
error = new IllegalStateException("Can't lock file: ${this.path.toAbsolutePath()} -- Nextflow needs to run in a file system that supports file locks")
break
}
}
if (lock) {
return action.call()
}
}
catch (Exception e) {
return action.call()
}
finally {
if (lock?.isValid()) lock.release()
}

throw new IllegalArgumentException("Not a valid history entry: `$line`")
if (error) throw error
}
finally {
fos.closeQuietly()
file.delete()
}
}

}
Loading