diff --git a/docs/executor.md b/docs/executor.md index 9e841fd6fe..0b7d708e56 100644 --- a/docs/executor.md +++ b/docs/executor.md @@ -227,6 +227,7 @@ The `local` executor is useful for developing and testing a pipeline script on y Resource requests and other job characteristics can be controlled via the following process directives: +- {ref}`process-accelerator` - {ref}`process-cpus` - {ref}`process-memory` - {ref}`process-time` @@ -241,6 +242,25 @@ The local executor supports two types of tasks: - Script tasks (processes with a `script` or `shell` block) - executed via a Bash wrapper - Native tasks (processes with an `exec` block) - executed directly in the JVM. +(local-accelerators)= + +### Accelerators + +:::{versionadded} 25.10.0 +::: + +The local executor can use the `accelerator` directive to allocate accelerators, such as GPUs. To use accelerators, set the corresponding environment variable: + +- `CUDA_VISIBLE_DEVICES` for [NVIDIA CUDA](https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#cuda-environment-variables) applications + +- `HIP_VISIBLE_DEVICES` for [HIP](https://rocm.docs.amd.com/projects/HIP/en/docs-develop/reference/env_variables.html) applications + +- `ROCR_VISIBLE_DEVICES` for [AMD ROCm](https://rocm.docs.amd.com/en/latest/conceptual/gpu-isolation.html) applications + +Set the environment variable to a comma-separated list of device IDs for Nextflow to access. Nextflow uses this environment variable to allocate accelerators for tasks that request them. + +For example, to use all GPUs on a node with four NVIDIA GPUs, set `CUDA_VISIBLE_DEVICES` to `0,1,2,3`. If four tasks each request one GPU, they will be executed with `CUDA_VISIBLE_DEVICES` set to `0`, `1`, `2`, and `3`, respectively. + (lsf-executor)= ## LSF diff --git a/docs/migrations/25-10.md b/docs/migrations/25-10.md index d25eec19c6..03f8e181fe 100644 --- a/docs/migrations/25-10.md +++ b/docs/migrations/25-10.md @@ -94,6 +94,12 @@ export NXF_PLUGINS_REGISTRY_URL="https://raw.githubusercontent.com/nextflow-io/p Plugin developers will not be able to submit PRs to the legacy plugin index once the plugin registry is generally available. Plugins should be updated to publish to the Nextflow plugin registry using the {ref}`Nextflow Gradle plugin ` instead. See {ref}`migrate-plugin-registry-page` for details. ::: +

GPU scheduling for local executor

+ +The local executor can now schedule GPUs using the `accelerator` directive. This feature is useful when running Nextflow on a single machine with multiple GPUs. + +See {ref}`local-accelerators` for details. +

New syntax for workflow handlers

