From 498a85e59263936d6800955a1a345711908cb476 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 8 Jul 2025 10:20:57 -0500 Subject: [PATCH] Static types --- main.nf | 77 +++++--- modules/local/aspera_cli/main.nf | 42 ++-- modules/local/sra_fastq_ftp/main.nf | 46 +++-- modules/local/sra_ids_to_runinfo/main.nf | 10 +- modules/local/sra_runinfo_to_ftp/main.nf | 12 +- .../custom/sratoolsncbisettings/main.nf | 9 +- modules/nf-core/sratools/fasterqdump/main.nf | 37 ++-- .../sratools/fasterqdump/nextflow.config | 2 +- .../fasterqdump/tests/nextflow.config | 2 +- modules/nf-core/sratools/prefetch/main.nf | 26 +-- .../prefetch/templates/retry_with_backoff.sh | 4 +- nextflow.config | 16 +- nextflow_output.json | 53 +++++ nextflow_schema.json | 47 +---- .../utils_nfcore_fetchngs_pipeline/main.nf | 72 +++---- .../main.nf | 21 +- .../nf-core/utils_nextflow_pipeline/main.nf | 14 +- .../nf-core/utils_nfcore_pipeline/main.nf | 141 +++++++------- workflows/sra/main.nf | 181 ++++++++---------- 19 files changed, 417 insertions(+), 395 deletions(-) create mode 100644 nextflow_output.json diff --git a/main.nf b/main.nf index 468be723..151d3219 100644 --- a/main.nf +++ b/main.nf @@ -9,43 +9,40 @@ ---------------------------------------------------------------------------------------- */ -nextflow.preview.output = true +nextflow.preview.operators = true +nextflow.preview.typeChecking = true /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - IMPORT FUNCTIONS / MODULES / SUBWORKFLOWS / WORKFLOWS + IMPORT FUNCTIONS / MODULES / SUBWORKFLOWS / WORKFLOWS / TYPES ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ include { SRA } from './workflows/sra' include { PIPELINE_INITIALISATION } from './subworkflows/local/utils_nfcore_fetchngs_pipeline' include { PIPELINE_COMPLETION } from './subworkflows/local/utils_nfcore_fetchngs_pipeline' -include { softwareVersionsToYAML } from './subworkflows/nf-core/utils_nfcore_pipeline' +include { SOFTWARE_VERSIONS } from './subworkflows/nf-core/utils_nfcore_pipeline' +include { Sample } from './workflows/sra' /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - NAMED WORKFLOWS FOR PIPELINE + WORKFLOW INPUTS ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ -// -// WORKFLOW: Run main nf-core/fetchngs analysis pipeline depending on type of identifier provided -// -workflow NFCORE_FETCHNGS { +params { - take: - ids // channel: database ids read in from --input + // List of SRA/ENA/GEO/DDBJ identifiers to download their associated metadata and FastQ files + input: Path - main: + // Comma-separated list of ENA metadata fields to fetch before downloading data + ena_metadata_fields: String = '' - // - // WORKFLOW: Download FastQ files for SRA / ENA / GEO / DDBJ ids - // - SRA ( ids ) + // Only download metadata for public data database ids and don't download the FastQ files + skip_fastq_download: Boolean = false - emit: - samples = SRA.out.samples - metadata = SRA.out.metadata + // dbGaP repository key + dbgap_key: Path? } /* @@ -60,12 +57,12 @@ workflow { // // SUBWORKFLOW: Run initialisation tasks // - PIPELINE_INITIALISATION ( + ids = PIPELINE_INITIALISATION ( params.version, params.validate_params, params.monochrome_logs, args, - params.outdir, + workflow.outputDir, params.input, params.ena_metadata_fields ) @@ -73,9 +70,20 @@ workflow { // // WORKFLOW: Run primary workflows for the pipeline // - NFCORE_FETCHNGS ( - PIPELINE_INITIALISATION.out.ids + sra = SRA ( + channel.fromList(ids), + [ + ena_metadata_fields: params.ena_metadata_fields, + skip_fastq_download: params.skip_fastq_download, + dbgap_key: params.dbgap_key + ] ) + + // + // SUBWORKFLOW: Collect software versions + // + versions = SOFTWARE_VERSIONS() + // // SUBWORKFLOW: Run completion tasks // @@ -83,19 +91,27 @@ workflow { params.email, params.email_on_fail, params.plaintext_email, - params.outdir, + workflow.outputDir, params.monochrome_logs, params.hook_url ) publish: - samples = NFCORE_FETCHNGS.out.samples - metadata = NFCORE_FETCHNGS.out.metadata - versions = softwareVersionsToYAML() + samples = sra.samples + runinfo_ftp = sra.runinfo_ftp + versions = versions } +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + WORKFLOW OUTPUTS +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ + output { - samples { + + // List of FASTQ samples with optional MD5 checksums + samples: Channel { path { sample -> sample.fastq_1 >> 'fastq/' sample.fastq_2 >> 'fastq/' @@ -107,12 +123,13 @@ output { } } - metadata { + // List of download links for the given sample ids + runinfo_ftp: Channel { path 'metadata' } - versions { - path '.' + // Manifest of tool versions used by the pipeline for MultiQC + versions: Map { index { path 'nf_core_fetchngs_software_mqc_versions.yml' } diff --git a/modules/local/aspera_cli/main.nf b/modules/local/aspera_cli/main.nf index 104b55af..bb03abea 100644 --- a/modules/local/aspera_cli/main.nf +++ b/modules/local/aspera_cli/main.nf @@ -1,5 +1,5 @@ process ASPERA_CLI { - tag "$meta.id" + tag "$id" label 'process_medium' conda "${moduleDir}/environment.yml" @@ -8,18 +8,28 @@ process ASPERA_CLI { 'biocontainers/aspera-cli:4.14.0--hdfd78af_1' }" input: - tuple val(meta), val(fastq) - val user + id : String + single_end : Boolean + fastq_aspera : String + md5_1 : String + md5_2 : String? + user : String output: - tuple val(meta), path("*fastq.gz"), emit: fastq - tuple val(meta), path("*md5") , emit: md5 - tuple val("${task.process}"), val('aspera_cli'), eval('ascli --version'), topic: versions + id : String = id + fastq_1 : Path = file('*_1.fastq.gz') + fastq_2 : Path? = file('*_2.fastq.gz') + md5_1 : Path = file('*_1.fastq.gz.md5') + md5_2 : Path? = file('*_2.fastq.gz.md5') + + topic: + [process: task.process, name: 'aspera_cli', version: eval('ascli --version')] >> 'versions' script: def args = task.ext.args ?: '' def conda_prefix = ['singularity', 'apptainer'].contains(workflow.containerEngine) ? "export CONDA_PREFIX=/usr/local" : "" - if (meta.single_end) { + def fastq = fastq_aspera.tokenize(';') + if (single_end) { """ $conda_prefix @@ -27,10 +37,10 @@ process ASPERA_CLI { $args \\ -i \$CONDA_PREFIX/etc/aspera/aspera_bypass_dsa.pem \\ ${user}@${fastq[0]} \\ - ${meta.id}.fastq.gz + ${id}.fastq.gz - echo "${meta.md5_1} ${meta.id}.fastq.gz" > ${meta.id}.fastq.gz.md5 - md5sum -c ${meta.id}.fastq.gz.md5 + echo "${md5_1} ${id}.fastq.gz" > ${id}.fastq.gz.md5 + md5sum -c ${id}.fastq.gz.md5 """ } else { """ @@ -40,19 +50,19 @@ process ASPERA_CLI { $args \\ -i \$CONDA_PREFIX/etc/aspera/aspera_bypass_dsa.pem \\ ${user}@${fastq[0]} \\ - ${meta.id}_1.fastq.gz + ${id}_1.fastq.gz - echo "${meta.md5_1} ${meta.id}_1.fastq.gz" > ${meta.id}_1.fastq.gz.md5 - md5sum -c ${meta.id}_1.fastq.gz.md5 + echo "${md5_1} ${id}_1.fastq.gz" > ${id}_1.fastq.gz.md5 + md5sum -c ${id}_1.fastq.gz.md5 ascp \\ $args \\ -i \$CONDA_PREFIX/etc/aspera/aspera_bypass_dsa.pem \\ ${user}@${fastq[1]} \\ - ${meta.id}_2.fastq.gz + ${id}_2.fastq.gz - echo "${meta.md5_2} ${meta.id}_2.fastq.gz" > ${meta.id}_2.fastq.gz.md5 - md5sum -c ${meta.id}_2.fastq.gz.md5 + echo "${md5_2} ${id}_2.fastq.gz" > ${id}_2.fastq.gz.md5 + md5sum -c ${id}_2.fastq.gz.md5 """ } } diff --git a/modules/local/sra_fastq_ftp/main.nf b/modules/local/sra_fastq_ftp/main.nf index ae47c3a6..0dc0a25f 100644 --- a/modules/local/sra_fastq_ftp/main.nf +++ b/modules/local/sra_fastq_ftp/main.nf @@ -1,6 +1,6 @@ process SRA_FASTQ_FTP { - tag "$meta.id" + tag "$id" label 'process_low' label 'error_retry' @@ -10,42 +10,52 @@ process SRA_FASTQ_FTP { 'biocontainers/wget:1.21.4' }" input: - tuple val(meta), val(fastq) + id : String + single_end : Boolean + fastq_1 : String + fastq_2 : String? + md5_1 : String + md5_2 : String? output: - tuple val(meta), path("*fastq.gz"), emit: fastq - tuple val(meta), path("*md5") , emit: md5 - tuple val("${task.process}"), val('wget'), eval("echo \$(wget --version | head -n 1 | sed 's/^GNU Wget //; s/ .*\$//')"), topic: versions + id : String = id + fastq_1 : Path = file('*_1.fastq.gz') + fastq_2 : Path? = file('*_2.fastq.gz') + md5_1 : Path = file('*_1.fastq.gz.md5') + md5_2 : Path? = file('*_2.fastq.gz.md5') + + topic: + [process: task.process, name: 'wget', version: eval("echo \$(wget --version | head -n 1 | sed 's/^GNU Wget //; s/ .*\$//')")] >> 'versions' script: def args = task.ext.args ?: '' - if (meta.single_end) { + if (single_end) { """ wget \\ $args \\ - -O ${meta.id}.fastq.gz \\ - ${fastq[0]} + -O ${id}.fastq.gz \\ + ${fastq_1} - echo "${meta.md5_1} ${meta.id}.fastq.gz" > ${meta.id}.fastq.gz.md5 - md5sum -c ${meta.id}.fastq.gz.md5 + echo "${md5_1} ${id}.fastq.gz" > ${id}.fastq.gz.md5 + md5sum -c ${id}.fastq.gz.md5 """ } else { """ wget \\ $args \\ - -O ${meta.id}_1.fastq.gz \\ - ${fastq[0]} + -O ${id}_1.fastq.gz \\ + ${fastq_1} - echo "${meta.md5_1} ${meta.id}_1.fastq.gz" > ${meta.id}_1.fastq.gz.md5 - md5sum -c ${meta.id}_1.fastq.gz.md5 + echo "${md5_1} ${id}_1.fastq.gz" > ${id}_1.fastq.gz.md5 + md5sum -c ${id}_1.fastq.gz.md5 wget \\ $args \\ - -O ${meta.id}_2.fastq.gz \\ - ${fastq[1]} + -O ${id}_2.fastq.gz \\ + ${fastq_2} - echo "${meta.md5_2} ${meta.id}_2.fastq.gz" > ${meta.id}_2.fastq.gz.md5 - md5sum -c ${meta.id}_2.fastq.gz.md5 + echo "${md5_2} ${id}_2.fastq.gz" > ${id}_2.fastq.gz.md5 + md5sum -c ${id}_2.fastq.gz.md5 """ } } diff --git a/modules/local/sra_ids_to_runinfo/main.nf b/modules/local/sra_ids_to_runinfo/main.nf index 4e7f9e87..f74c5a48 100644 --- a/modules/local/sra_ids_to_runinfo/main.nf +++ b/modules/local/sra_ids_to_runinfo/main.nf @@ -9,12 +9,14 @@ process SRA_IDS_TO_RUNINFO { 'biocontainers/python:3.9--1' }" input: - val id - val fields + id : String + fields : String output: - path "*.tsv" , emit: tsv - tuple val("${task.process}"), val('python'), eval("python --version | sed 's/Python //g'"), topic: versions + file('*.runinfo.tsv') + + topic: + [process: task.process, name: 'python', version: eval("python --version | sed 's/Python //g'")] >> 'versions' script: def metadata_fields = fields ? "--ena_metadata_fields ${fields}" : '' diff --git a/modules/local/sra_runinfo_to_ftp/main.nf b/modules/local/sra_runinfo_to_ftp/main.nf index 40f6a03c..c09834d8 100644 --- a/modules/local/sra_runinfo_to_ftp/main.nf +++ b/modules/local/sra_runinfo_to_ftp/main.nf @@ -7,16 +7,18 @@ process SRA_RUNINFO_TO_FTP { 'biocontainers/python:3.9--1' }" input: - path runinfo + runinfo : Path output: - path "*.tsv" , emit: tsv - tuple val("${task.process}"), val('python'), eval("python --version | sed 's/Python //g'"), topic: versions + file('*.runinfo_ftp.tsv') + + topic: + [process: task.process, name: 'python', version: eval("python --version | sed 's/Python //g'")] >> 'versions' script: """ sra_runinfo_to_ftp.py \\ - ${runinfo.join(',')} \\ - ${runinfo.toString().tokenize(".")[0]}.runinfo_ftp.tsv + ${runinfo} \\ + ${runinfo.baseName.tokenize(".")[0]}.runinfo_ftp.tsv """ } diff --git a/modules/nf-core/custom/sratoolsncbisettings/main.nf b/modules/nf-core/custom/sratoolsncbisettings/main.nf index 7dcb66e9..1876c81a 100644 --- a/modules/nf-core/custom/sratoolsncbisettings/main.nf +++ b/modules/nf-core/custom/sratoolsncbisettings/main.nf @@ -8,14 +8,13 @@ process CUSTOM_SRATOOLSNCBISETTINGS { 'biocontainers/sra-tools:3.0.8--h9f5acd7_0' }" input: - val ids + ids : Bag> output: - path('*.mkfg') , emit: ncbi_settings - tuple val("${task.process}"), val('sratools'), eval("vdb-config --version 2>&1 | grep -Eo '[0-9.]+'"), topic: versions + file('*.mkfg') - when: - task.ext.when == null || task.ext.when + topic: + [process: task.process, name: 'sratools', version: eval("vdb-config --version 2>&1 | grep -Eo '[0-9.]+'")] >> 'versions' shell: config = "/LIBS/GUID = \"${UUID.randomUUID().toString()}\"\\n/libs/cloud/report_instance_identity = \"true\"\\n" diff --git a/modules/nf-core/sratools/fasterqdump/main.nf b/modules/nf-core/sratools/fasterqdump/main.nf index 4fdd07fe..e0e379a5 100644 --- a/modules/nf-core/sratools/fasterqdump/main.nf +++ b/modules/nf-core/sratools/fasterqdump/main.nf @@ -1,5 +1,5 @@ process SRATOOLS_FASTERQDUMP { - tag "$meta.id" + tag "$id" label 'process_medium' conda "${moduleDir}/environment.yml" @@ -8,41 +8,44 @@ process SRATOOLS_FASTERQDUMP { 'quay.io/biocontainers/mulled-v2-5f89fe0cd045cb1d615630b9261a1d17943a9b6a:6a9ff0e76ec016c3d0d27e0c0d362339f2d787e6-0' }" input: - tuple val(meta), path(sra) - path ncbi_settings - path certificate + id : String + single_end : Boolean + sra : Path + ncbi_settings : Path + certificate : Path? output: - tuple val(meta), path('*.fastq.gz'), emit: reads - tuple val("${task.process}"), val('sratools'), eval("fasterq-dump --version 2>&1 | grep -Eo '[0-9.]+'"), topic: versions - tuple val("${task.process}"), val('pigz'), eval("pigz --version 2>&1 | sed 's/pigz //g'"), topic: versions + id : String = id + fastq_1 : Path = files('*.fastq.gz').toSorted()[0] + fastq_2 : Path? = !single_end ? files('*.fastq.gz').toSorted()[1] : null - when: - task.ext.when == null || task.ext.when + topic: + [process: task.process, name: 'sratools', version: eval("fasterq-dump --version 2>&1 | grep -Eo '[0-9.]+'")] >> 'versions' + [process: task.process, name: 'pigz', version: eval("pigz --version 2>&1 | sed 's/pigz //g'")] >> 'versions' script: - def args = task.ext.args ?: '' - def args2 = task.ext.args2 ?: '' - def prefix = task.ext.prefix ?: "${meta.id}" - def outfile = meta.single_end ? "${prefix}.fastq" : prefix + def args_fasterqdump = task.ext.args_fasterqdump ?: '' + def args_pigz = task.ext.args_pigz ?: '' + def prefix = task.ext.prefix ?: "${id}" + def outfile = single_end ? "${prefix}.fastq" : prefix def key_file = '' - if (certificate.toString().endsWith('.jwt')) { + if (certificate.baseName.endsWith('.jwt')) { key_file += " --perm ${certificate}" - } else if (certificate.toString().endsWith('.ngc')) { + } else if (certificate.baseName.endsWith('.ngc')) { key_file += " --ngc ${certificate}" } """ export NCBI_SETTINGS="\$PWD/${ncbi_settings}" fasterq-dump \\ - $args \\ + $args_fasterqdump \\ --threads $task.cpus \\ --outfile $outfile \\ ${key_file} \\ ${sra} pigz \\ - $args2 \\ + $args_pigz \\ --no-name \\ --processes $task.cpus \\ *.fastq diff --git a/modules/nf-core/sratools/fasterqdump/nextflow.config b/modules/nf-core/sratools/fasterqdump/nextflow.config index 7e1649d1..6b14b7ba 100644 --- a/modules/nf-core/sratools/fasterqdump/nextflow.config +++ b/modules/nf-core/sratools/fasterqdump/nextflow.config @@ -1,5 +1,5 @@ process { withName: SRATOOLS_FASTERQDUMP { - ext.args = '--split-files --include-technical' + ext.args_fasterqdump = '--split-files --include-technical' } } \ No newline at end of file diff --git a/modules/nf-core/sratools/fasterqdump/tests/nextflow.config b/modules/nf-core/sratools/fasterqdump/tests/nextflow.config index 23e4100b..e62eb6ec 100644 --- a/modules/nf-core/sratools/fasterqdump/tests/nextflow.config +++ b/modules/nf-core/sratools/fasterqdump/tests/nextflow.config @@ -1,5 +1,5 @@ process { withName: SRATOOLS_FASTERQDUMP { - ext.args = '' + ext.args_fasterqdump = '' } } \ No newline at end of file diff --git a/modules/nf-core/sratools/prefetch/main.nf b/modules/nf-core/sratools/prefetch/main.nf index ac9d1e80..1e2624f2 100644 --- a/modules/nf-core/sratools/prefetch/main.nf +++ b/modules/nf-core/sratools/prefetch/main.nf @@ -8,26 +8,26 @@ process SRATOOLS_PREFETCH { 'biocontainers/sra-tools:3.1.0--h9f5acd7_0' }" input: - tuple val(meta), val(id) - path ncbi_settings - path certificate + id : String + ncbi_settings : Path + certificate : Path? output: - tuple val(meta), path(id, type: 'dir'), emit: sra - tuple val("${task.process}"), val('sratools'), eval("prefetch --version 2>&1 | grep -Eo '[0-9.]+'"), topic: versions + id : String = id + sra : Path = file(id) - when: - task.ext.when == null || task.ext.when + topic: + [process: task.process, name: 'sratools', version: eval("prefetch --version 2>&1 | grep -Eo '[0-9.]+'")] >> 'versions' shell: - args = task.ext.args ?: '' - args2 = task.ext.args2 ?: '5 1 100' // + args_prefetch = task.ext.args_prefetch ?: '' + args_retry = task.ext.args_retry ?: '5 1 100' // if (certificate) { - if (certificate.toString().endsWith('.jwt')) { - args += " --perm ${certificate}" + if (certificate.baseName.endsWith('.jwt')) { + args_prefetch += " --perm ${certificate}" } - else if (certificate.toString().endsWith('.ngc')) { - args += " --ngc ${certificate}" + else if (certificate.baseName.endsWith('.ngc')) { + args_prefetch += " --ngc ${certificate}" } } diff --git a/modules/nf-core/sratools/prefetch/templates/retry_with_backoff.sh b/modules/nf-core/sratools/prefetch/templates/retry_with_backoff.sh index 395593d6..ad5595a7 100755 --- a/modules/nf-core/sratools/prefetch/templates/retry_with_backoff.sh +++ b/modules/nf-core/sratools/prefetch/templates/retry_with_backoff.sh @@ -42,9 +42,9 @@ retry_with_backoff() { export NCBI_SETTINGS="$PWD/!{ncbi_settings}" -retry_with_backoff !{args2} \ +retry_with_backoff !{args_retry} \ prefetch \ - !{args} \ + !{args_prefetch} \ !{id} # check file integrity using vdb-validate or (when archive contains no checksums) md5sum diff --git a/nextflow.config b/nextflow.config index a09178b3..47193e36 100644 --- a/nextflow.config +++ b/nextflow.config @@ -6,21 +6,10 @@ ---------------------------------------------------------------------------------------- */ -// Global default params, used in configs +// Config params params { - // Input options - input = null - nf_core_pipeline = null - nf_core_rnaseq_strandedness = 'auto' - ena_metadata_fields = null - sample_mapping_fields = 'experiment_accession,run_accession,sample_accession,experiment_alias,run_alias,sample_alias,experiment_title,sample_title,sample_description' - download_method = 'ftp' - skip_fastq_download = false - dbgap_key = null - // Boilerplate options - outdir = null publish_dir_mode = 'copy' email = null email_on_fail = null @@ -34,7 +23,7 @@ params { modules_testdata_base_path = 's3://ngi-igenomes/testdata/nf-core/modules/' pipelines_testdata_base_path = 's3://ngi-igenomes/testdata/nf-core/pipelines/fetchngs/1.15.0/' - // Config options + // Institutional config options config_profile_name = null config_profile_description = null @@ -49,6 +38,7 @@ params { // Deprecated options // See: https://github.com/nf-core/fetchngs/pull/279/files#r1494459480 force_sratools_download = false + outdir = null } // Load base.config by default for all pipelines diff --git a/nextflow_output.json b/nextflow_output.json new file mode 100644 index 00000000..f4e188c3 --- /dev/null +++ b/nextflow_output.json @@ -0,0 +1,53 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://raw.githubusercontent.com/nf-core/fetchngs/master/nextflow_output.json", + "title": "nf-core/fetchngs pipeline outputs", + "description": "Pipeline to fetch metadata and raw FastQ files from public databases", + "type": "object", + "properties": { + "samples": { + "description": "List of FASTQ samples with optional MD5 checksums", + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "fastq_1": { + "type": "string", + "format": "file-path" + }, + "fastq_2": { + "type": "string", + "format": "file-path" + }, + "md5_1": { + "type": "string", + "format": "file-path" + }, + "md5_2": { + "type": "string", + "format": "file-path" + } + }, + "required": ["id", "fastq_1"] + }, + "path": "samplesheet/samplesheet.json" + }, + "runinfo_ftp": { + "description": "List of download links for the given sample ids", + "type": "array", + "items": { + "type": "string", + "format": "file-path" + }, + "path": "metadata" + }, + "versions": { + "description": "Manifest of tool versions used by the pipeline for MultiQC", + "type": "object", + "path": "nf_core_fetchngs_software_mqc_versions.yml" + } + } +} diff --git a/nextflow_schema.json b/nextflow_schema.json index ba7c196b..4a395572 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -5,12 +5,12 @@ "description": "Pipeline to fetch metadata and raw FastQ files from public databases", "type": "object", "$defs": { - "input_output_options": { - "title": "Input/output options", + "input_options": { + "title": "Input options", "type": "object", "fa_icon": "fas fa-terminal", "description": "Define where the pipeline should find input data and save output data.", - "required": ["input", "outdir"], + "required": ["input"], "properties": { "input": { "type": "string", @@ -28,33 +28,6 @@ "description": "Comma-separated list of ENA metadata fields to fetch before downloading data.", "help_text": "The default list of fields used by the pipeline can be found at the top of the [`bin/sra_ids_to_runinfo.py`](https://github.com/nf-core/fetchngs/blob/master/bin/sra_ids_to_runinfo.py) script within the pipeline repo. This pipeline requires a minimal set of fields to download FastQ files i.e. `'run_accession,experiment_accession,library_layout,fastq_ftp,fastq_md5'`. Full list of accepted metadata fields can be obtained from the [ENA API](https://www.ebi.ac.uk/ena/portal/api/returnFields?dataPortal=ena&format=tsv&result=read_run)." }, - "sample_mapping_fields": { - "type": "string", - "fa_icon": "fas fa-columns", - "description": "Comma-separated list of ENA metadata fields used to create a separate 'id_mappings.csv' and 'multiqc_config.yml' with selected fields that can be used to rename samples in general and in MultiQC.", - "default": "experiment_accession,run_accession,sample_accession,experiment_alias,run_alias,sample_alias,experiment_title,sample_title,sample_description" - }, - "nf_core_pipeline": { - "type": "string", - "fa_icon": "fab fa-apple", - "description": "Name of supported nf-core pipeline e.g. 'rnaseq'. A samplesheet for direct use with the pipeline will be created with the appropriate columns.", - "enum": ["rnaseq", "atacseq", "viralrecon", "taxprofiler"] - }, - "nf_core_rnaseq_strandedness": { - "type": "string", - "fa_icon": "fas fa-dna", - "description": "Value for 'strandedness' entry added to samplesheet created when using '--nf_core_pipeline rnaseq'.", - "help_text": "The default is 'auto' which can be used with nf-core/rnaseq v3.10 onwards to auto-detect strandedness during the pipeline execution.", - "default": "auto" - }, - "download_method": { - "type": "string", - "default": "ftp", - "fa_icon": "fas fa-download", - "enum": ["aspera", "ftp", "sratools"], - "description": "Method to download FastQ files. Available options are 'aspera', 'ftp' or 'sratools'. Default is 'ftp'.", - "help_text": "FTP and Aspera CLI download FastQ files directly from the ENA FTP whereas sratools uses sra-tools to download *.sra files and convert to FastQ." - }, "skip_fastq_download": { "type": "boolean", "fa_icon": "fas fa-fast-forward", @@ -67,12 +40,6 @@ "format": "file-path", "description": "dbGaP repository key." }, - "outdir": { - "type": "string", - "format": "directory-path", - "description": "The output directory where the results will be saved. You have to use absolute paths to storage on Cloud infrastructure.", - "fa_icon": "fas fa-folder-open" - }, "email": { "type": "string", "description": "Email address for completion summary.", @@ -215,13 +182,19 @@ "description": "This parameter has been deprecated. Please use '--download_method sratools' instead.", "enum": [false], "hidden": true + }, + "outdir": { + "type": "string", + "format": "directory-path", + "description": "This parameter has been deprecated. Please use '-output-dir' instead.", + "fa_icon": "fas fa-folder-open" } } } }, "allOf": [ { - "$ref": "#/$defs/input_output_options" + "$ref": "#/$defs/input_options" }, { "$ref": "#/$defs/institutional_config_options" diff --git a/subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf b/subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf index 4a5f5f40..0fdec902 100644 --- a/subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf +++ b/subworkflows/local/utils_nfcore_fetchngs_pipeline/main.nf @@ -26,13 +26,13 @@ include { UTILS_NFSCHEMA_PLUGIN } from '../../nf-core/utils_nfschema_plugin' workflow PIPELINE_INITIALISATION { take: - version // boolean: Display version and exit - validate_params // boolean: Boolean whether to validate parameters against the schema at runtime - monochrome_logs // boolean: Do not use coloured log outputs - nextflow_cli_args // array: List of positional nextflow CLI args - outdir // string: The output directory where the results will be saved - input // string: File containing SRA/ENA/GEO/DDBJ identifiers one per line to download their associated metadata and FastQ files - ena_metadata_fields // string: Comma-separated list of ENA metadata fields to fetch before downloading data + version : boolean // Display version and exit + validate_params : boolean // Validate parameters against the schema at runtime + monochrome_logs : boolean // Do not use coloured log outputs + nextflow_cli_args : List // List of positional nextflow CLI args + outdir : String // The output directory where the results will be saved + input : Path // File containing SRA/ENA/GEO/DDBJ identifiers one per line to download their associated metadata and FastQ files + ena_metadata_fields : String // Comma-separated list of ENA metadata fields to fetch before downloading data main: @@ -65,23 +65,19 @@ workflow PIPELINE_INITIALISATION { // // Auto-detect input id type // - ch_input = file(input) - if (isSraId(ch_input)) { - sraCheckENAMetadataFields(ena_metadata_fields) - } else { + ids = input + .splitCsv(header:false, sep:'', strip:true) + .collect { row -> row[0] } + .toUnique() + if (!isSraId(ids)) { error('Ids provided via --input not recognised please make sure they are either SRA / ENA / GEO / DDBJ ids!') } - - // Read in ids from --input file - Channel - .from(ch_input) - .splitCsv(header:false, sep:'', strip:true) - .map { it[0] } - .unique() - .set { ch_ids } + if (!sraCheckENAMetadataFields(ena_metadata_fields)) { + error("Invalid option: '${ena_metadata_fields}'. Minimally required fields for '--ena_metadata_fields': '${valid_ena_metadata_fields.join(',')}'") + } emit: - ids = ch_ids + ids : List } /* @@ -93,12 +89,12 @@ workflow PIPELINE_INITIALISATION { workflow PIPELINE_COMPLETION { take: - email // string: email address - email_on_fail // string: email address sent on pipeline failure - plaintext_email // boolean: Send plain-text email instead of HTML - outdir // path: Path to output directory where results will be published - monochrome_logs // boolean: Disable ANSI colour codes in log output - hook_url // string: hook URL for notifications + email : String // email address + email_on_fail : String // email address sent on pipeline failure + plaintext_email : boolean // Send plain-text email instead of HTML + outdir : Path // Path to output directory where results will be published + monochrome_logs : boolean // Disable ANSI colour codes in log output + hook_url : String // hook URL for notifications main: summary_params = paramsSummaryMap(workflow, parameters_schema: "nextflow_schema.json") @@ -140,39 +136,29 @@ workflow PIPELINE_COMPLETION { // // Check if input ids are from the SRA // -def isSraId(input) { - def is_sra = false +def isSraId(ids: List) -> boolean { def total_ids = 0 def no_match_ids = [] def pattern = /^(((SR|ER|DR)[APRSX])|(SAM(N|EA|D))|(PRJ(NA|EB|DB))|(GS[EM]))(\d+)$/ - input.eachLine { line -> + ids.each { id -> total_ids += 1 - if (!(line =~ pattern)) { - no_match_ids << line + if (!(id =~ pattern)) { + no_match_ids << id } } def num_match = total_ids - no_match_ids.size() - if (num_match > 0) { - if (num_match == total_ids) { - is_sra = true - } else { - error("Mixture of ids provided via --input: ${no_match_ids.join(', ')}\nPlease provide either SRA / ENA / GEO / DDBJ ids!") - } - } - return is_sra + return num_match > 0 && num_match == total_ids } // // Check and validate parameters // -def sraCheckENAMetadataFields(ena_metadata_fields) { +def sraCheckENAMetadataFields(ena_metadata_fields: List) -> boolean { // Check minimal ENA fields are provided to download FastQ files def valid_ena_metadata_fields = ['run_accession', 'experiment_accession', 'library_layout', 'fastq_ftp', 'fastq_md5'] def actual_ena_metadata_fields = ena_metadata_fields ? ena_metadata_fields.split(',').collect{ it.trim().toLowerCase() } : valid_ena_metadata_fields - if (!actual_ena_metadata_fields.containsAll(valid_ena_metadata_fields)) { - error("Invalid option: '${ena_metadata_fields}'. Minimally required fields for '--ena_metadata_fields': '${valid_ena_metadata_fields.join(',')}'") - } + return actual_ena_metadata_fields.containsAll(valid_ena_metadata_fields) } // // Print a warning after pipeline has completed diff --git a/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf b/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf index eb5ec390..df1bb857 100644 --- a/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf +++ b/subworkflows/nf-core/fastq_download_prefetch_fasterqdump_sratools/main.nf @@ -7,27 +7,32 @@ include { SRATOOLS_FASTERQDUMP } from '../../../modules/nf-core/sratools/ // workflow FASTQ_DOWNLOAD_PREFETCH_FASTERQDUMP_SRATOOLS { take: - ch_sra_ids // channel: [ val(meta), val(id) ] - ch_dbgap_key // channel: [ path(dbgap_key) ] + sra_metadata : Channel + dbgap_key : Path? main: - // // Detect existing NCBI user settings or create new ones. // - CUSTOM_SRATOOLSNCBISETTINGS ( ch_sra_ids.collect() ) - ch_ncbi_settings = CUSTOM_SRATOOLSNCBISETTINGS.out.ncbi_settings + ncbi_settings = CUSTOM_SRATOOLSNCBISETTINGS( sra_metadata.collect() ) // // Prefetch sequencing reads in SRA format. // - SRATOOLS_PREFETCH ( ch_sra_ids, ch_ncbi_settings, ch_dbgap_key ) + sra = sra_metadata.map(SRATOOLS_PREFETCH, ncbi_settings: ncbi_settings, certificate: dbgap_key) // // Convert the SRA format into one or more compressed FASTQ files. // - SRATOOLS_FASTERQDUMP ( SRATOOLS_PREFETCH.out.sra, ch_ncbi_settings, ch_dbgap_key ) + samples = sra_metadata + .join(sra, 'id') + .map(SRATOOLS_FASTERQDUMP, ncbi_settings: ncbi_settings, certificate: dbgap_key) emit: - reads = SRATOOLS_FASTERQDUMP.out.reads // channel: [ val(meta), [ reads ] ] + samples +} + +record SraMetadata { + id: String + single_end: Boolean } diff --git a/subworkflows/nf-core/utils_nextflow_pipeline/main.nf b/subworkflows/nf-core/utils_nextflow_pipeline/main.nf index 0fcbf7b3..06399014 100644 --- a/subworkflows/nf-core/utils_nextflow_pipeline/main.nf +++ b/subworkflows/nf-core/utils_nextflow_pipeline/main.nf @@ -10,10 +10,10 @@ workflow UTILS_NEXTFLOW_PIPELINE { take: - print_version // boolean: print version - dump_parameters // boolean: dump parameters - outdir // path: base directory used to publish pipeline results - check_conda_channels // boolean: check conda channels + print_version : boolean // print version + dump_parameters : boolean // dump parameters + outdir : String // base directory used to publish pipeline results + check_conda_channels: boolean // check conda channels main: @@ -52,8 +52,8 @@ workflow UTILS_NEXTFLOW_PIPELINE { // // Generate version string // -def getWorkflowVersion() { - def version_string = "" as String +def getWorkflowVersion() -> String { + def version_string = "" if (workflow.manifest.version) { def prefix_v = workflow.manifest.version[0] != 'v' ? 'v' : '' version_string += "${prefix_v}${workflow.manifest.version}" @@ -70,7 +70,7 @@ def getWorkflowVersion() { // // Dump pipeline parameters to a JSON file // -def dumpParametersToJSON(outdir) { +def dumpParametersToJSON(outdir: String) { def timestamp = new java.util.Date().format('yyyy-MM-dd_HH-mm-ss') def filename = "params_${timestamp}.json" def temp_pf = new File(workflow.launchDir.toString(), ".${filename}") diff --git a/subworkflows/nf-core/utils_nfcore_pipeline/main.nf b/subworkflows/nf-core/utils_nfcore_pipeline/main.nf index 6a4d6ca1..6bb0cb32 100644 --- a/subworkflows/nf-core/utils_nfcore_pipeline/main.nf +++ b/subworkflows/nf-core/utils_nfcore_pipeline/main.nf @@ -10,7 +10,7 @@ workflow UTILS_NFCORE_PIPELINE { take: - nextflow_cli_args + nextflow_cli_args : List main: valid_config = checkConfigProvided() @@ -20,6 +20,38 @@ workflow UTILS_NFCORE_PIPELINE { valid_config } +// +// Get channel of software versions used in pipeline +// +workflow SOFTWARE_VERSIONS { + main: + processVersions = channel.topic('versions') + workflowVersions = channel.of( + [process: 'Workflow', name: workflow.manifest.name, version: getWorkflowVersion()], + [process: 'Workflow', name: 'Nextflow', version: workflow.nextflow.version] + ) + + emit: + processVersions + .mix(workflowVersions) + .gather { t: ToolVersion -> + [key: t.process, value: [name: t.name, version: t.version]] + } + .map { g -> + def simpleName = g.process.tokenize(':').last() + def toolsMap = g.tools.toUnique().inject([:]) { acc, tool -> + acc + [ (tool.name): tool.version ] + } + return [ (simpleName): toolsMap ] + } +} + +record ToolVersion { + process : String + name : String + version : String +} + /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FUNCTIONS @@ -29,21 +61,20 @@ workflow UTILS_NFCORE_PIPELINE { // // Warn if a -profile or Nextflow config has not been provided to run the pipeline // -def checkConfigProvided() { - def valid_config = true as Boolean +def checkConfigProvided() -> Boolean { if (workflow.profile == 'standard' && workflow.configFiles.size() <= 1) { log.warn( "[${workflow.manifest.name}] You are attempting to run the pipeline without any custom configuration!\n\n" + "This will be dependent on your local compute environment but can be achieved via one or more of the following:\n" + " (1) Using an existing pipeline profile e.g. `-profile docker` or `-profile singularity`\n" + " (2) Using an existing nf-core/configs for your Institution e.g. `-profile crick` or `-profile uppmax`\n" + " (3) Using your own local custom config e.g. `-c /path/to/your/custom.config`\n\n" + "Please refer to the quick start section and usage docs for the pipeline.\n " ) - valid_config = false + return false } - return valid_config + return true } // // Exit pipeline if --profile contains spaces // -def checkProfileProvided(nextflow_cli_args) { +def checkProfileProvided(nextflow_cli_args: List) { if (workflow.profile.endsWith(',')) { error( "The `-profile` option cannot end with a trailing comma, please remove it and re-run the pipeline!\n" + "HINT: A common mistake is to provide multiple values separated by spaces e.g. `-profile test, docker`.\n" @@ -59,7 +90,7 @@ def checkProfileProvided(nextflow_cli_args) { // // Citation string for pipeline // -def workflowCitation() { +def workflowCitation() -> String { def temp_doi_ref = "" def manifest_doi = workflow.manifest.doi.tokenize(",") // Handling multiple DOIs @@ -74,8 +105,8 @@ def workflowCitation() { // // Generate workflow version string // -def getWorkflowVersion() { - def version_string = "" as String +def getWorkflowVersion() -> String { + def version_string = "" if (workflow.manifest.version) { def prefix_v = workflow.manifest.version[0] != 'v' ? 'v' : '' version_string += "${prefix_v}${workflow.manifest.version}" @@ -89,36 +120,10 @@ def getWorkflowVersion() { return version_string } -// -// Get workflow version for pipeline -// -def workflowVersionToYAML() { - return Channel.of( - [ 'Workflow', workflow.manifest.name, getWorkflowVersion() ], - [ 'Workflow', 'Nextflow', workflow.nextflow.version ] - ) -} - -// -// Get channel of software versions used in pipeline in YAML format -// -def softwareVersionsToYAML() { - return Channel.topic('versions') - .unique() - .mix(workflowVersionToYAML()) - .map { process, name, version -> - [ - (process.tokenize(':').last()): [ - (name): version - ] - ] - } -} - // // Get workflow summary for MultiQC // -def paramsSummaryMultiqc(summary_params) { +def paramsSummaryMultiqc(summary_params: Map) -> String { def summary_section = '' summary_params .keySet() @@ -152,8 +157,8 @@ def paramsSummaryMultiqc(summary_params) { // // nf-core logo // -def nfCoreLogo(monochrome_logs=true) { - def colors = logColours(monochrome_logs) as Map +def nfCoreLogo(monochrome_logs: boolean = true) -> String { + def colors = logColours(monochrome_logs) String.format( """\n ${dashedLine(monochrome_logs)} @@ -171,16 +176,16 @@ def nfCoreLogo(monochrome_logs=true) { // // Return dashed line // -def dashedLine(monochrome_logs=true) { - def colors = logColours(monochrome_logs) as Map +def dashedLine(monochrome_logs: boolean = true) -> String { + def colors = logColours(monochrome_logs) return "-${colors.dim}----------------------------------------------------${colors.reset}-" } // // ANSII colours used for terminal logging // -def logColours(monochrome_logs=true) { - def colorcodes = [:] as Map +def logColours(monochrome_logs: boolean = true) -> Map { + def colorcodes = [:] // Reset / Meta colorcodes['reset'] = monochrome_logs ? '' : "\033[0m" @@ -244,34 +249,17 @@ def logColours(monochrome_logs=true) { return colorcodes } -// -// Attach the multiqc report to email -// -def attachMultiqcReport(multiqc_report) { - def mqc_report = null - try { - if (workflow.success) { - mqc_report = multiqc_report.getVal() - if (mqc_report.getClass() == ArrayList && mqc_report.size() >= 1) { - if (mqc_report.size() > 1) { - log.warn("[${workflow.manifest.name}] Found multiple reports from process 'MULTIQC', will use only one") - } - mqc_report = mqc_report[0] - } - } - } - catch (Exception all) { - if (multiqc_report) { - log.warn("[${workflow.manifest.name}] Could not attach MultiQC report to summary email") - } - } - return mqc_report -} - // // Construct and send completion email // -def completionEmail(summary_params, email, email_on_fail, plaintext_email, outdir, monochrome_logs=true, multiqc_report=null) { +def completionEmail( + summary_params: Map, + email: String, + email_on_fail: String, + plaintext_email: boolean, + outdir: String, + monochrome_logs: boolean = true, + multiqc_report: Path = null) { // Set up the e-mail variables def subject = "[${workflow.manifest.name}] Successful: ${workflow.runName}" @@ -319,13 +307,14 @@ def completionEmail(summary_params, email, email_on_fail, plaintext_email, outdi email_fields['summary'] = summary << misc_fields // On success try attach the multiqc report - def mqc_report = attachMultiqcReport(multiqc_report) + def mqc_report = workflow.success + ? multiqc_report + : null // Check if we are only sending emails on failure - def email_address = email - if (!email && email_on_fail && !workflow.success) { - email_address = email_on_fail - } + def email_address = !email && email_on_fail && !workflow.success + ? email_on_fail + : email // Render the TXT template def engine = new groovy.text.GStringTemplateEngine() @@ -346,7 +335,7 @@ def completionEmail(summary_params, email, email_on_fail, plaintext_email, outdi def sendmail_html = sendmail_template.toString() // Send the HTML e-mail - def colors = logColours(monochrome_logs) as Map + def colors = logColours(monochrome_logs) if (email_address) { try { if (plaintext_email) { @@ -381,8 +370,8 @@ new org.codehaus.groovy.GroovyException('Send plaintext e-mail, not HTML') // // Print pipeline summary on completion // -def completionSummary(monochrome_logs=true) { - def colors = logColours(monochrome_logs) as Map +def completionSummary(monochrome_logs: boolean = true) { + def colors = logColours(monochrome_logs) if (workflow.success) { if (workflow.stats.ignoredCount == 0) { log.info("-${colors.purple}[${workflow.manifest.name}]${colors.green} Pipeline completed successfully${colors.reset}-") @@ -399,7 +388,7 @@ def completionSummary(monochrome_logs=true) { // // Construct and send a notification to a web server as JSON e.g. Microsoft Teams and Slack // -def imNotification(summary_params, hook_url) { +def imNotification(summary_params: Map, hook_url: String) { def summary = [:] summary_params .keySet() diff --git a/workflows/sra/main.nf b/workflows/sra/main.nf index 1487b98a..95553ff7 100644 --- a/workflows/sra/main.nf +++ b/workflows/sra/main.nf @@ -26,118 +26,101 @@ include { FASTQ_DOWNLOAD_PREFETCH_FASTERQDUMP_SRATOOLS } from '../../subworkflow workflow SRA { take: - ids // channel: [ ids ] + ids : Channel + params : SraParams main: - // // MODULE: Get SRA run information for public database ids // - SRA_IDS_TO_RUNINFO ( - ids, - params.ena_metadata_fields ?: '' - ) + runinfo = ids.map(SRA_IDS_TO_RUNINFO, fields: params.ena_metadata_fields) // // MODULE: Parse SRA run information, create file containing FTP links and read into workflow as [ meta, [reads] ] // - SRA_RUNINFO_TO_FTP ( - SRA_IDS_TO_RUNINFO.out.tsv + runinfo_ftp = runinfo.map(SRA_RUNINFO_TO_FTP) + + sra_metadata = runinfo_ftp + .flatMap { tsv -> tsv.splitCsv(header:true, sep:'\t').toUnique() } + .map { meta -> meta } + + // + // MODULE: If FTP link is provided in run information then download FastQ directly via FTP and validate with md5sums + // + ftp_samples = sra_metadata + .filter { meta -> !params.skip_fastq_download && meta instanceof FtpMetadata } + .map(SRA_FASTQ_FTP) + + // + // SUBWORKFLOW: Download sequencing reads without FTP links using sra-tools. + // + sratools_metadata = sra_metadata + .filter { meta -> !params.skip_fastq_download && meta instanceof SratoolsMetadata } + + sratools_samples = FASTQ_DOWNLOAD_PREFETCH_FASTERQDUMP_SRATOOLS ( + sratools_metadata, + params.dbgap_key ) - SRA_RUNINFO_TO_FTP - .out - .tsv - .splitCsv(header:true, sep:'\t') - .map { - meta -> - def meta_clone = meta.clone() - meta_clone.single_end = meta_clone.single_end.toBoolean() - return meta_clone - } - .unique() - .set { ch_sra_metadata } - - if (!params.skip_fastq_download) { - - ch_sra_metadata - .branch { - meta -> - def download_method = 'ftp' - // meta.fastq_aspera is a metadata string with ENA fasp links supported by Aspera - // For single-end: 'fasp.sra.ebi.ac.uk:/vol1/fastq/ERR116/006/ERR1160846/ERR1160846.fastq.gz' - // For paired-end: 'fasp.sra.ebi.ac.uk:/vol1/fastq/SRR130/020/SRR13055520/SRR13055520_1.fastq.gz;fasp.sra.ebi.ac.uk:/vol1/fastq/SRR130/020/SRR13055520/SRR13055520_2.fastq.gz' - if (meta.fastq_aspera && params.download_method == 'aspera') { - download_method = 'aspera' - } - if ((!meta.fastq_aspera && !meta.fastq_1) || params.download_method == 'sratools') { - download_method = 'sratools' - } - - aspera: download_method == 'aspera' - return [ meta, meta.fastq_aspera.tokenize(';').take(2) ] - ftp: download_method == 'ftp' - return [ meta, [ meta.fastq_1, meta.fastq_2 ] ] - sratools: download_method == 'sratools' - return [ meta, meta.run_accession ] - } - .set { ch_sra_reads } - - // - // MODULE: If FTP link is provided in run information then download FastQ directly via FTP and validate with md5sums - // - SRA_FASTQ_FTP ( - ch_sra_reads.ftp - ) - - // - // SUBWORKFLOW: Download sequencing reads without FTP links using sra-tools. - // - FASTQ_DOWNLOAD_PREFETCH_FASTERQDUMP_SRATOOLS ( - ch_sra_reads.sratools, - params.dbgap_key ? file(params.dbgap_key, checkIfExists: true) : [] - ) - - // - // MODULE: If Aspera link is provided in run information then download FastQ directly via Aspera CLI and validate with md5sums - // - ASPERA_CLI ( - ch_sra_reads.aspera, - 'era-fasp' - ) - - // Isolate FASTQ channel which will be added to emit block - SRA_FASTQ_FTP.out.fastq - .mix(FASTQ_DOWNLOAD_PREFETCH_FASTERQDUMP_SRATOOLS.out.reads) - .mix(ASPERA_CLI.out.fastq) - .set { ch_fastq } - - SRA_FASTQ_FTP.out.md5 - .mix(ASPERA_CLI.out.md5) - .set { ch_md5 } - - ch_fastq - .join(ch_md5, remainder: true) - .map { - meta, fastq, md5 -> - fastq = fastq instanceof List ? fastq.flatten() : [ fastq ] - md5 = md5 instanceof List ? md5.flatten() : [ md5 ] - meta + [ - fastq_1: fastq[0], - fastq_2: fastq[1] && !meta.single_end ? fastq[1] : null, - md5_1: md5[0], - md5_2: md5[1] && !meta.single_end ? md5[1] : null, - ] - } - .set { ch_samples } - } - else { - ch_samples = Channel.empty() - } + // + // MODULE: If Aspera link is provided in run information then download FastQ directly via Aspera CLI and validate with md5sums + // + aspera_samples = sra_metadata + .filter { meta -> !params.skip_fastq_download && meta instanceof AsperaMetadata } + .map(ASPERA_CLI, user: 'era-fasp') + + samples = ftp_samples + .mix(sratools_samples) + .mix(aspera_samples) emit: - samples = ch_samples - metadata = SRA_RUNINFO_TO_FTP.out.tsv + samples : Channel = samples + runinfo_ftp : Channel = runinfo_ftp +} + +/* +======================================================================================== + TYPES +======================================================================================== +*/ + +record SraParams { + ena_metadata_fields : String + skip_fastq_download : boolean + dbgap_key : Path? +} + +// fastq_aspera is a metadata string with ENA fasp links supported by Aspera + // For single-end: 'fasp.sra.ebi.ac.uk:/vol1/fastq/ERR116/006/ERR1160846/ERR1160846.fastq.gz' + // For paired-end: 'fasp.sra.ebi.ac.uk:/vol1/fastq/SRR130/020/SRR13055520/SRR13055520_1.fastq.gz;fasp.sra.ebi.ac.uk:/vol1/fastq/SRR130/020/SRR13055520/SRR13055520_2.fastq.gz' +record AsperaMetadata { + id : String + single_end : Boolean + fastq_aspera : String + md5_1 : String + md5_2 : String? +} + +record FtpMetadata { + id : String + single_end : Boolean + fastq_1 : String + fastq_2 : String? + md5_1 : String + md5_2 : String? +} + +record SratoolsMetadata { + id : String + single_end : Boolean +} + +record Sample { + id : String + fastq_1 : Path + fastq_2 : Path? + md5_1 : Path? + md5_2 : Path? } /*