From 0193cfe82610a6b7f747e8b8653be585701c36d5 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 9 May 2025 21:54:14 -0500 Subject: [PATCH] Replace `#output` fragment with WorkflowLaunch/WorkflowRun Signed-off-by: Ben Sherman --- docs/reference/channel.md | 4 +- .../nextflow/extension/LinExtension.groovy | 2 +- .../groovy/nextflow/cli/CmdLineageTest.groovy | 4 +- .../lineage/DefaultLinHistoryLog.groovy | 29 ++- .../nextflow/lineage/LinExtensionImpl.groovy | 4 +- .../nextflow/lineage/LinHistoryLog.groovy | 13 +- .../nextflow/lineage/LinHistoryRecord.groovy | 12 +- .../main/nextflow/lineage/LinObserver.groovy | 118 +++++------- .../lineage/LinPropertyValidator.groovy | 6 +- .../src/main/nextflow/lineage/LinUtils.groovy | 59 +----- .../lineage/cli/LinCommandImpl.groovy | 4 +- .../lineage/cli/LinDagRenderer.groovy | 12 +- .../lineage/fs/LinMetadataPath.groovy | 14 +- .../main/nextflow/lineage/fs/LinPath.groovy | 181 +++++------------- .../nextflow/lineage/model/FileOutput.groovy | 6 +- .../nextflow/lineage/model/TaskOutput.groovy | 53 ----- .../nextflow/lineage/model/TaskRun.groovy | 4 +- ...lowOutput.groovy => WorkflowLaunch.groovy} | 26 ++- .../nextflow/lineage/model/WorkflowRun.groovy | 24 ++- .../nextflow/lineage/serde/LinEncoder.groovy | 6 +- .../lineage/DefaultLinHistoryLogTest.groovy | 40 +++- .../lineage/DefaultLinStoreTest.groovy | 4 +- .../lineage/LinExtensionImplTest.groovy | 6 +- .../lineage/LinHistoryRecordTest.groovy | 12 +- .../nextflow/lineage/LinObserverTest.groovy | 55 ++---- .../test/nextflow/lineage/LinUtilsTest.groovy | 48 +---- .../lineage/cli/LinCommandImplTest.groovy | 38 +--- .../fs/LinFileSystemProviderTest.groovy | 9 +- .../nextflow/lineage/fs/LinPathTest.groovy | 97 ++-------- .../lineage/serde/LinEncoderTest.groovy | 49 ++--- 30 files changed, 314 insertions(+), 625 deletions(-) delete mode 100644 modules/nf-lineage/src/main/nextflow/lineage/model/TaskOutput.groovy rename 
modules/nf-lineage/src/main/nextflow/lineage/model/{WorkflowOutput.groovy => WorkflowLaunch.groovy} (67%) diff --git a/docs/reference/channel.md b/docs/reference/channel.md index b3385ba45b..d6a9f3740e 100644 --- a/docs/reference/channel.md +++ b/docs/reference/channel.md @@ -72,7 +72,7 @@ The `channel.fromLineage` factory creates a channel that emits files from the {r ```nextflow channel - .fromLineage(workflowRun: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta']) + .fromLineage(workflowLaunch: 'lid://0d1d1622ced3e4edc690bec768919b45', label: ['alpha', 'beta']) .view() ``` @@ -86,7 +86,7 @@ Available options: `taskRun` : LID of the task run that produced the desired files. -`workflowRun` +`workflowLaunch` : LID of the workflow run that produced the desired files. (channel-fromlist)= diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy index 97335e1421..9d7c928f6b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/LinExtension.groovy @@ -29,7 +29,7 @@ interface LinExtension { static final Map PARAMS = [ label: [List,String,GString], taskRun: [String,GString], - workflowRun: [String,GString], + workflowLaunch: [String,GString], ] /** diff --git a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy index 50e0814355..a328c2a2a8 100644 --- a/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/cli/CmdLineageTest.groovy @@ -74,8 +74,8 @@ class CmdLineageTest extends Specification { def launcher = Mock(Launcher){ getOptions() >> new CliOptions(config: [configFile.toString()]) } - lidLog.write("run_name", uniqueId, "lid://123456", date) - def recordEntry =
"${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456".toString() + lidLog.write("run_name", uniqueId, "lid://123456","lid://567890", date) + def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456\tlid://567890".toString() when: def lidCmd = new CmdLineage(launcher: launcher, args: ["list"]) lidCmd.run() diff --git a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy index 745dbab4b8..35059713df 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/DefaultLinHistoryLog.groovy @@ -40,19 +40,34 @@ class DefaultLinHistoryLog implements LinHistoryLog { Files.createDirectories(path) } - void write(String name, UUID key, String runLid, Date date = null) { + @Override + void write(String name, UUID id, String launchLid, Date date = null) { assert name - assert key - def timestamp = date ?: new Date() - final recordFile = path.resolve(runLid.substring(LID_PROT.size())) + assert id + final timestamp = date ?: new Date() + final recordFile = path.resolve(launchLid.substring(LID_PROT.size())) try { - recordFile.text = new LinHistoryRecord(timestamp, name, key, runLid).toString() - log.trace("Record for $key written in lineage history log ${FilesEx.toUriString(this.path)}") + recordFile.text = new LinHistoryRecord(timestamp, name, id, null, launchLid, null).toString() + log.trace("Record for $launchLid written in lineage history log ${FilesEx.toUriString(this.path)}") }catch (Throwable e) { - log.warn("Can't write record $key file ${FilesEx.toUriString(recordFile)}", e.message) + log.warn("Can't write record $launchLid file ${FilesEx.toUriString(recordFile)}", e.message) } } + @Override + void finalize(String launchLid, String runLid, String status) { + assert launchLid + final recordFile =
path.resolve(launchLid.substring(LID_PROT.size())) + try { + final current = LinHistoryRecord.parse(recordFile.text) + recordFile.text = new LinHistoryRecord(current.timestamp, current.runName, current.sessionId, status, current.launchLid, runLid).toString() + } + catch (Throwable e) { + log.warn("Can't read record $launchLid file: ${FilesEx.toUriString(recordFile)}", e.message) + } + } + + @Override List getRecords(){ List list = new LinkedList() try { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy index 6b95047040..6ae6593cb6 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinExtensionImpl.groovy @@ -49,8 +49,8 @@ class LinExtensionImpl implements LinExtension { private static Map> buildQueryParams(Map opts) { final queryParams = [type: [FileOutput.class.simpleName] ] - if( opts.workflowRun ) - queryParams['workflowRun'] = [opts.workflowRun as String] + if( opts.workflowLaunch ) + queryParams['workflowLaunch'] = [opts.workflowLaunch as String] if( opts.taskRun ) queryParams['taskRun'] = [opts.taskRun as String] if( opts.label ) { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy index 33c31da30f..c28444b475 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryLog.groovy @@ -26,9 +26,18 @@ interface LinHistoryLog { * * @param name Workflow execution name. * @param sessionId Workflow session ID. - * @param runLid Workflow run ID. + * @param launchLid Workflow launch Lineage ID. */ - void write(String name, UUID sessionId, String runLid) + void write(String name, UUID sessionId, String launchLid) + + /** + * Finalize the log record for a given run. + * + * @param launchLid Workflow launch Lineage ID.
+ * @param runLid Workflow run Lineage ID. + * @param status Workflow run completion status. + */ + void finalize(String launchLid, String runLid, String status) /** * Get the store records in the Lineage History Log. diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryRecord.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryRecord.groovy index 470fe79b3e..e3122b7fe8 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryRecord.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinHistoryRecord.groovy @@ -35,12 +35,16 @@ class LinHistoryRecord { final Date timestamp final String runName final UUID sessionId + final String status + final String launchLid final String runLid - LinHistoryRecord(Date timestamp, String name, UUID sessionId, String runLid) { + LinHistoryRecord(Date timestamp, String name, UUID sessionId, String status, String launchLid, String runLid) { this.timestamp = timestamp this.runName = name this.sessionId = sessionId + this.status = status + this.launchLid = launchLid this.runLid = runLid } @@ -51,6 +55,8 @@ class LinHistoryRecord { timestamp ? 
TIMESTAMP_FMT.format(timestamp) : '-', runName ?: '-', sessionId.toString(), + status ?: '-', + launchLid ?: '-', runLid ?: '-', ) } @@ -62,8 +68,8 @@ class LinHistoryRecord { static LinHistoryRecord parse(String line) { final cols = line.tokenize('\t') - if (cols.size() == 4) { - return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3]) + if (cols.size() == 6) { + return new LinHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3], cols[4], cols[5]) } throw new IllegalArgumentException("Not a valid history entry: `$line`") } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy index ab46418c39..f897cf8fe7 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinObserver.groovy @@ -32,9 +32,8 @@ import nextflow.lineage.model.Checksum import nextflow.lineage.model.FileOutput import nextflow.lineage.model.DataPath import nextflow.lineage.model.Parameter -import nextflow.lineage.model.TaskOutput import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import nextflow.file.FileHelper import nextflow.file.FileHolder @@ -84,10 +83,10 @@ class LinObserver implements TraceObserverV2 { (EachInParam) : "each" ] - private String executionHash + private String launchId private LinStore store private Session session - private WorkflowOutput workflowOutput + private List outputs = new LinkedList() private Map outputsStoreDirLid = new HashMap(10) private PathNormalizer normalizer @@ -97,10 +96,7 @@ class LinObserver implements TraceObserverV2 { } @TestOnly - String getExecutionHash(){ executionHash } - - @TestOnly - String setExecutionHash(String hash){ this.executionHash = hash } + String setLaunchId(String hash){ this.launchId 
= hash } @TestOnly String setNormalizer(PathNormalizer normalizer){ this.normalizer = normalizer } @@ -108,23 +104,24 @@ class LinObserver implements TraceObserverV2 { @Override void onFlowBegin() { normalizer = new PathNormalizer(session.workflowMetadata) - executionHash = storeWorkflowRun(normalizer) - final executionUri = asUriString(executionHash) - workflowOutput = new WorkflowOutput( - OffsetDateTime.now(), - executionUri, - new LinkedList() - ) - this.store.getHistoryLog().write(session.runName, session.uniqueId, executionUri) + launchId = storeWorkflowLaunch(normalizer) + this.store.getHistoryLog().write(session.runName, session.uniqueId, asUriString(launchId)) } @Override - void onFlowComplete(){ - if(workflowOutput?.output ){ - workflowOutput.createdAt = OffsetDateTime.now() - final key = executionHash + '#output' - this.store.save(key, workflowOutput) - } + void onFlowComplete() { + final status = session.isCancelled() + ? "CANCELLED" + : session.isSuccess() ? "SUCCEEDED" : "FAILED" + final workflowRun = new WorkflowRun( + OffsetDateTime.now(), + asUriString(launchId), + status, + outputs + ) + final runId = CacheHelper.hasher(workflowRun).hash().toString() + this.store.save(runId, workflowRun) + this.store.getHistoryLog().finalize(asUriString(launchId), asUriString(runId), status) } protected Collection allScriptFiles() { @@ -150,7 +147,7 @@ class LinObserver implements TraceObserverV2 { return result.sort{it.path} } - protected String storeWorkflowRun(PathNormalizer normalizer) { + protected String storeWorkflowLaunch(PathNormalizer normalizer) { // create the workflow object holding script files and repo tracking info final workflow = new Workflow( collectScriptDataPaths(normalizer), @@ -158,16 +155,16 @@ class LinObserver implements TraceObserverV2 { session.workflowMetadata.commitId ) // create the workflow run main object - final value = new WorkflowRun( + final value = new WorkflowLaunch( workflow, session.uniqueId.toString(), session.runName,
getNormalizedParams(session.params, normalizer), SecretHelper.hideSecrets(session.config.deepClone()) as Map ) - final executionHash = CacheHelper.hasher(value).hash().toString() - store.save(executionHash, value) - return executionHash + final launchId = CacheHelper.hasher(value).hash().toString() + store.save(launchId, value) + return launchId } protected static List getNormalizedParams(Map params, PathNormalizer normalizer){ @@ -180,43 +177,6 @@ class LinObserver implements TraceObserverV2 { return normalizedParams } - @Override - void onTaskComplete(TaskEvent event) { - storeTaskInfo(event.handler.task) - } - - protected void storeTaskInfo(TaskRun task) { - // store the task run entry - storeTaskRun(task, normalizer) - // store all task results - storeTaskResults(task, normalizer) - } - - protected String storeTaskResults(TaskRun task, PathNormalizer normalizer){ - final outputParams = getNormalizedTaskOutputs(task, normalizer) - final value = new TaskOutput( asUriString(task.hash.toString()), asUriString(executionHash), OffsetDateTime.now(), outputParams ) - final key = task.hash.toString() + '#output' - store.save(key,value) - return key - } - - private List getNormalizedTaskOutputs(TaskRun task, PathNormalizer normalizer){ - final outputs = task.getOutputs() - final outputParams = new LinkedList() - for( Map.Entry entry : outputs ) { - manageTaskOutputParameter(entry.key, outputParams, entry.value, task, normalizer) - } - return outputParams - } - - private void manageTaskOutputParameter(OutParam key, LinkedList outputParams, value, TaskRun task, PathNormalizer normalizer) { - if (key instanceof FileOutParam) { - outputParams.add(new Parameter(getParameterType(key), key.name, manageFileOutParam(value, task))) - } else { - outputParams.add(new Parameter(getParameterType(key), key.name, normalizeValue(value, normalizer))) - } - } - private static Object normalizeValue(Object value, PathNormalizer normalizer) { if (value instanceof Path) return 
normalizer.normalizePath((Path)value) @@ -226,6 +186,11 @@ class LinObserver implements TraceObserverV2 { return value } + @Override + void onTaskComplete(TaskEvent event) { + storeTaskRun(event.handler.task, normalizer) + } + private Object manageFileOutParam(Object value, TaskRun task) { if (value == null) { log.debug "Unexpected lineage File output value null" @@ -247,7 +212,7 @@ class LinObserver implements TraceObserverV2 { protected String storeTaskRun(TaskRun task, PathNormalizer normalizer) { final codeChecksum = Checksum.ofNextflow(session.stubRun ? task.stubSource : task.source) - final value = new nextflow.lineage.model.TaskRun( + final taskRun = new nextflow.lineage.model.TaskRun( session.uniqueId.toString(), task.getName(), codeChecksum, @@ -262,12 +227,19 @@ class LinObserver implements TraceObserverV2 { normalizer.normalizePath(p.normalize()), Checksum.ofNextflow(p) ) }, - asUriString(executionHash) + asUriString(launchId) ) // store in the underlying persistence final key = task.hash.toString() - store.save(key, value) + store.save(key, taskRun) + + // store file outputs + task.outputs.forEach { OutParam param, Object value -> + if (param instanceof FileOutParam) + manageFileOutParam(value, task) + } + return key } @@ -280,7 +252,7 @@ class LinObserver implements TraceObserverV2 { path.toUriString(), checksum, asUriString(task.hash.toString()), - asUriString(executionHash), + asUriString(launchId), asUriString(task.hash.toString()), attrs.size(), LinUtils.toDate(attrs?.creationTime()), @@ -300,7 +272,7 @@ class LinObserver implements TraceObserverV2 { protected String getWorkflowOutputKey(Path target) { final rel = getWorkflowRelative(target) - return executionHash + SEPARATOR + rel + return launchId + SEPARATOR + rel } protected String getTaskRelative(TaskRun task, Path path){ @@ -345,13 +317,13 @@ class LinObserver implements TraceObserverV2 { final key = getWorkflowOutputKey(event.target) final sourceReference = event.source ? 
getSourceReference(event.source) - : asUriString(executionHash) + : asUriString(launchId) final attrs = readAttributes(event.target) final value = new FileOutput( event.target.toUriString(), checksum, sourceReference, - asUriString(executionHash), + asUriString(launchId), null, attrs.size(), LinUtils.toDate(attrs?.creationTime()), @@ -363,7 +335,7 @@ class LinObserver implements TraceObserverV2 { log.warn1("Lineage for workflow output is not supported by publishDir directive") } catch (Throwable e) { - log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${executionHash}'", e) + log.warn("Unexpected error storing published file '${event.target.toUriString()}' for workflow '${launchId}'", e) } } @@ -381,7 +353,7 @@ class LinObserver implements TraceObserverV2 { void onWorkflowOutput(WorkflowOutputEvent event) { final type = getParameterType(event.value) final value = convertPathsToLidReferences(event.index ?: event.value) - workflowOutput.output.add(new Parameter(type, event.name, value)) + outputs.add(new Parameter(type, event.name, value)) } protected static String getParameterType(Object param) { diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinPropertyValidator.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinPropertyValidator.groovy index 5f02fb3019..c28f526e1a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinPropertyValidator.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinPropertyValidator.groovy @@ -21,10 +21,9 @@ import nextflow.lineage.model.Checksum import nextflow.lineage.model.DataPath import nextflow.lineage.model.FileOutput import nextflow.lineage.model.Parameter -import nextflow.lineage.model.TaskOutput import nextflow.lineage.model.TaskRun import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun /** @@ -40,10 +39,9 @@ class LinPropertyValidator { 
DataPath, FileOutput, Parameter, - TaskOutput, TaskRun, Workflow, - WorkflowOutput, + WorkflowLaunch, WorkflowRun, ] diff --git a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy index 2c110eebc7..21ab46bb95 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/LinUtils.groovy @@ -25,8 +25,6 @@ import java.time.ZoneId import groovy.transform.CompileStatic import groovy.util.logging.Slf4j -import nextflow.lineage.model.TaskRun -import nextflow.lineage.model.WorkflowRun import nextflow.lineage.serde.LinEncoder import nextflow.lineage.serde.LinSerializable import nextflow.serde.gson.GsonEncoder @@ -42,14 +40,11 @@ class LinUtils { private static final String[] EMPTY_ARRAY = new String[] {} /** - * Get a lineage record or fragment from the Lineage store. + * Get a lineage record from the Lineage store. * * @param store Lineage store. - * @param uri Object or fragment to retrieve in URI-like format. - * Format 'lid://[#fragment]' where: - * - Key: Metadata Element key - * - Fragment: Element fragment to retrieve. - * @return Lineage record or fragment. + * @param uri Lineage record URI. + * @return Lineage record. */ static Object getMetadataObject(LinStore store, URI uri) { if( uri.scheme != SCHEME ) @@ -57,56 +52,16 @@ class LinUtils { final key = uri.authority ? 
uri.authority + uri.path : uri.path if( key == SEPARATOR ) throw new IllegalArgumentException("Cannot get record from the root LID URI") - if ( uri.query ) + if( uri.query ) log.warn("Query string is not supported for Lineage URI: `$uri` -- it will be ignored") - return getMetadataObject0(store, key, uri.fragment ) - } - - private static Object getMetadataObject0(LinStore store, String key, String fragment) { + if( uri.fragment ) + log.warn("Query fragment is not supported for Lineage URI: `$uri` -- it will be ignored") final record = store.load(key) - if( !record ) { + if( !record ) throw new FileNotFoundException("Lineage record $key not found") - } - if( fragment ) { - new LinPropertyValidator().validate(fragment.tokenize('.')) - return getSubObject(store, key, record, fragment) - } return record } - /** - * Get a lineage sub-record. - * - * If the requested sub-record is the workflow or task outputs, retrieves the outputs from the outputs description. - * - * @param store Store to retrieve lineage records. - * @param key Parent key. - * @param record Parent record. - * @param fragment String in indicating the properties to navigate to get the sub-record. - * @return Sub-record or null in it does not exist. - */ - static Object getSubObject(LinStore store, String key, LinSerializable record, String fragment) { - if( isSearchingOutputs(record, fragment) ) { - // When asking for a Workflow or task output retrieve the outputs description - final outputs = store.load("${key}#output") - if (!outputs) - return [] - return navigate(outputs, fragment) - } - return navigate(record, fragment) - } - - /** - * Check if the Lid pseudo path or query is for Task or Workflow outputs. - * - * @param record Parent lineage record - * @param fragment Fragment indicating the properties to navigate to get the sub-record. - * @return return 'true' if the parent is a Task/Workflow run and the first element in fragment is 'output'. 
Otherwise 'false' - */ - static boolean isSearchingOutputs(LinSerializable record, String fragment) { - return (record instanceof WorkflowRun || record instanceof TaskRun) && fragment && fragment.tokenize('.')[0] == 'output' - } - /** * Evaluates record or the records in a collection matches a set of parameter-value pairs. It includes in the results collection in case of match. * diff --git a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy index c5e05f5070..fb4c670d6a 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinCommandImpl.groovy @@ -70,7 +70,9 @@ class LinCommandImpl implements CmdLineage.LinCommand { .head('TIMESTAMP') .head('RUN NAME') .head('SESSION ID') - .head('LINEAGE ID') + .head('STATUS') + .head('LAUNCH LID') + .head('RUN LID') for (LinHistoryRecord record : records) { table.append(record.toList()) } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinDagRenderer.groovy b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinDagRenderer.groovy index 17d230d4c9..a737e52bc4 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/cli/LinDagRenderer.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/cli/LinDagRenderer.groovy @@ -25,7 +25,7 @@ import nextflow.dag.MermaidHtmlRenderer import nextflow.lineage.LinStore import nextflow.lineage.model.FileOutput import nextflow.lineage.model.TaskRun -import nextflow.lineage.model.WorkflowRun +import nextflow.lineage.model.WorkflowLaunch import static nextflow.lineage.fs.LinPath.LID_PROT import static nextflow.lineage.fs.LinPath.isLidUri @@ -102,10 +102,10 @@ class LinDagRenderer { visitFileOutput(lid, record) else if( record instanceof TaskRun ) visitTaskRun(lid, record) - else if( record instanceof WorkflowRun ) + else if( record instanceof WorkflowLaunch ) visitWorkflowRun(lid, record) else - throw new 
Exception("Cannot render lineage for type ${record.getClass().getSimpleName()} -- must be a FileOutput, TaskRun, or WorkflowRun") + throw new Exception("Cannot render lineage for type ${record.getClass().getSimpleName()} -- must be a FileOutput, TaskRun, or WorkflowLaunch") } private void visitFileOutput(String lid, FileOutput fileOutput) { @@ -132,9 +132,9 @@ class LinDagRenderer { } } - private void visitWorkflowRun(String lid, WorkflowRun workflowRun) { - addNode(lid, "${workflowRun.name} [${lid}]", NodeType.TASK) - for( final param : workflowRun.params ) { + private void visitWorkflowRun(String lid, WorkflowLaunch workflowLaunch) { + addNode(lid, "${workflowLaunch.name} [${lid}]", NodeType.TASK) + for( final param : workflowLaunch.params ) { visitParameter0(lid, param.value.toString()) } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinMetadataPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinMetadataPath.groovy index a4347bd5fb..34f993e345 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinMetadataPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinMetadataPath.groovy @@ -29,27 +29,27 @@ import java.nio.file.attribute.FileTime */ @CompileStatic class LinMetadataPath extends LinPath { - private byte[] results + private byte[] bytes private FileTime creationTime - LinMetadataPath(String resultsObject, FileTime creationTime, LinFileSystem fs, String path, String fragment) { - super(fs, "${path}${fragment ? 
'#'+ fragment : ''}") - this.results = resultsObject.getBytes("UTF-8") + LinMetadataPath(String metadata, FileTime creationTime, LinFileSystem fs, String path) { + super(fs, path) + this.bytes = metadata.getBytes("UTF-8") this.creationTime = creationTime } InputStream newInputStream() { - return new ByteArrayInputStream(results) + return new ByteArrayInputStream(bytes) } SeekableByteChannel newSeekableByteChannel(){ - return new LinMetadataSeekableByteChannel(results) + return new LinMetadataSeekableByteChannel(bytes) } A readAttributes(Class type){ return (A) new BasicFileAttributes() { @Override - long size() { return results.length } + long size() { return bytes.length } @Override FileTime lastModifiedTime() { return creationTime } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy index b827314d2a..e6784c436d 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/fs/LinPath.groovy @@ -20,11 +20,8 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.file.FileHelper import nextflow.file.LogicalDataPath -import nextflow.lineage.LinPropertyValidator -import nextflow.lineage.LinStore import nextflow.lineage.model.Checksum import nextflow.lineage.model.FileOutput -import nextflow.lineage.model.TaskRun import nextflow.lineage.model.WorkflowRun import nextflow.lineage.serde.LinSerializable import nextflow.util.CacheHelper @@ -62,10 +59,6 @@ class LinPath implements Path, LogicalDataPath { // String with the lineage file path private String filePath - private String query - - private String fragment - /* * Only needed to prevent serialization issues - see https://github.com/nextflow-io/nextflow/issues/5208 */ @@ -76,29 +69,18 @@ class LinPath implements Path, LogicalDataPath { throw new IllegalArgumentException("Invalid LID URI - scheme is different for $SCHEME") } 
this.fileSystem = fs - setFieldsFormURI(uri) - // Check if query and fragment are with filePath - if( query == null && fragment == null ) - setFieldsFormURI(new URI(toUriString())) + this.filePath = resolve0(fileSystem, norm0("${uri.authority?:''}${uri.path}") ) // Warn if query is specified - if( query ) + if( uri.query ) log.warn("Query string is not supported for Lineage URI: `$uri` -- it will be ignored") - // Validate fragment - if( fragment ) - new LinPropertyValidator().validate(fragment.tokenize('.')) - } - - private void setFieldsFormURI(URI uri){ - this.query = uri.query - this.fragment = uri.fragment - this.filePath = resolve0(fileSystem, norm0("${uri.authority?:''}${uri.path}") ) + // Warn if fragment is specified + if( uri.fragment ) + log.warn("Fragment is not supported for Lineage URI: `$uri` -- it will be ignored") } - protected LinPath(String query, String fragment, String filepath, LinFileSystem fs) { + protected LinPath(String filePath, LinFileSystem fs) { this.fileSystem = fs - this.query = query - this.fragment = fragment - this.filePath = filepath + this.filePath = filePath } LinPath(LinFileSystem fs, String path) { @@ -126,7 +108,7 @@ class LinPath implements Path, LogicalDataPath { return first } - protected static void validateDataOutput(FileOutput lidObject) { + protected static void validateFileOutput(FileOutput lidObject) { final hashedPath = FileHelper.toCanonicalPath(lidObject.path as String) if( !hashedPath.exists() ) throw new FileNotFoundException("Target path $lidObject.path does not exist") @@ -176,16 +158,14 @@ class LinPath implements Path, LogicalDataPath { * * @param fs LinFileSystem associated to the LinPath to find * @param filePath Path to look for the target path - * @param fragment String with path to sub-object inside the description * @param asMetadata Flag to indicate if other metadata descriptions must be returned as LinMetadataPath. 
* @param asIntermediate Flag to indicate if WorkflowRun and TaskRun subpaths must be returned as LinIntermediatePath. - * @param subpath subpath associated to the target path to find. Used when looking for a parent path * @return Real Path, LinMetadataPath or LinIntermediatePath path associated to the LinPath * @throws Exception - * IllegalArgumentException if the filepath, filesystem or its LinStore are null. - * FileNotFoundException if the filePath, subpath and fragment is not found. + * IllegalArgumentException if the filePath, filesystem or its LinStore are null. + * FileNotFoundException if the filePath is not found in the LinStore. */ - protected static Path findTarget(LinFileSystem fs, String filePath, String fragment, boolean asMetadata, boolean asIntermediate) throws Exception { + protected static Path findTarget(LinFileSystem fs, String filePath, boolean asMetadata, String subPath=null) throws Exception { if( !fs ) throw new IllegalArgumentException("Cannot get target path for a relative lineage path") if( filePath.isEmpty() || filePath == SEPARATOR ) @@ -193,111 +173,38 @@ class LinPath implements Path, LogicalDataPath { final store = fs.getStore() if( !store ) throw new Exception("Lineage store not found - Check Nextflow configuration") - findTarget0(fs, store, filePath, fragment, asMetadata, asIntermediate, []) - } - - private static Path findTarget0(LinFileSystem fs, LinStore store, String filePath, String fragment, boolean asMetadata, boolean asIntermediate, List subpath) { - final object = store.load(filePath) - if( object ) { - return getTargetPathFromObject(object, fs, filePath, fragment, asMetadata, asIntermediate, subpath) - } else { - if( fragment ) { - // If object doesn't exit, it's not possible to get fragment. 
- throw new FileNotFoundException("Target path '$filePath#$fragment' does not exist") - } - return findTargetFromParent(fs, store, filePath, asIntermediate, subpath) - } - } - - private static Path findTargetFromParent(LinFileSystem fs, LinStore store, String filePath, boolean asIntermediate, List subpath) { + final record = store.load(filePath) + if( record instanceof FileOutput ) + return getFileOutputAsTargetPath(record, subPath) + if( record && asMetadata ) + return getMetadataAsTargetPath(record, fs, filePath) + // recursively check parent paths for metadata descriptions final currentPath = Path.of(filePath) - final parent = Path.of(filePath).getParent() - if( !parent ) { - throw new FileNotFoundException("Target path '$filePath/${subpath.join('/')} does not exist") + final parent = currentPath.getParent() + if( parent ) { + final filename = currentPath.getFileName().toString() + subPath = subPath + ? "${filename}${SEPARATOR}${subPath}".toString() + : filename + return findTarget(fs, parent.toString(), false, subPath) } - ArrayList newChildren = new ArrayList() - newChildren.add(currentPath.getFileName().toString()) - newChildren.addAll(subpath) - //As Metadata set as false because parent path only inspected for FileOutput or intermediate. 
- return findTarget0(fs, store, parent.toString(), null, false, asIntermediate, newChildren) + throw new FileNotFoundException("Target path '${filePath}' does not exist") } - private static Path getTargetPathFromObject(LinSerializable object, LinFileSystem fs, String filePath, String fragment, boolean asMetadataPath, boolean asIntermediatePath,List subpath) { - // It's not possible to get a target path with both fragment and subpath - if( fragment && subpath ) { - throw new FileNotFoundException("Unable to get a target path for '$filePath' with fragments and subpath") - } - // If metadata flag is active and looks for a fragment returns the metadata despite the type of object - if( asMetadataPath && fragment ){ - return getMetadataAsTargetPath(object, fs, filePath, fragment) - } - // Return real files when FileOutput sub-path - if( object instanceof FileOutput ) { - return getTargetPathFromOutput(object, subpath) - } - // Intermediate run case - if( asIntermediatePath && (object instanceof WorkflowRun || object instanceof TaskRun) ) { - return new LinIntermediatePath(fs, "$filePath/${subpath.join('/')}") - } - - // It is not possible to get a metadata path with subpath. For other cases return metadata path if activated or throw exception - if( asMetadataPath && !subpath) - return getMetadataAsTargetPath(object, fs, filePath, fragment) - else - throw new FileNotFoundException("Target path '${filePath}/${subpath ? '/' + subpath.join('/') : ''}${fragment ? 
'#' + fragment : ''}' does not exist") - } - - protected static Path getMetadataAsTargetPath(LinSerializable results, LinFileSystem fs, String filePath, String fragment) { - if( !results ) { + protected static Path getMetadataAsTargetPath(record, LinFileSystem fs, String filePath) { + if( !record ) throw new FileNotFoundException("Target path '$filePath' does not exist") - } - if( fragment ) { - return getSubObjectAsPath(fs, filePath, results, fragment) - } else { - return generateLinMetadataPath(fs, filePath, results, fragment) - } - } - - /** - * Get a metadata sub-object as LinMetadataPath. - * If the requested sub-object is the workflow or task outputs, retrieves the outputs from the outputs description. - * - * @param fs LinFilesystem for the te. - * @param key Parent metadata key. - * @param object Parent object. - * @param children Array of string in indicating the properties to navigate to get the sub-object. - * @return LinMetadataPath or null in it does not exist - */ - static LinMetadataPath getSubObjectAsPath(LinFileSystem fs, String key, LinSerializable object, String fragment) { - if( isSearchingOutputs(object, fragment) ) { - // When asking for a Workflow or task output retrieve the outputs description - final outputs = fs.store.load("${key}#output") - if( !outputs ) { - throw new FileNotFoundException("Target path '$key#output' does not exist") - } - return generateLinMetadataPath(fs, key, outputs, fragment) - } else { - return generateLinMetadataPath(fs, key, object, fragment) - } - } - - private static LinMetadataPath generateLinMetadataPath(LinFileSystem fs, String key, Object object, String fragment) { - def creationTime = toFileTime(navigate(object, 'createdAt') as OffsetDateTime ?: OffsetDateTime.now()) - final output = fragment ? 
navigate(object, fragment) : object - if( !output ) { - throw new FileNotFoundException("Target path '$key#${fragment}' does not exist") - } - return new LinMetadataPath(encodeSearchOutputs(output, true), creationTime, fs, key, fragment) + final creationTime = toFileTime(navigate(record, 'createdAt') as OffsetDateTime ?: OffsetDateTime.now()) + return new LinMetadataPath(encodeSearchOutputs(record, true), creationTime, fs, filePath) } - private static Path getTargetPathFromOutput(FileOutput object, List children) { - final lidObject = object as FileOutput + private static Path getFileOutputAsTargetPath(FileOutput record, String subPath) { // return the real path stored in the metadata - validateDataOutput(lidObject) - def realPath = FileHelper.toCanonicalPath(lidObject.path as String) - if( children && children.size() > 0 ) - realPath = realPath.resolve(children.join(SEPARATOR)) - if( !realPath.exists() ) + validateFileOutput(record) + def realPath = FileHelper.toCanonicalPath(record.path as String) + if( subPath ) + realPath = realPath.resolve(subPath) + if (!realPath.exists()) throw new FileNotFoundException("Target path '$realPath' does not exist") return realPath } @@ -368,7 +275,7 @@ class LinPath implements Path, LogicalDataPath { @Override Path getFileName() { final result = Path.of(filePath).getFileName()?.toString() - return result ? new LinPath(query, fragment, result, null) : null + return result ? new LinPath(result, null) : null } @Override @@ -392,7 +299,7 @@ class LinPath implements Path, LogicalDataPath { throw new IllegalArgumentException("Path name index cannot be less than zero - offending value: $index") final path = Path.of(filePath) if( index == path.nameCount - 1 ) { - return new LinPath( query, fragment, path.getName(index).toString(), null) + return new LinPath(path.getName(index).toString(), null) } return new LinPath(index == 0 ? 
fileSystem : null, path.getName(index).toString()) } @@ -443,7 +350,7 @@ class LinPath implements Path, LogicalDataPath { return that } else { final newPath = Path.of(filePath).resolve(that.toString()) - return new LinPath(that.query, that.fragment, newPath.toString(), fileSystem) + return new LinPath(newPath.toString(), fileSystem) } } @@ -479,12 +386,12 @@ class LinPath implements Path, LogicalDataPath { // Compare 'filePath' as relative paths path = Path.of(filePath).relativize(Path.of(lidOther.filePath)) } - return new LinPath(lidOther.query, lidOther.fragment, path.getNameCount() > 0 ? path.toString() : SEPARATOR, null) + return new LinPath(path.getNameCount() > 0 ? path.toString() : SEPARATOR, null) } @Override URI toUri() { - return asUri("${SCHEME}://${filePath}${query ? '?' + query : ''}${fragment ? '#' + fragment : ''}") + return asUri("${SCHEME}://${filePath}") } String toUriString() { @@ -512,7 +419,7 @@ class LinPath implements Path, LogicalDataPath { * @throws FileNotFoundException if the record does not exist or its type is not a FileOutput. 
*/ protected Path getTargetPath() { - return findTarget(fileSystem, filePath, fragment, false, false) + return findTarget(fileSystem, filePath, false, false) } /** @@ -522,7 +429,7 @@ class LinPath implements Path, LogicalDataPath { * @throws FileNotFoundException if the record does not exist or its type is not a FileOutput or a intermediate directory */ protected Path getTargetOrIntermediatePath() { - return findTarget(fileSystem, filePath, fragment, false, true) + return findTarget(fileSystem, filePath, false, true) } /** @@ -532,7 +439,7 @@ class LinPath implements Path, LogicalDataPath { * @throws FileNotFoundException if the record does not exist */ protected Path getTargetOrMetadataPath() { - return findTarget(fileSystem, filePath, fragment,true, false) + return findTarget(fileSystem, filePath, true, false) } @Override @@ -581,7 +488,7 @@ class LinPath implements Path, LogicalDataPath { @Override String toString() { - return "$filePath${query ? '?' + query : ''}${fragment ? '#' + fragment : ''}".toString() + return filePath } } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/FileOutput.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/FileOutput.groovy index 6fe9eb831a..1f17d07419 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/FileOutput.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/FileOutput.groovy @@ -42,13 +42,13 @@ class FileOutput implements LinSerializable { * Entity that generated the data. Possible entities are: * - a FileOutput if the workflow published from a task data. * - a TaskRun if the data is a task output. - * - a WorkflowRun if the data is generated by the workflow (e.g., an index file). + * - a WorkflowLaunch if the data is generated by the workflow (e.g., an index file). */ String source /** - * Reference to the WorkflowRun that generated the data. + * Reference to the WorkflowLaunch that generated the data. 
*/ - String workflowRun + String workflowLaunch /** * Reference to the task that generated the data. */ diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/TaskOutput.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/TaskOutput.groovy deleted file mode 100644 index 00ef2254ff..0000000000 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/TaskOutput.groovy +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2013-2025, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package nextflow.lineage.model - -import groovy.transform.Canonical -import groovy.transform.CompileStatic -import nextflow.lineage.serde.LinSerializable - -import java.time.OffsetDateTime - -/** - * Models task results. - * - * @author Jorge Ejarque - */ -@Canonical -@CompileStatic -class TaskOutput implements LinSerializable { - /** - * Reference to the task that generated the output. - */ - String taskRun - /** - * Reference to the WorkflowRun that generated the output. 
- */ - String workflowRun - /** - * Creation date of this task output description - */ - OffsetDateTime createdAt - /** - * Output of the task - */ - List output - /** - * Labels attached to the task output - */ - List labels -} diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/TaskRun.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/TaskRun.groovy index 225a969dea..95c6b5f302 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/TaskRun.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/TaskRun.groovy @@ -73,7 +73,7 @@ class TaskRun implements LinSerializable { */ List binEntries /** - * Workflow run associated to the task run + * Workflow launch associated to the task run */ - String workflowRun + String workflowLaunch } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowOutput.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowLaunch.groovy similarity index 67% rename from modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowOutput.groovy rename to modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowLaunch.groovy index 42ce7d7f46..660bd308b5 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowOutput.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowLaunch.groovy @@ -20,26 +20,32 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic import nextflow.lineage.serde.LinSerializable -import java.time.OffsetDateTime - /** - * Models the results of a workflow execution. 
+ * Models the launch of a workflow execution * * @author Jorge Ejarque params /** - * Workflow output + * Resolved Configuration */ - List output + Map config } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowRun.groovy b/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowRun.groovy index 056c6cf3b6..6e7f9e7f42 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowRun.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/model/WorkflowRun.groovy @@ -16,12 +16,14 @@ package nextflow.lineage.model +import java.time.OffsetDateTime + import groovy.transform.Canonical import groovy.transform.CompileStatic import nextflow.lineage.serde.LinSerializable /** - * Models a Workflow Execution + * Models the completion of a workflow execution. * * @author Jorge Ejarque params + String status /** - * Resolved Configuration + * Workflow output */ - Map config + List output } diff --git a/modules/nf-lineage/src/main/nextflow/lineage/serde/LinEncoder.groovy b/modules/nf-lineage/src/main/nextflow/lineage/serde/LinEncoder.groovy index a971aec4a7..77f80a9923 100644 --- a/modules/nf-lineage/src/main/nextflow/lineage/serde/LinEncoder.groovy +++ b/modules/nf-lineage/src/main/nextflow/lineage/serde/LinEncoder.groovy @@ -18,10 +18,9 @@ package nextflow.lineage.serde import groovy.transform.CompileStatic import nextflow.lineage.model.FileOutput -import nextflow.lineage.model.TaskOutput import nextflow.lineage.model.TaskRun import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import nextflow.serde.gson.GsonEncoder import nextflow.serde.gson.RuntimeTypeAdapterFactory @@ -42,11 +41,10 @@ class LinEncoder extends GsonEncoder { static RuntimeTypeAdapterFactory newTypeAdapterFactory(){ RuntimeTypeAdapterFactory.of(LinSerializable.class, "type") + .registerSubtype(WorkflowLaunch, WorkflowLaunch.simpleName) 
.registerSubtype(WorkflowRun, WorkflowRun.simpleName) - .registerSubtype(WorkflowOutput, WorkflowOutput.simpleName) .registerSubtype(Workflow, Workflow.simpleName) .registerSubtype(TaskRun, TaskRun.simpleName) - .registerSubtype(TaskOutput, TaskOutput.simpleName) .registerSubtype(FileOutput, FileOutput.simpleName) } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy index 24741f96ad..23b973b7d4 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinHistoryLogTest.groovy @@ -44,11 +44,11 @@ class DefaultLinHistoryLogTest extends Specification { def "write should add a new file to the history folder"() { given: UUID sessionId = UUID.randomUUID() - String runName = "TestRun" - String runLid = "lid://123" + def runName = "TestRun" + def launchLid = "lid://123" when: - linHistoryLog.write(runName, sessionId, runLid) + linHistoryLog.write(runName, sessionId, launchLid) then: def files = historyFile.listFiles() @@ -56,16 +56,40 @@ class DefaultLinHistoryLogTest extends Specification { def parsedRecord = LinHistoryRecord.parse(files[0].text) parsedRecord.sessionId == sessionId parsedRecord.runName == runName + parsedRecord.launchLid == launchLid + parsedRecord.runLid == '-' + parsedRecord.status == '-' + } + + def "update should modify log record for given lid"() { + given: + UUID sessionId = UUID.randomUUID() + def runName = "Run1" + def launchLid = "launch-lid" + def runLid = "run-lid" + def status = "SUCCEEDED" + + and: + linHistoryLog.write(runName, sessionId, launchLid) + + when: + linHistoryLog.finalize(launchLid, runLid, status) + + then: + def files = historyFile.listFiles() + files.size() == 1 + def parsedRecord = LinHistoryRecord.parse(files[0].text) parsedRecord.runLid == runLid + parsedRecord.status == status } def 'should get records' () { given: 
UUID sessionId = UUID.randomUUID() - String runName = "Run1" - String runLid = "lid://123" + def runName = "Run1" + def launchLid = "lid://123" and: - linHistoryLog.write(runName, sessionId, runLid) + linHistoryLog.write(runName, sessionId, launchLid) when: def records = linHistoryLog.getRecords() @@ -73,7 +97,9 @@ class DefaultLinHistoryLogTest extends Specification { records.size() == 1 records[0].sessionId == sessionId records[0].runName == runName - records[0].runLid == runLid + records[0].launchLid == launchLid + records[0].runLid == '-' + records[0].status == '-' } } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy index 52b809f546..c122eab178 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/DefaultLinStoreTest.groovy @@ -27,7 +27,7 @@ import nextflow.lineage.model.DataPath import nextflow.lineage.model.FileOutput import nextflow.lineage.model.Parameter import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowRun +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.serde.LinEncoder import nextflow.lineage.config.LineageConfig import spock.lang.Specification @@ -107,7 +107,7 @@ class DefaultLinStoreTest extends Specification { def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" ) def key = "testKey" - def value1 = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [ new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] ) + def value1 = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [ new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] ) def key2 = "testKey2" def value2 = new FileOutput("/path/tp/file1", 
new Checksum("78910", "nextflow", "standard"), "testkey", "testkey", null, 1234, time, time, ["value1", "value2"]) def key3 = "testKey3" diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinExtensionImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinExtensionImplTest.groovy index 6eb0ee2bbf..8f40c1dc2b 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinExtensionImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinExtensionImplTest.groovy @@ -31,7 +31,7 @@ import nextflow.lineage.model.DataPath import nextflow.lineage.model.FileOutput import nextflow.lineage.model.Parameter import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowRun +import nextflow.lineage.model.WorkflowLaunch import spock.lang.Specification import spock.lang.TempDir @@ -62,7 +62,7 @@ class LinExtensionImplTest extends Specification { def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" ) def key = "testKey" - def value1 = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [ new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] ) + def value1 = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [ new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")] ) def key2 = "testKey2" def value2 = new FileOutput("/path/tp/file1", new Checksum("78910", "nextflow", "standard"), "testkey", "testkey", "taskid", 1234, time, time, ["value1","value2"]) def key3 = "testKey3" @@ -99,7 +99,7 @@ class LinExtensionImplTest extends Specification { when: results = CH.create() - linExt.fromLineage(session, results, [workflowRun: "testkey", taskRun: "taskid", label: "value2"]) + linExt.fromLineage(session, results, [workflowLaunch: "testkey", taskRun: "taskid", label: "value2"]) then: linExt.getStore(session) >> lidStore and: diff --git 
a/modules/nf-lineage/src/test/nextflow/lineage/LinHistoryRecordTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinHistoryRecordTest.groovy index c874ed2b1f..5c5c6c0505 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinHistoryRecordTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinHistoryRecordTest.groovy @@ -31,11 +31,11 @@ class LinHistoryRecordTest extends Specification { thrown(IllegalArgumentException) } - def "LinHistoryRecord parse should handle 4-column record"() { + def "LinHistoryRecord parse should handle TSV record"() { given: def timestamp = new Date() def formattedTimestamp = LinHistoryRecord.TIMESTAMP_FMT.format(timestamp) - def line = "${formattedTimestamp}\trun-1\t${UUID.randomUUID()}\tlid://123" + def line = "${formattedTimestamp}\trun-1\t${UUID.randomUUID()}\tSUCCEEDED\tlid://123\tlid://456" when: def record = LinHistoryRecord.parse(line) @@ -43,19 +43,21 @@ class LinHistoryRecordTest extends Specification { then: record.timestamp != null record.runName == "run-1" - record.runLid == "lid://123" + record.status == "SUCCEEDED" + record.launchLid == "lid://123" + record.runLid == "lid://456" } def "LinHistoryRecord toString should produce tab-separated format"() { given: UUID sessionId = UUID.randomUUID() - def record = new LinHistoryRecord(new Date(), "TestRun", sessionId, "lid://123") + def record = new LinHistoryRecord(new Date(), "TestRun", sessionId, "SUCCEEDED", "lid://123", "lid://456") when: def line = record.toString() then: line.contains("\t") - line.split("\t").size() == 4 + line.split("\t").size() == 6 } } diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy index e9e261250e..379d5d5c7c 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinObserverTest.groovy @@ -30,9 +30,8 @@ import nextflow.lineage.model.Checksum import 
nextflow.lineage.model.FileOutput import nextflow.lineage.model.DataPath import nextflow.lineage.model.Parameter -import nextflow.lineage.model.TaskOutput import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import nextflow.lineage.serde.LinEncoder import nextflow.lineage.config.LineageConfig @@ -184,12 +183,12 @@ class LinObserverTest extends Specification { def observer = new LinObserver(session, store) def mainScript = new DataPath("file://${scriptFile.toString()}", new Checksum("78910", "nextflow", "standard")) def workflow = new Workflow([mainScript],"https://nextflow.io/nf-test/", "123456" ) - def workflowRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [], config) + def workflowLaunch = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [], config) when: observer.onFlowCreate(session) observer.onFlowBegin() then: - folder.resolve("${observer.executionHash}/.data.json").text == new LinEncoder().encode(workflowRun) + folder.resolve("${observer.launchId}/.data.json").text == new LinEncoder().encode(workflowLaunch) cleanup: folder?.deleteDir() @@ -237,7 +236,7 @@ class LinObserverTest extends Specification { and: def observer = new LinObserver(session, store) def normalizer = new PathNormalizer(metadata) - observer.executionHash = "hash" + observer.launchId = "hash" observer.normalizer = normalizer and: def hash = HashCode.fromString("1234567890") @@ -314,23 +313,10 @@ class LinObserverTest extends Specification { def taskRunResult = store.load("${hash.toString()}") def dataOutputResult1 = store.load("${hash}/fileOut1.txt") as FileOutput def dataOutputResult2 = store.load("${hash}/fileOut2.txt") as FileOutput - def taskOutputsResult = store.load("${hash}#output") as TaskOutput then: taskRunResult == taskDescription dataOutputResult1 == dataOutput1 dataOutputResult2 == dataOutput2 - taskOutputsResult.taskRun == 
"lid://1234567890" - taskOutputsResult.workflowRun == "lid://hash" - taskOutputsResult.output.size() == 3 - taskOutputsResult.output.get(0).type == "path" - taskOutputsResult.output.get(0).name == "file1" - taskOutputsResult.output.get(0).value == "lid://1234567890/fileOut1.txt" - taskOutputsResult.output.get(1).type == "path" - taskOutputsResult.output.get(1).name == "file2" - taskOutputsResult.output.get(1).value == ["lid://1234567890/fileOut2.txt"] - taskOutputsResult.output.get(2).type == "val" - taskOutputsResult.output.get(2).name == "id" - taskOutputsResult.output.get(2).value == "value" cleanup: folder?.deleteDir() @@ -346,7 +332,7 @@ class LinObserverTest extends Specification { } store.open(LineageConfig.create(session)) def observer = Spy(new LinObserver(session, store)) - observer.executionHash = "hash" + observer.launchId = "hash" and: def workDir = folder.resolve('12/34567890') Files.createDirectories(workDir) @@ -483,7 +469,7 @@ class LinObserverTest extends Specification { Path.of('/path/to/outDir') | Path.of('../relative') | "relative" } - def 'should save workflow output'() { + def 'should save workflow run'() { given: def folder = Files.createTempDirectory('test') def config = [lineage:[enabled: true, store:[location:folder.toString()]]] @@ -508,19 +494,14 @@ class LinObserverTest extends Specification { getUniqueId()>>uniqueId getRunName()>>"test_run" getParams() >> new ScriptBinding.ParamsMap() + isSuccess() >> true } store.open(LineageConfig.create(session)) def observer = new LinObserver(session, store) def encoder = new LinEncoder() - - when: 'Starting workflow' - observer.onFlowCreate(session) - observer.onFlowBegin() - then: 'History file should contain execution hash' - def lid = LinHistoryRecord.parse(folder.resolve(".history/${observer.executionHash}").text) - lid.runLid == asUriString(observer.executionHash) - lid.sessionId == uniqueId - lid.runName == "test_run" + and: + observer.onFlowCreate(session) + observer.onFlowBegin() when: ' 
publish output with source file' def outFile1 = outputDir.resolve('foo/file.bam') @@ -536,9 +517,9 @@ class LinObserverTest extends Specification { def attrs1 = Files.readAttributes(outFile1, BasicFileAttributes) def fileHash1 = CacheHelper.hasher(outFile1).hash().toString() def output1 = new FileOutput(outFile1.toString(), new Checksum(fileHash1, "nextflow", "standard"), - "lid://123987/file.bam", "$LID_PROT${observer.executionHash}", null, + "lid://123987/file.bam", "$LID_PROT${observer.launchId}", null, attrs1.size(), LinUtils.toDate(attrs1.creationTime()), LinUtils.toDate(attrs1.lastModifiedTime()) ) - folder.resolve("${observer.executionHash}/foo/file.bam/.data.json").text == encoder.encode(output1) + folder.resolve("${observer.launchId}/foo/file.bam/.data.json").text == encoder.encode(output1) when: 'publish without source path' def outFile2 = outputDir.resolve('foo/file2.bam') @@ -550,15 +531,17 @@ class LinObserverTest extends Specification { observer.onWorkflowOutput(new WorkflowOutputEvent("b", outFile2)) then: 'Check outFile2 metadata in lid store' def output2 = new FileOutput(outFile2.toString(), new Checksum(fileHash2, "nextflow", "standard"), - "lid://${observer.executionHash}" , "lid://${observer.executionHash}", null, + "lid://${observer.launchId}" , "lid://${observer.launchId}", null, attrs2.size(), LinUtils.toDate(attrs2.creationTime()), LinUtils.toDate(attrs2.lastModifiedTime()) ) - folder.resolve("${observer.executionHash}/foo/file2.bam/.data.json").text == encoder.encode(output2) + folder.resolve("${observer.launchId}/foo/file2.bam/.data.json").text == encoder.encode(output2) when: 'Workflow complete' observer.onFlowComplete() - then: 'Check WorkflowOutput is written in the lid store' - def resultsRetrieved = store.load("${observer.executionHash}#output") as WorkflowOutput - resultsRetrieved.output == [new Parameter(Path.simpleName, "a", "lid://${observer.executionHash}/foo/file.bam"), new Parameter(Path.simpleName, "b", 
"lid://${observer.executionHash}/foo/file2.bam")] + then: 'Check history file is updated and Workflow Result is written in the lid store' + def finalLid = store.getHistoryLog().getRecord(uniqueId).runLid.substring(LID_PROT.size()) + def workflowRun = store.load(finalLid) as WorkflowRun + workflowRun.status == "SUCCEEDED" + workflowRun.output == [new Parameter(Path.simpleName, "a", "lid://${observer.launchId}/foo/file.bam"), new Parameter(Path.simpleName, "b", "lid://${observer.launchId}/foo/file2.bam")] cleanup: folder?.deleteDir() diff --git a/modules/nf-lineage/src/test/nextflow/lineage/LinUtilsTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/LinUtilsTest.groovy index fc118e13a5..6aae9792c9 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/LinUtilsTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/LinUtilsTest.groovy @@ -22,7 +22,7 @@ import nextflow.lineage.model.Checksum import nextflow.lineage.model.DataPath import nextflow.lineage.model.Parameter import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import nextflow.lineage.config.LineageConfig import spock.lang.Specification @@ -72,53 +72,25 @@ class LinUtilsTest extends Specification{ def uniqueId = UUID.randomUUID() def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) def workflow = new Workflow([mainScript], "https://nextflow.io/nf-test/", "123456") - def key1 = "testKey" - def value1 = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")]) - def outputs1 = new WorkflowOutput(OffsetDateTime.now(), "lid://testKey", [new Parameter( "String", "output", "name")] ) - def key2 = "testKey2" - def value2 = new WorkflowRun(workflow, uniqueId.toString(), "test_run2", [new Parameter("String", "param1", "value1"), new 
Parameter("String", "param2", "value2")]) + def launchId = "testLaunch" + def workflowLaunch = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")]) def lidStore = new DefaultLinStore() lidStore.open(config) - lidStore.save(key1, value1) - lidStore.save("$key1#output", outputs1) - lidStore.save(key2, value2) + lidStore.save(launchId, workflowLaunch) when: - def params = LinUtils.getMetadataObject(lidStore, new URI('lid://testKey#params')) + def record = LinUtils.getMetadataObject(lidStore, new URI('lid://testLaunch')) then: - params instanceof List - (params as List).size() == 2 + record instanceof WorkflowLaunch + record.name == "test_run" + record.params.size() == 2 when: - def outputs = LinUtils.getMetadataObject(lidStore, new URI('lid://testKey#output')) - then: - outputs instanceof List - def param = (outputs as List)[0] as Parameter - param.name == "output" - - when: - outputs = LinUtils.getMetadataObject(lidStore, new URI('lid://testKey2#output')) - then: - outputs instanceof List - (outputs as List).isEmpty() - - when: - LinUtils.getMetadataObject(lidStore, new URI('lid://testKey#no-exist')) - then: - thrown(IllegalArgumentException) - - when: - LinUtils.getMetadataObject(lidStore, new URI('lid://testKey#outputs.no-exist')) - then: - thrown(IllegalArgumentException) - - when: - LinUtils.getMetadataObject(lidStore, new URI('lid://no-exist#something')) + LinUtils.getMetadataObject(lidStore, new URI('lid://no-exist')) then: thrown(FileNotFoundException) } - def "should check params in an object"() { given: def obj = [ "type": "value", "workflow": ["repository": "subvalue"], "output" : [ ["path":"/to/file"],["path":"file2"] ], "labels": ["a","b"] ] @@ -187,7 +159,7 @@ class LinUtilsTest extends Specification{ def uniqueId = UUID.randomUUID() def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) def workflow = new 
Workflow([mainScript], "https://nextflow.io/nf-test/", "123456") - def wfRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", [key: "value1"]), new Parameter("String", "param2", "value2")]) + def wfRun = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", [key: "value1"]), new Parameter("String", "param2", "value2")]) expect: LinUtils.navigate(wfRun, "workflow.commitId") == "123456" diff --git a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy index 0d7c8767af..b0591607f8 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/cli/LinCommandImplTest.groovy @@ -28,7 +28,7 @@ import nextflow.lineage.model.DataPath import nextflow.lineage.model.Parameter import nextflow.lineage.model.TaskRun import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowRun +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.serde.LinEncoder import nextflow.plugin.Plugins import org.junit.Rule @@ -77,8 +77,9 @@ class LinCommandImplTest extends Specification{ def lidLog = new DefaultLinHistoryLog(historyFile) def uniqueId = UUID.randomUUID() def date = new Date(); - def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tlid://123456".toString() + def recordEntry = "${LinHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tSUCCEEDED\tlid://123456\tlid://456789".toString() lidLog.write("run_name", uniqueId, "lid://123456", date) + lidLog.finalize(uniqueId, "lid://456789", "SUCCEEDED") when: new LinCommandImpl().list(configMap) def stdout = capture @@ -95,7 +96,7 @@ class LinCommandImplTest extends Specification{ def 'should print no history' (){ given: - def historyFile = storeLocation.resolve(".meta/.history") + def historyFile = 
storeLocation.resolve(".history") Files.createDirectories(historyFile.parent) when: @@ -295,7 +296,7 @@ class LinCommandImplTest extends Specification{ "lid://12345", "lid://12345", null, 1234, time, time, null) lidFile.text = encoder.encode(entry) def wf = new Workflow([new DataPath("/path/to/main.nf)")], "hello-nf", "aasdklk") - entry = new WorkflowRun(wf,"sessionId","run_name", + entry = new WorkflowLaunch(wf,"sessionId","run_name", [new Parameter( "String", "sample_id","ggal_gut"), new Parameter("Integer","reads",2)]) lidFile3.text = encoder.encode(entry) @@ -327,31 +328,6 @@ class LinCommandImplTest extends Specification{ outputHtml.text == expectedOutput } - def 'should show an error if trying to do a query'(){ - given: - def lidFile = storeLocation.resolve("12345/.data.json") - Files.createDirectories(lidFile.parent) - def encoder = new LinEncoder().withPrettyPrint(true) - def time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(123456789), ZoneOffset.UTC) - def entry = new FileOutput("path/to/file",new Checksum("45372qe","nextflow","standard"), - "lid://123987/file.bam", "lid://123987/", null, 1234, time, time, null) - def jsonSer = encoder.encode(entry) - def expectedOutput = "Error loading lid:///?type=FileOutput - Cannot get record from the root LID URI" - lidFile.text = jsonSer - when: - new LinCommandImpl().view(configMap, ["lid:///?type=FileOutput"]) - def stdout = capture - .toString() - .readLines()// remove the log part - .findResults { line -> !line.contains('DEBUG') ? line : null } - .findResults { line -> !line.contains('INFO') ? line : null } - .findResults { line -> !line.contains('plugin') ? 
line : null } - - then: - stdout.size() == expectedOutput.readLines().size() - stdout.join('\n') == expectedOutput - } - def 'should diff'(){ given: def lidFile = storeLocation.resolve("12345/.data.json") @@ -382,7 +358,7 @@ class LinCommandImplTest extends Specification{ }, - "source": "lid://123987/file.bam", + "source": "lid://123987/file2.bam", - "workflowRun": "lid://123987/", + "workflowLaunch": "lid://123987/", "taskRun": null, - "size": 1234, + "size": 1235, @@ -459,7 +435,7 @@ class LinCommandImplTest extends Specification{ Files.createDirectories(lidFile.parent) def lidFile2 = storeLocation.resolve("123987/file2.bam/.data.json") Files.createDirectories(lidFile2.parent) - def lidFile3 = storeLocation.resolve(".meta/123987/file3.bam/.data.json") + def lidFile3 = storeLocation.resolve("123987/file3.bam/.data.json") Files.createDirectories(lidFile3.parent) def encoder = new LinEncoder().withPrettyPrint(true) def time = OffsetDateTime.ofInstant(Instant.ofEpochMilli(123456789), ZoneOffset.UTC) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/fs/LinFileSystemProviderTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/fs/LinFileSystemProviderTest.groovy index 192e1c02d2..96e7a456e4 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/fs/LinFileSystemProviderTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/fs/LinFileSystemProviderTest.groovy @@ -181,19 +181,20 @@ class LinFileSystemProviderTest extends Specification { def config = [lineage:[store:[location:wdir.toString()]]] def outputMeta = wdir.resolve("12345") outputMeta.mkdirs() - outputMeta.resolve(".data.json").text = '{"type":"WorkflowRun","sessionId":"session","name":"run_name","params":[{"type":"String","name":"param1","value":"value1"}]}' + def metadata = '{\n "type": "WorkflowRun",\n "createdAt": null,\n "workflowLaunch": null,\n "status": null,\n "output": null\n}' + outputMeta.resolve(".data.json").text = metadata Global.session = Mock(Session) { getConfig()>>config 
} and: def provider = new LinFileSystemProvider() - def lid = provider.getPath(LinPath.asUri('lid://12345#name')) + def lid = provider.getPath(LinPath.asUri('lid://12345')) when: def channel = provider.newByteChannel(lid, Set.of(StandardOpenOption.READ)) then: channel.isOpen() channel.position() == 0 - channel.size() == '"run_name"'.getBytes().size() + channel.size() == metadata.getBytes().size() when: channel.truncate(25) @@ -206,7 +207,7 @@ class LinFileSystemProviderTest extends Specification { def bytes = new byte[read] buffer.get(0,bytes) then: - bytes =='"run_name"'.getBytes() + bytes == metadata.getBytes() when: channel.position(2) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/fs/LinPathTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/fs/LinPathTest.groovy index 4fc87ccfc7..cf7a558bb6 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/fs/LinPathTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/fs/LinPathTest.groovy @@ -18,10 +18,10 @@ package nextflow.lineage.fs import nextflow.lineage.LinUtils import nextflow.lineage.model.Checksum +import nextflow.lineage.model.FileOutput import nextflow.lineage.model.Parameter import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput -import nextflow.lineage.model.FileOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import nextflow.lineage.serde.LinEncoder import nextflow.util.CacheHelper @@ -68,25 +68,15 @@ class LinPathTest extends Specification { def path = new LinPath(fs, new URI( URI_STRING )) then: path.filePath == PATH - path.fragment == FRAGMENT - path.query == QUERY where: - URI_STRING | PATH | QUERY | FRAGMENT - "lid://1234/hola" | "1234/hola" | null | null - "lid://1234/hola#workflow.repository" | "1234/hola" | null | "workflow.repository" - "lid://1234/#workflow.repository" | "1234" | null | "workflow.repository" - "lid://1234/?q=a&b=c" | "1234" | "q=a&b=c" | null - 
"lid://1234/?q=a&b=c#workflow.repository" | "1234" | "q=a&b=c" | "workflow.repository" - "lid:///" | "/" | null | null - } - - def 'should throw exception if fragment contains an unknown property'() { - when: - new LinPath(fs, new URI ("lid://1234/hola#no-exist")) - then: - thrown(IllegalArgumentException) - + URI_STRING | PATH + "lid://1234/hola" | "1234/hola" + "lid://1234/hola#workflow.repository" | "1234/hola" + "lid://1234/#workflow.repository" | "1234" + "lid://1234/?q=a&b=c" | "1234" + "lid://1234/?q=a&b=c#workflow.repository" | "1234" + "lid:///" | "/" } def 'should warn if query is specified'() { @@ -168,14 +158,15 @@ class LinPathTest extends Specification { wdir.resolve('12345/output1/.data.json').text = '{"type":"FileOutput", "path": "' + outputFolder.toString() + '"}' wdir.resolve('12345/path/to/file2.txt/.data.json').text = '{"type":"FileOutput", "path": "' + outputFile.toString() + '"}' def time = OffsetDateTime.now() - def wfResultsMetadata = new LinEncoder().withPrettyPrint(true).encode(new WorkflowOutput(time, "lid://1234", [new Parameter( "Path", "a", "lid://1234/a.txt")])) + def workflowRun = new WorkflowRun(time, "lid://1234", "SUCCEEDED", [new Parameter( "Path", "a", "lid://1234/a.txt")]) + def wfResultsMetadata = new LinEncoder().withPrettyPrint(true).encode(workflowRun) wdir.resolve('5678/').mkdirs() wdir.resolve('5678/.data.json').text = wfResultsMetadata expect: 'Get real path when LinPath is the output data or a subfolder' new LinPath(lidFs, '12345/output1').getTargetPath() == outputFolder - new LinPath(lidFs,'12345/output1/some/path').getTargetPath() == outputSubFolder - new LinPath(lidFs,'12345/output1/some/path/file1.txt').getTargetPath().text == outputSubFolderFile.text + new LinPath(lidFs, '12345/output1/some/path').getTargetPath() == outputSubFolder + new LinPath(lidFs, '12345/output1/some/path/file1.txt').getTargetPath().text == outputSubFolderFile.text new LinPath(lidFs, '12345/path/to/file2.txt').getTargetPath().text == 
outputFile.text when: 'LinPath fs is null' @@ -223,58 +214,6 @@ class LinPathTest extends Specification { then: result instanceof LinMetadataPath result.text == wfResultsMetadata - - when: 'Lid description subobject' - def result2 = new LinPath(lidFs, '5678#output').getTargetOrMetadataPath() - then: - result2 instanceof LinMetadataPath - result2.text == LinUtils.encodeSearchOutputs([new Parameter("Path","a", "lid://1234/a.txt")], true) - - when: 'Lid subobject does not exist' - new LinPath(lidFs, '23456#notexists').getTargetOrMetadataPath() - then: - thrown(IllegalArgumentException) - } - - def 'should get subobjects as path' (){ - given: - def lidFs = new LinFileSystemProvider().newFileSystem(new URI("lid:///"), [enabled: true, store: [location: wdir.toString()]]) - def wf = new WorkflowRun(new Workflow([],"repo", "commit"), "sessionId", "runId", [new Parameter("String", "param1", "value1")]) - - when: 'workflow repo in workflow run' - Path p = LinPath.getMetadataAsTargetPath(wf, lidFs, "123456", "workflow.repository") - then: - p instanceof LinMetadataPath - p.text == '"repo"' - - when: 'outputs' - def outputs = new WorkflowOutput(OffsetDateTime.now(), "lid://123456", [new Parameter("Collection", "samples", ["sample1", "sample2"])]) - lidFs.store.save("123456#output", outputs) - Path p2 = LinPath.getMetadataAsTargetPath(wf, lidFs, "123456", "output") - then: - p2 instanceof LinMetadataPath - p2.text == LinUtils.encodeSearchOutputs([new Parameter("Collection", "samples", ["sample1", "sample2"])], true) - - when: 'child does not exists' - LinPath.getMetadataAsTargetPath(wf, lidFs, "123456", "no-exist") - then: - def exception = thrown(FileNotFoundException) - exception.message == "Target path '123456#no-exist' does not exist" - - when: 'outputs does not exists' - LinPath.getMetadataAsTargetPath(wf, lidFs, "6789", "output") - then: - def exception1 = thrown(FileNotFoundException) - exception1.message == "Target path '6789#output' does not exist" - - when: 'null 
object' - LinPath.getMetadataAsTargetPath(null, lidFs, "123456", "no-exist") - then: - def exception2 = thrown(FileNotFoundException) - exception2.message == "Target path '123456' does not exist" - - cleanup: - wdir.resolve("123456").deleteDir() } def 'should get file name' () { @@ -283,7 +222,6 @@ class LinPathTest extends Specification { where: PATH | EXPECTED '1234567890/this/file.bam' | new LinPath(null, 'file.bam') - '12345/hola?query#output' | new LinPath("query", "output", "hola", null) } @@ -321,7 +259,6 @@ class LinPathTest extends Specification { '123/a' | 1 | new LinPath(null, 'a') '123/a/' | 1 | new LinPath(null, 'a') '123/a/b' | 2 | new LinPath(null, 'b') - '123/a?q#output' | 1 | new LinPath(null, 'a?q#output') } @Unroll @@ -637,7 +574,7 @@ class LinPathTest extends Specification { file.text = "this is a data file" def hash = CacheHelper.hasher(file).hash().toString() def correctData = new FileOutput(file.toString(), new Checksum(hash,"nextflow", "standard")) - LinPath.validateDataOutput(correctData) + LinPath.validateFileOutput(correctData) def stdout = capture .toString() .readLines()// remove the log part @@ -658,7 +595,7 @@ class LinPathTest extends Specification { file.text = "this is a data file" def hash = CacheHelper.hasher(file).hash().toString() def correctData = new FileOutput(file.toString(), new Checksum("abscd","nextflow", "standard")) - LinPath.validateDataOutput(correctData) + LinPath.validateFileOutput(correctData) def stdout = capture .toString() .readLines()// remove the log part @@ -680,7 +617,7 @@ class LinPathTest extends Specification { file.text = "this is a data file" def hash = CacheHelper.hasher(file).hash().toString() def correctData = new FileOutput(file.toString(), new Checksum(hash,"not-supported", "standard")) - LinPath.validateDataOutput(correctData) + LinPath.validateFileOutput(correctData) def stdout = capture .toString() .readLines()// remove the log part @@ -699,7 +636,7 @@ class LinPathTest extends Specification { 
def 'should throw exception when file not found validating hash'(){ when: def correctData = new FileOutput("not/existing/file", new Checksum("120741","nextflow", "standard")) - LinPath.validateDataOutput(correctData) + LinPath.validateFileOutput(correctData) then: thrown(FileNotFoundException) diff --git a/modules/nf-lineage/src/test/nextflow/lineage/serde/LinEncoderTest.groovy b/modules/nf-lineage/src/test/nextflow/lineage/serde/LinEncoderTest.groovy index 978aac461b..51a1f920ba 100644 --- a/modules/nf-lineage/src/test/nextflow/lineage/serde/LinEncoderTest.groovy +++ b/modules/nf-lineage/src/test/nextflow/lineage/serde/LinEncoderTest.groovy @@ -20,10 +20,9 @@ import nextflow.lineage.model.Checksum import nextflow.lineage.model.DataPath import nextflow.lineage.model.Parameter import nextflow.lineage.model.FileOutput -import nextflow.lineage.model.TaskOutput import nextflow.lineage.model.TaskRun import nextflow.lineage.model.Workflow -import nextflow.lineage.model.WorkflowOutput +import nextflow.lineage.model.WorkflowLaunch import nextflow.lineage.model.WorkflowRun import spock.lang.Specification @@ -55,22 +54,22 @@ class LinEncoderTest extends Specification{ } - def 'should encode and decode WorkflowRuns'(){ + def 'should encode and decode WorkflowLaunch'(){ given: def encoder = new LinEncoder() and: def uniqueId = UUID.randomUUID() def mainScript = new DataPath("file://path/to/main.nf", new Checksum("78910", "nextflow", "standard")) def workflow = new Workflow([mainScript], "https://nextflow.io/nf-test/", "123456") - def wfRun = new WorkflowRun(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")]) + def wfRun = new WorkflowLaunch(workflow, uniqueId.toString(), "test_run", [new Parameter("String", "param1", "value1"), new Parameter("String", "param2", "value2")]) when: def encoded = encoder.encode(wfRun) def object = encoder.decode(encoded) then: - object instanceof WorkflowRun - def 
result = object as WorkflowRun + object instanceof WorkflowLaunch + def result = object as WorkflowLaunch result.workflow instanceof Workflow result.workflow.scriptFiles.first instanceof DataPath result.workflow.scriptFiles.first.path == "file://path/to/main.nf" @@ -83,21 +82,22 @@ class LinEncoderTest extends Specification{ result.params.get(0).name == "param1" } - def 'should encode and decode WorkflowResults'(){ + def 'should encode and decode WorkflowRun'(){ given: def encoder = new LinEncoder() and: def time = OffsetDateTime.now() - def wfResults = new WorkflowOutput(time, "lid://1234", [new Parameter("String", "a", "A"), new Parameter("String", "b", "B")]) + def wfResults = new WorkflowRun(time, "lid://1234", "SUCCEEDED", [new Parameter("String", "a", "A"), new Parameter("String", "b", "B")]) when: def encoded = encoder.encode(wfResults) def object = encoder.decode(encoded) then: - object instanceof WorkflowOutput - def result = object as WorkflowOutput + object instanceof WorkflowRun + def result = object as WorkflowRun result.createdAt == time - result.workflowRun == "lid://1234" + result.workflowLaunch == "lid://1234" + result.status == "SUCCEEDED" result.output == [new Parameter("String", "a", "A"), new Parameter("String", "b", "B")] } @@ -133,38 +133,17 @@ class LinEncoderTest extends Specification{ result.binEntries.get(0).checksum.value == "78910" } - def 'should encode and decode TaskResults'(){ - given: - def encoder = new LinEncoder() - and: - def time = OffsetDateTime.now() - def parameter = new Parameter("a","b", "c") - def wfResults = new TaskOutput("lid://1234", "lid://5678", time, [parameter], null) - when: - def encoded = encoder.encode(wfResults) - def object = encoder.decode(encoded) - - then: - object instanceof TaskOutput - def result = object as TaskOutput - result.createdAt == time - result.taskRun == "lid://1234" - result.workflowRun == "lid://5678" - result.output.size() == 1 - result.output[0] == parameter - } - def 'object with null 
date attributes' () { given: def encoder = new LinEncoder() and: - def wfResults = new WorkflowOutput(null, "lid://1234") + def wfResults = new WorkflowRun(null, "lid://1234") when: def encoded = encoder.encode(wfResults) def object = encoder.decode(encoded) then: - encoded == '{"type":"WorkflowOutput","createdAt":null,"workflowRun":"lid://1234","output":null}' - def result = object as WorkflowOutput + encoded == '{"type":"WorkflowRun","createdAt":null,"workflowLaunch":"lid://1234","status":null,"output":null}' + def result = object as WorkflowRun result.createdAt == null }