The workflow `onComplete` and `onError` handlers were previously defined by calling `workflow.onComplete` and `workflow.onError` in the pipeline script. You can now define handlers as `onComplete` and `onError` sections in an entry workflow: diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy index 0bad8bb9c7..7fb379c149 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy @@ -80,6 +80,10 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { private volatile TaskResult result + String acceleratorEnv + + List acceleratorIds + LocalTaskHandler(TaskRun task, LocalExecutor executor) { super(task) // create the task handler @@ -142,11 +146,13 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { final workDir = task.workDir.toFile() final logFile = new File(workDir, TaskRun.CMD_LOG) - return new ProcessBuilder() + final pb = new ProcessBuilder() .redirectErrorStream(true) .redirectOutput(logFile) .directory(workDir) .command(cmd) + prepareAccelerators(pb) + return pb } protected ProcessBuilder fusionProcessBuilder() { @@ -162,10 +168,18 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { final logPath = Files.createTempFile('nf-task','.log') - return new ProcessBuilder() + final pb = new ProcessBuilder() .redirectErrorStream(true) .redirectOutput(logPath.toFile()) .command(List.of('sh','-c', cmd)) + prepareAccelerators(pb) + return pb + } + + protected void prepareAccelerators(ProcessBuilder pb) { + if( !acceleratorEnv ) + return + pb.environment().put(acceleratorEnv, acceleratorIds.join(',')) } protected ProcessBuilder createLaunchProcessBuilder() { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/AcceleratorTracker.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/AcceleratorTracker.groovy new file mode 100644 index 0000000000..34d10e3ca4 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/processor/AcceleratorTracker.groovy @@ -0,0 +1,84 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.processor + +import groovy.transform.CompileStatic +import nextflow.SysEnv +import nextflow.util.TrackingSemaphore + +/** + * Specialized semaphore that keeps track of accelerators by + * id. The id can be an integer or a UUID. + * + * @author Ben Sherman + */ +@CompileStatic +class AcceleratorTracker { + + private static final List DEVICE_ENV_NAMES = [ + 'CUDA_VISIBLE_DEVICES', + 'HIP_VISIBLE_DEVICES', + 'ROCR_VISIBLE_DEVICES' + ] + + static AcceleratorTracker create() { + return create(SysEnv.get()) + } + + static AcceleratorTracker create(Map env) { + return DEVICE_ENV_NAMES.stream() + .filter(name -> env.containsKey(name)) + .map((name) -> { + final ids = env.get(name).tokenize(',') + return new AcceleratorTracker(name, ids) + }) + .findFirst().orElse(new AcceleratorTracker()) + } + + private final String name + private final TrackingSemaphore semaphore + + private AcceleratorTracker(String name, List ids) { + this.name = name + this.semaphore = new TrackingSemaphore(ids) + } + + private AcceleratorTracker() { + this(null, []) + } + + String name() { + return name + } + + int total() { + return semaphore.totalPermits() + } + + int available() { + return semaphore.availablePermits() + } + + List acquire(int permits) { + return semaphore.acquire(permits) + } + + void release(List ids) { + semaphore.release(ids) + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index c569b38a8f..8fdac7fed5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -24,6 +24,7 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.executor.ExecutorConfig import nextflow.exception.ProcessUnrecoverableException +import nextflow.executor.local.LocalTaskHandler import nextflow.util.Duration import nextflow.util.MemoryUnit @@ -59,6 +60,11 @@ class LocalPollingMonitor extends TaskPollingMonitor { */ private final long maxMemory + /** + * Tracks the total and available accelerators in the system + */ + private AcceleratorTracker acceleratorTracker + /** * Create the task polling monitor with the provided named parameters object. *

@@ -76,6 +82,7 @@ class LocalPollingMonitor extends TaskPollingMonitor { super(params) this.availCpus = maxCpus = params.cpus as int this.availMemory = maxMemory = params.memory as long + this.acceleratorTracker = AcceleratorTracker.create() assert availCpus>0, "Local avail `cpus` attribute cannot be zero" assert availMemory>0, "Local avail `memory` attribute cannot zero" } @@ -154,6 +161,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { handler.task.getConfig()?.getMemory()?.toBytes() ?: 1L } + /** + * @param handler + * A {@link TaskHandler} instance + * @return + * The number of accelerators requested to execute the specified task + */ + private static int accelerators(TaskHandler handler) { + handler.task.getConfig()?.getAccelerator()?.getRequest() ?: 0 + } + /** * Determines if a task can be submitted for execution checking if the resources required * (cpus and memory) match the amount of avail resource @@ -179,9 +196,13 @@ class LocalPollingMonitor extends TaskPollingMonitor { if( taskMemory>maxMemory) throw new ProcessUnrecoverableException("Process requirement exceeds available memory -- req: ${new MemoryUnit(taskMemory)}; avail: ${new MemoryUnit(maxMemory)}") - final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory + final taskAccelerators = accelerators(handler) + if( taskAccelerators > acceleratorTracker.total() ) + throw new ProcessUnrecoverableException("Process requirement exceeds available accelerators -- req: $taskAccelerators; avail: ${acceleratorTracker.total()}") + + final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory && taskAccelerators <= acceleratorTracker.available() if( !result && log.isTraceEnabled( ) ) { - log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)}" + log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)} && taskAccelerators: $taskAccelerators <= availAccelerators: ${acceleratorTracker.available()}" } return result } @@ -194,9 +215,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { */ @Override protected void submit(TaskHandler handler) { - super.submit(handler) availCpus -= cpus(handler) availMemory -= mem(handler) + + final taskAccelerators = accelerators(handler) + if( handler instanceof LocalTaskHandler && taskAccelerators > 0 ) { + handler.acceleratorEnv = acceleratorTracker.name() + handler.acceleratorIds = acceleratorTracker.acquire(taskAccelerators) + } + + super.submit(handler) } /** @@ -209,11 +237,14 @@ class LocalPollingMonitor extends TaskPollingMonitor { * {@code true} when the task is successfully removed from polling queue, * {@code false} otherwise */ + @Override protected boolean remove(TaskHandler handler) { final result = super.remove(handler) if( result ) { availCpus += cpus(handler) availMemory += mem(handler) + if( handler instanceof LocalTaskHandler ) + acceleratorTracker.release(handler.acceleratorIds ?: Collections.emptyList()) } return result } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy b/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy new file mode 100644 index 0000000000..e56b8a167d --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy @@ -0,0 +1,69 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.util + +import java.util.concurrent.Semaphore + +import groovy.transform.CompileStatic + +/** + * Specialized semaphore that keeps track of which permits + * are being used. + * + * @author Ben Sherman + */ +@CompileStatic +class TrackingSemaphore { + private final Semaphore semaphore + private final Map availIds + + TrackingSemaphore(List ids) { + semaphore = new Semaphore(ids.size()) + availIds = new HashMap<>(ids.size()) + for( final id : ids ) + availIds.put(id, true) + } + + int totalPermits() { + return availIds.size() + } + + int availablePermits() { + return semaphore.availablePermits() + } + + List acquire(int permits) { + semaphore.acquire(permits) + final result = new ArrayList(permits) + for( final entry : availIds.entrySet() ) { + if( entry.getValue() ) { + entry.setValue(false) + result.add(entry.getKey()) + } + if( result.size() == permits ) + break + } + return result + } + + void release(List ids) { + semaphore.release(ids.size()) + for( final id : ids ) + availIds.put(id, true) + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/AcceleratorTrackerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/AcceleratorTrackerTest.groovy new file mode 100644 index 0000000000..e97b1d780a --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/AcceleratorTrackerTest.groovy @@ -0,0 +1,138 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.processor + +import spock.lang.Specification +import spock.lang.Unroll +/** + * + * @author Ben Sherman + */ +class AcceleratorTrackerTest extends Specification { + + @Unroll + def 'should create tracker from device environment variables'() { + given: + def tracker = AcceleratorTracker.create(ENV) + + expect: + tracker.name() == NAME + tracker.total() == TOTAL + tracker.available() == TOTAL + + where: + ENV | NAME | TOTAL + [:] | null | 0 + ['CUDA_VISIBLE_DEVICES': '0,1,2'] | 'CUDA_VISIBLE_DEVICES' | 3 + ['HIP_VISIBLE_DEVICES': 'gpu0,gpu1'] | 'HIP_VISIBLE_DEVICES' | 2 + ['ROCR_VISIBLE_DEVICES': 'device1,device2,device3,device4'] | 'ROCR_VISIBLE_DEVICES' | 4 + } + + def 'should handle single device ID'() { + given: + def env = ['CUDA_VISIBLE_DEVICES': '0'] + + when: + def tracker = AcceleratorTracker.create(env) + + then: + tracker.name() == 'CUDA_VISIBLE_DEVICES' + tracker.total() == 1 + tracker.available() == 1 + } + + def 'should handle UUID device IDs'() { + given: + def env = ['CUDA_VISIBLE_DEVICES': 'GPU-12345678-1234-1234-1234-123456789abc,GPU-87654321-4321-4321-4321-cba987654321'] + + when: + def tracker = AcceleratorTracker.create(env) + + then: + tracker.name() == 'CUDA_VISIBLE_DEVICES' + tracker.total() == 2 + tracker.available() == 2 + } + + def 'should acquire and release permits correctly'() { + given: + def env = ['CUDA_VISIBLE_DEVICES': '0,1,2'] + def tracker = AcceleratorTracker.create(env) + + when: + def acquired1 = tracker.acquire(2) + + then: + acquired1.size() == 2 + acquired1.containsAll(['0', '1']) + tracker.available() == 1 + + when: + def acquired2 = tracker.acquire(1) + + then: + acquired2.size() == 1 + acquired2.contains('2') + tracker.available() == 0 + + when: + tracker.release(acquired1) + + then: + tracker.available() == 2 + + when: + tracker.release(acquired2) + + then: + tracker.available() == 3 + } + + def 'should handle acquiring all permits'() { + given: + def env = ['CUDA_VISIBLE_DEVICES': '0,1,2'] + def tracker = AcceleratorTracker.create(env) + + when: + def acquired = tracker.acquire(3) + + then: + acquired.size() == 3 + acquired.containsAll(['0', '1', '2']) + tracker.available() == 0 + + when: + tracker.release(acquired) + + then: + tracker.available() == 3 + } + + def 'should handle empty device list'() { + given: + def env = ['CUDA_VISIBLE_DEVICES': ''] + + when: + def tracker = AcceleratorTracker.create(env) + + then: + tracker.name() == 'CUDA_VISIBLE_DEVICES' + tracker.total() == 0 + tracker.available() == 0 + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy index 244f66288f..69f916053b 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy @@ -20,6 +20,7 @@ import java.lang.management.ManagementFactory import com.sun.management.OperatingSystemMXBean import nextflow.Session +import nextflow.SysEnv import nextflow.exception.ProcessUnrecoverableException import nextflow.executor.ExecutorConfig import nextflow.util.MemoryUnit @@ -214,6 +215,38 @@ class LocalPollingMonitorTest extends Specification { } + def 'should throw an exception for missing accelerators' () { + + given: + SysEnv.push(CUDA_VISIBLE_DEVICES: '0,1,2,3') + and: + def _20_GB = MemoryUnit.of('20GB').toBytes() + def session = new Session() + def monitor = new LocalPollingMonitor( + cpus: 10, + capacity: 20, + memory: _20_GB, + session: session, + name: 'local', + pollInterval: 100 + ) + and: + def task = new TaskRun() + task.config = new TaskConfig(accelerator: 8) + def handler = Mock(TaskHandler) + handler.getTask() >> { task } + + when: + monitor.canSubmit(handler) + then: + def e2 = thrown(ProcessUnrecoverableException) + e2.message == 'Process requirement exceeds available accelerators -- req: 8; avail: 4' + + cleanup: + SysEnv.pop() + + } + def 'should get the number of cpus' () { given: diff --git a/tests/checks/local-gpu.nf/.checks b/tests/checks/local-gpu.nf/.checks new file mode 100644 index 0000000000..36bc8a5f9b --- /dev/null +++ b/tests/checks/local-gpu.nf/.checks @@ -0,0 +1,7 @@ + +export CUDA_VISIBLE_DEVICES="0,1,2,3,4,5,6,7" + +$NXF_RUN | tee .stdout + +[[ `grep INFO .nextflow.log | grep -c 'Submitted process'` == 16 ]] || false +cmp .expected <(sort .stdout) || false diff --git a/tests/checks/local-gpu.nf/.expected b/tests/checks/local-gpu.nf/.expected new file mode 100644 index 0000000000..6a423cfc79 --- /dev/null +++ b/tests/checks/local-gpu.nf/.expected @@ -0,0 +1,16 @@ +matmul1: gpu 0 +matmul1: gpu 1 +matmul1: gpu 2 +matmul1: gpu 3 +matmul1: gpu 4 +matmul1: gpu 5 +matmul1: gpu 6 +matmul1: gpu 7 +matmul2: gpu 0 +matmul2: gpu 1 +matmul2: gpu 2 +matmul2: gpu 3 +matmul2: gpu 4 +matmul2: gpu 5 +matmul2: gpu 6 +matmul2: gpu 7 diff --git a/tests/local-gpu.nf b/tests/local-gpu.nf new file mode 100644 index 0000000000..c6eb57c5d9 --- /dev/null +++ b/tests/local-gpu.nf @@ -0,0 +1,51 @@ +#!/usr/bin/env nextflow +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +process matmul1 { + accelerator 1 + debug true + + input: + val id + + output: + val id + + script: + ''' + echo "matmul1: gpu $CUDA_VISIBLE_DEVICES" + sleep 5 + ''' +} + +process matmul2 { + accelerator 1 + debug true + + input: + val id + + script: + ''' + echo "matmul2: gpu $CUDA_VISIBLE_DEVICES" + sleep 5 + ''' +} + +workflow { + matmul1( channel.of(1..8) ) | matmul2 +}