From 1082f6e30a97a12ab6c78c67d4ff4616a9778767 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 4 Mar 2025 10:59:53 -0600 Subject: [PATCH 1/4] Manage NVIDIA GPU slots in local executor Signed-off-by: Ben Sherman --- docs/reference/config.md | 6 ++ .../executor/local/LocalTaskHandler.groovy | 16 ++++- .../processor/LocalPollingMonitor.groovy | 44 ++++++++++++- .../nextflow/util/TrackingSemaphore.groovy | 65 +++++++++++++++++++ 4 files changed, 126 insertions(+), 5 deletions(-) create mode 100644 modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy diff --git a/docs/reference/config.md b/docs/reference/config.md index 6a8f1ded47..15c2ec8d41 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -645,6 +645,12 @@ The following settings are available: `executor.exitReadTimeout` : Determines how long to wait before returning an error status when a process is terminated but the `.exitcode` file does not exist or is empty (default: `270 sec`). Used only by grid executors. +`executor.gpus` +: :::{versionadded} 25.04.0 + ::: +: *Used only by the `local` executor.* +: The maximum number of NVIDIA GPUs made available by the underlying system. When this setting is enabled, each local task is assigned GPUs based on their `accelerator` request, using the `CUDA_VISIBLE_DEVICES` environment variable. + `executor.jobName` : Determines the name of jobs submitted to the underlying cluster executor e.g. `executor.jobName = { "$task.name - $task.hash" }`. Make sure the resulting job name matches the validation constraints of the underlying batch scheduler. : This setting is supported by the following executors: Bridge, Condor, Flux, HyperQueue, Lsf, Moab, Nqsii, Oar, PBS, PBS Pro, SGE, SLURM and Google Batch. diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy index 0bad8bb9c7..26e345d3a9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/local/LocalTaskHandler.groovy @@ -80,6 +80,8 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { private volatile TaskResult result + List gpuSlots + LocalTaskHandler(TaskRun task, LocalExecutor executor) { super(task) // create the task handler @@ -142,11 +144,13 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { final workDir = task.workDir.toFile() final logFile = new File(workDir, TaskRun.CMD_LOG) - return new ProcessBuilder() + final pb = new ProcessBuilder() .redirectErrorStream(true) .redirectOutput(logFile) .directory(workDir) .command(cmd) + applyGpuSlots(pb) + return pb } protected ProcessBuilder fusionProcessBuilder() { @@ -162,10 +166,18 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask { final logPath = Files.createTempFile('nf-task','.log') - return new ProcessBuilder() + final pb = new ProcessBuilder() .redirectErrorStream(true) .redirectOutput(logPath.toFile()) .command(List.of('sh','-c', cmd)) + applyGpuSlots(pb) + return pb + } + + protected void applyGpuSlots(ProcessBuilder pb) { + if( !gpuSlots ) + return + pb.environment().put('CUDA_VISIBLE_DEVICES', gpuSlots.join(',')) } protected ProcessBuilder createLaunchProcessBuilder() { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index d9dbda638a..57bf63ec24 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -23,8 +23,10 @@ import groovy.transform.PackageScope import groovy.util.logging.Slf4j import nextflow.Session import nextflow.exception.ProcessUnrecoverableException +import nextflow.executor.local.LocalTaskHandler import nextflow.util.Duration import nextflow.util.MemoryUnit +import nextflow.util.TrackingSemaphore /** * Task polling monitor specialized for local execution. It manages tasks scheduling @@ -58,6 +60,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { */ private final long maxMemory + /** + * Number of `free` GPUs available to execute pending tasks + */ + private TrackingSemaphore availGpus + + /** + * Total number of CPUs available in the system + */ + private final int maxGpus + /** * Create the task polling monitor with the provided named parameters object. *

@@ -74,6 +86,8 @@ class LocalPollingMonitor extends TaskPollingMonitor { super(params) this.availCpus = maxCpus = params.cpus as int this.availMemory = maxMemory = params.memory as long + this.maxGpus = params.gpus as int + this.availGpus = new TrackingSemaphore(maxGpus) assert availCpus>0, "Local avail `cpus` attribute cannot be zero" assert availMemory>0, "Local avail `memory` attribute cannot zero" } @@ -98,14 +112,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { final int cpus = configCpus(session,name) final long memory = configMem(session,name) + final int gpus = configGpus(session,name) final int size = session.getQueueSize(name, OS.getAvailableProcessors()) - log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${new MemoryUnit(memory)}; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval" + log.debug "Creating local task monitor for executor '$name' > cpus=$cpus; memory=${new MemoryUnit(memory)}; gpus=$gpus; capacity=$size; pollInterval=$pollInterval; dumpInterval=$dumpInterval" new LocalPollingMonitor( name: name, cpus: cpus, memory: memory, + gpus: gpus, session: session, capacity: size, pollInterval: pollInterval, @@ -128,6 +144,11 @@ class LocalPollingMonitor extends TaskPollingMonitor { (session.getExecConfigProp(name, 'memory', OS.getTotalPhysicalMemorySize()) as MemoryUnit).toBytes() } + @PackageScope + static int configGpus(Session session, String name) { + return session.getExecConfigProp(name, 'gpus', 0) as int + } + /** * @param handler * A {@link TaskHandler} instance @@ -149,6 +170,16 @@ class LocalPollingMonitor extends TaskPollingMonitor { handler.task.getConfig()?.getMemory()?.toBytes() ?: 1L } + /** + * @param handler + * A {@link TaskHandler} instance + * @return + * The number of gpus requested to execute the specified task + */ + private static int gpus(TaskHandler handler) { + handler.task.getConfig()?.getAccelerator()?.getRequest() ?: 0 + } + /** * Determines if a task can be submitted for execution checking if the resources required * (cpus and memory) match the amount of avail resource @@ -174,9 +205,14 @@ class LocalPollingMonitor extends TaskPollingMonitor { if( taskMemory>maxMemory) throw new ProcessUnrecoverableException("Process requirement exceeds available memory -- req: ${new MemoryUnit(taskMemory)}; avail: ${new MemoryUnit(maxMemory)}") - final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory + final taskGpus = gpus(handler) + if( taskGpus>maxGpus ) + throw new ProcessUnrecoverableException("Process requirement exceeds available GPUs -- req: $taskGpus; avail: $maxGpus") + + final availGpus0 = availGpus.availablePermits() + final result = super.canSubmit(handler) && taskCpus <= availCpus && taskMemory <= availMemory && taskGpus <= availGpus0 if( !result && log.isTraceEnabled( ) ) { - log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)}" + log.trace "Task `${handler.task.name}` cannot be scheduled -- taskCpus: $taskCpus <= availCpus: $availCpus && taskMemory: ${new MemoryUnit(taskMemory)} <= availMemory: ${new MemoryUnit(availMemory)} && taskGpus: $taskGpus <= availGpus: ${availGpus0}" } return result } @@ -192,6 +228,7 @@ class LocalPollingMonitor extends TaskPollingMonitor { super.submit(handler) availCpus -= cpus(handler) availMemory -= mem(handler) + ((LocalTaskHandler) handler).gpuSlots = availGpus.acquire(gpus(handler)) } /** @@ -209,6 +246,7 @@ class LocalPollingMonitor extends TaskPollingMonitor { if( result ) { availCpus += cpus(handler) availMemory += mem(handler) + availGpus.release(((LocalTaskHandler) handler).gpuSlots) } return result } diff --git a/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy b/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy new file mode 100644 index 0000000000..78f444e6e9 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/util/TrackingSemaphore.groovy @@ -0,0 +1,65 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.util + +import java.util.concurrent.Semaphore + +import groovy.transform.CompileStatic + +/** + * Specialized semaphore that keeps track of which slots + * are being used. + * + * @author Ben Sherman + */ +@CompileStatic +class TrackingSemaphore { + private final Semaphore semaphore + private final Map availIds + + TrackingSemaphore(int permits) { + semaphore = new Semaphore(permits) + availIds = new HashMap<>(permits) + for( int i=0; i acquire(int permits) { + semaphore.acquire(permits) + final result = new ArrayList(permits) + for( final entry : availIds.entrySet() ) { + if( entry.getValue() ) { + entry.setValue(false) + result.add(entry.getKey()) + } + if( result.size() == permits ) + break + } + return result + } + + void release(List ids) { + semaphore.release(ids.size()) + for( id in ids ) + availIds.put(id, true) + } + +} From de4ae6c2c979243c62e3a9fb12e744f0a4ef9e3a Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 4 Mar 2025 11:26:51 -0600 Subject: [PATCH 2/4] Fix failing tests Signed-off-by: Ben Sherman --- .../nextflow/processor/LocalPollingMonitor.groovy | 3 ++- .../processor/LocalPollingMonitorTest.groovy | 12 +++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index 57bf63ec24..36e0cdb91e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -241,12 +241,13 @@ class LocalPollingMonitor extends TaskPollingMonitor { * {@code true} when the task is successfully removed from polling queue, * {@code false} otherwise */ + @Override protected boolean remove(TaskHandler handler) { final result = super.remove(handler) if( result ) { availCpus += cpus(handler) availMemory += mem(handler) - availGpus.release(((LocalTaskHandler) handler).gpuSlots) + availGpus.release(((LocalTaskHandler) handler).gpuSlots ?: []) } return result } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy index e6f6150ce5..246a246319 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/LocalPollingMonitorTest.groovy @@ -21,6 +21,7 @@ import java.lang.management.ManagementFactory import com.sun.management.OperatingSystemMXBean import nextflow.Session import nextflow.exception.ProcessUnrecoverableException +import nextflow.executor.local.LocalTaskHandler import nextflow.util.MemoryUnit import spock.lang.Specification /** @@ -38,6 +39,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 20, memory: _20_GB, + gpus: 0, session: session, name: 'local', pollInterval: 100 @@ -45,7 +47,7 @@ class LocalPollingMonitorTest extends Specification { def task = new TaskRun() task.config = new TaskConfig(cpus: 3, memory: MemoryUnit.of('2GB')) - def handler = Mock(TaskHandler) + def handler = Mock(LocalTaskHandler) handler.getTask() >> { task } expect: @@ -86,6 +88,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 10, memory: _20_GB, + gpus: 0, session: session, name: 'local', pollInterval: 100 @@ -93,7 +96,7 @@ class LocalPollingMonitorTest extends Specification { def task = new TaskRun() task.config = new TaskConfig(cpus: 4, memory: MemoryUnit.of('8GB')) - def handler = Mock(TaskHandler) + def handler = Mock(LocalTaskHandler) handler.getTask() >> { task } handler.canForkProcess() >> true handler.isReady() >> true @@ -132,6 +135,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 1, capacity: 1, memory: _20_GB, + gpus: 0, session: session, name: 'local', pollInterval: 100 @@ -139,7 +143,7 @@ class LocalPollingMonitorTest extends Specification { def task = new TaskRun() task.config = new TaskConfig(cpus: 1, memory: MemoryUnit.of('8GB')) - def handler = Mock(TaskHandler) + def handler = Mock(LocalTaskHandler) handler.getTask() >> { task } handler.canForkProcess() >> true handler.isReady() >> true @@ -167,6 +171,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 20, memory: _20_GB, + gpus: 0, session: session, name: 'local', pollInterval: 100 @@ -195,6 +200,7 @@ class LocalPollingMonitorTest extends Specification { cpus: 10, capacity: 20, memory: _20_GB, + gpus: 0, session: session, name: 'local', pollInterval: 100 From 5f463e6b86aeab53c1ae9da5a36985742e9cefc4 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Tue, 4 Mar 2025 13:42:53 -0600 Subject: [PATCH 3/4] minor edit Signed-off-by: Ben Sherman --- .../main/groovy/nextflow/processor/LocalPollingMonitor.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index 36e0cdb91e..3ab9bb2ae9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -247,7 +247,7 @@ class LocalPollingMonitor extends TaskPollingMonitor { if( result ) { availCpus += cpus(handler) availMemory += mem(handler) - availGpus.release(((LocalTaskHandler) handler).gpuSlots ?: []) + availGpus.release(((LocalTaskHandler) handler).gpuSlots ?: Collections.emptyList()) } return result } From 0d9350d99ea3ce2ddd4ca1a599e821a84e42871c Mon Sep 17 00:00:00 2001 From: Alan J Correa Date: Tue, 17 Jun 2025 03:14:53 +0200 Subject: [PATCH 4/4] fix `null` gpuSlots in LocalTaskHandler gpuSlots for the LocalTaskHandler need to be set before calling `submit()` Signed-off-by: Alan J Correa --- .../groovy/nextflow/processor/LocalPollingMonitor.groovy | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy index 3ab9bb2ae9..a500deef2f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/LocalPollingMonitor.groovy @@ -225,10 +225,13 @@ class LocalPollingMonitor extends TaskPollingMonitor { */ @Override protected void submit(TaskHandler handler) { + final taskGpus = gpus(handler) + if ( taskGpus > 0 ) { + ((LocalTaskHandler) handler).gpuSlots = availGpus.acquire(taskGpus) + } super.submit(handler) availCpus -= cpus(handler) availMemory -= mem(handler) - ((LocalTaskHandler) handler).gpuSlots = availGpus.acquire(gpus(handler)) } /**