Commit 87799fb

Author: John Major (committed)
Commit message: 0.0.30
1 parent: dbe25c3

File tree: 3 files changed (+234, -238 lines)


pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "snakemake-executor-plugin-pcluster-slurm"
-version = "0.0.25"
+version = "0.0.30"
 description = "A Snakemake executor plugin for submitting jobs to an AWS Parallel Cluster (pcluster) SLURM cluster."
 authors = [
     "John Major <john@daylilyinformatics.com>",

snakemake_executor_plugin_pcluster_slurm/__init__.py

Lines changed: 144 additions & 75 deletions
@@ -9,7 +9,6 @@
 import re
 import shlex
 import subprocess
-import signal
 import time
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
@@ -25,7 +24,7 @@
     JobExecutorInterface,
 )
 from snakemake_interface_common.exceptions import WorkflowError
-#from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task
+from snakemake_executor_plugin_slurm_jobstep import get_cpus_per_task

 from .utils import delete_slurm_environment

@@ -92,18 +91,6 @@ def __post_init__(self):
         self._fallback_account_arg = None
         self._fallback_partition = None
         self._preemption_warning = False  # no preemption warning has been issued
-        # Register signal handlers to ensure jobs are killed on exit
-        self.active_jobs = []
-        signal.signal(signal.SIGINT, self._handle_exit)
-        signal.signal(signal.SIGTERM, self._handle_exit)
-
-    def _handle_exit(self, signum, frame):
-        """Handle termination signals to cancel active Slurm jobs."""
-        self.logger.info(f"Received signal {signum}. Canceling all active jobs.")
-
-        self.cancel_jobs(self.active_jobs)
-        self.logger.info("All jobs canceled. Exiting.")
-        exit(1)

     def warn_on_jobcontext(self, done=None):
         if not done:
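
Note: the hunk above removes the executor's own SIGINT/SIGTERM trapping; cancellation on interrupt is now left to cancel_jobs() (see the final hunk below), which Snakemake calls when a run is aborted. For reference, a minimal self-contained sketch of the removed pattern, with the scancel step stubbed out as a print:

import signal
import sys

active_jobs = []  # external SLURM jobids the executor would track

def _handle_exit(signum, frame):
    # The deleted code called self.cancel_jobs(self.active_jobs) here,
    # which shells out to scancel; stubbed for illustration.
    print(f"Received signal {signum}. Canceling {len(active_jobs)} active job(s).")
    sys.exit(1)

signal.signal(signal.SIGINT, _handle_exit)
signal.signal(signal.SIGTERM, _handle_exit)
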
@@ -121,84 +108,155 @@ def additional_general_args(self):
         return "--executor slurm-jobstep --jobs 1"

     def run_job(self, job: JobExecutorInterface):
-        """Submit a job to SLURM and track it for cancellation if needed."""
-        # Determine job group or rule name
+        # Implement here how to run a job.
+        # You can access the job's resources, etc.
+        # via the job object.
+        # After submitting the job, you have to call
+        # self.report_job_submission(job_info).
+        # with job_info being of type
+        # snakemake_interface_executor_plugins.executors.base.SubmittedJobInfo.
+
         group_or_rule = f"group_{job.name}" if job.is_group() else f"rule_{job.name}"
-        wildcard_str = "_".join(job.wildcards) if job.wildcards else ""

-        # Prepare log file paths
-        log_dir = f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}"
-        os.makedirs(log_dir, exist_ok=True)
+        try:
+            wildcard_str = "_".join(job.wildcards) if job.wildcards else ""
+        except AttributeError:
+            wildcard_str = ""
+
+        slurm_logfile = os.path.abspath(
+            f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.log"
+        )
+        logdir = os.path.dirname(slurm_logfile)

-        slurm_logfile = os.path.abspath(f"{log_dir}/%j.log")
-        slurm_errorlogfile = os.path.abspath(f"{log_dir}/%j.err")
+        slurm_errorlogfile = os.path.abspath(
+            f".snakemake/slurm_logs/{group_or_rule}/{wildcard_str}/%j.err"
+        )
+        errlogdir = os.path.dirname(slurm_errorlogfile)
+
+        # this behavior has been fixed in slurm 23.02, but there might be plenty of
+        # older versions around, hence we should rather be conservative here.
+        assert "%j" not in logdir, (
+            "bug: jobid placeholder in parent dir of logfile. This does not work as "
+            "we have to create that dir before submission in order to make sbatch "
+            "happy. Otherwise we get silent fails without logfiles being created."
+        )
+        os.makedirs(logdir, exist_ok=True)

-        # SLURM job submission command
-        comment_str = os.getenv('SMK_SLURM_COMMENT', 'RandD')
+        # generic part of a submission string:
+        # we use a run_uuid as the job-name, to allow `--name`-based
+        # filtering in the job status checks (`sacct --name` and `squeue --name`)
+
+        #if wildcard_str == "":
+        #    comment_str = f"rule_{job.name}"
+        #else:
+        #    comment_str = f"rule_{job.name}_wildcards_{wildcard_str}"
+        comment_str=os.getenv('SMK_SLURM_COMMENT','RandD')
         call = (
-            f"sbatch --parsable --no-requeue "
+            f"sbatch "
+            f"--parsable "
+            f" --no-requeue "
             f"--comment '{comment_str}' "
             f"--job-name '{job.name}-{self.run_uuid}' "
+            f"--distribution block "
             f"--chdir {os.getcwd()} "
             f"--error '{slurm_errorlogfile}' "
             f"--output '{slurm_logfile}' "
         )

-        # Add partition and resources if defined
         call += self.get_partition_arg(job)
-        call += f" --ntasks={job.resources.get('tasks', 1)} "
-        call += f" --cpus-per-task={max(1, job.threads)}"
-        if job.resources.get("mem_mb"):
-            call += f" --mem={job.resources['mem_mb']}"
-
-        # Generate the execution command
-        exec_job = self.format_job_exec(job)
-        call += f" -D {self.workflow.workdir_init} <<EOF\n#!/bin/bash\n{exec_job}\nEOF"

-        # Submit the job to SLURM
-        try:
-            out = subprocess.check_output(call, shell=True, text=True, stderr=subprocess.STDOUT).strip()
-            slurm_jobid = out.split(";")[0]
-            slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
+        if self.workflow.executor_settings.requeue:
+            call += " --no-requeue "

-            self.logger.info(f"Job {job.jobid} submitted with SLURM jobid {slurm_jobid} (log: {slurm_logfile}).")
+        if job.resources.get("clusters"):
+            call += f" --clusters {job.resources.clusters}"

-        # Track active job for future cancellation
-        submitted_job = SubmittedJobInfo(job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile})
-        self.active_jobs.append(submitted_job)
-        self.report_job_submission(submitted_job)
+        if job.resources.get("runtime"):
+            call += f" -t {job.resources.runtime}"
+        else:
+            self.logger.warning(
+                "No wall time information given. This might or might not "
+                "work on your cluster. "
+                "If not, specify the resource runtime in your rule or as a reasonable "
+                "default via --default-resources."
+            )

-        except subprocess.CalledProcessError as e:
-            self.logger.error(f"SLURM job submission failed: {e.output}")
-            raise WorkflowError(f"SLURM job submission failed: {e.output}")
+        if job.resources.get("constraint"):
+            call += f" -C '{job.resources.constraint}'"
+        if job.resources.get("mem_mb_per_cpu"):
+            call += f" --mem-per-cpu {job.resources.mem_mb_per_cpu}"
+        elif job.resources.get("mem_mb"):
+            call += f" --mem {job.resources.mem_mb}"
+        else:
+            self.logger.warning(
+                "No job memory information ('mem_mb' or 'mem_mb_per_cpu') is given "
+                "- submitting without. This might or might not work on your cluster."
+            )

+        if job.resources.get("nodes", False):
+            call += f" --nodes={job.resources.get('nodes', 1)}"

-        self.logger.debug(f"sbatch call: {call}")
-        try:
-            out = subprocess.check_output(
-                call, shell=True, text=True, stderr=subprocess.STDOUT
-            ).strip()
-        except subprocess.CalledProcessError as e:
-            raise WorkflowError(
-                f"SLURM job submission failed. The error message was {e.output}"
+        # fixes #40 - set ntasks regardless of mpi, because
+        # SLURM v22.05 will require it for all jobs
+        call += f" --ntasks={job.resources.get('tasks', 1)}"
+        # MPI job
+        if job.resources.get("mpi", False):
+            if not job.resources.get("tasks_per_node") and not job.resources.get(
+                "nodes"
+            ):
+                self.logger.warning(
+                    "MPI job detected, but no 'tasks_per_node' or 'nodes' "
+                    "specified. Assuming 'tasks_per_node=1'."
+                    "Probably not what you want."
                 )
+
+        n_cpus = 1 if int(get_cpus_per_task(job)) <= 1 else int(get_cpus_per_task(job))
+
+        call += f" --cpus-per-task={n_cpus}"
+
+        if job.resources.get("slurm_extra"):
+            self.check_slurm_extra(job)
+            call += f" {job.resources.slurm_extra}"
+
+        exec_job = self.format_job_exec(job)
+
+        # ensure that workdir is set correctly
+        # use short argument as this is the same in all slurm versions
+        # (see https://github.com/snakemake/snakemake/issues/2014)
+        call += f" -D {self.workflow.workdir_init}"
+        # and finally the job to execute with all the snakemake parameters
+        call += f''' <<EOF
+#!/bin/bash
+{exec_job}
+EOF
+'''

-        # multicluster submissions yield submission infos like
-        # "Submitted batch job <id> on cluster <name>" by default, but with the
-        # --parsable option it simply yields "<id>;<name>".
-        # To extract the job id we split by semicolon and take the first element
-        # (this also works if no cluster name was provided)
-        slurm_jobid = out.split(";")[0]
-        slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
-        self.logger.info(
-            f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
-            f"(log: {slurm_logfile})."
+        self.logger.debug(f"sbatch call: {call}")
+        try:
+            out = subprocess.check_output(
+                call, shell=True, text=True, stderr=subprocess.STDOUT
+            ).strip()
+        except subprocess.CalledProcessError as e:
+            raise WorkflowError(
+                f"SLURM job submission failed. The error message was {e.output}"
             )
-        self.report_job_submission(
-            SubmittedJobInfo(
-                job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
-            )
+
+        # multicluster submissions yield submission infos like
+        # "Submitted batch job <id> on cluster <name>" by default, but with the
+        # --parsable option it simply yields "<id>;<name>".
+        # To extract the job id we split by semicolon and take the first element
+        # (this also works if no cluster name was provided)
+        slurm_jobid = out.split(";")[0]
+        slurm_logfile = slurm_logfile.replace("%j", slurm_jobid)
+        self.logger.info(
+            f"Job {job.jobid} has been submitted with SLURM jobid {slurm_jobid} "
+            f"(log: {slurm_logfile})."
+        )
+        self.report_job_submission(
+            SubmittedJobInfo(
+                job, external_jobid=slurm_jobid, aux={"slurm_logfile": slurm_logfile}
             )
+        )

     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
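
Note: the rewritten run_job() above hands sbatch an inline batch script through a shell heredoc and relies on --parsable output to recover the jobid; the n_cpus expression it uses is equivalent to max(1, int(get_cpus_per_task(job))). A minimal sketch of just the submit-and-parse mechanism, runnable only on a host with sbatch; the job name and payload command are placeholders, not values from this commit:

import subprocess

exec_job = "echo hello"  # placeholder standing in for format_job_exec(job)

call = (
    "sbatch --parsable --no-requeue "
    "--job-name 'example-job' "
    f"<<EOF\n#!/bin/bash\n{exec_job}\nEOF"
)

out = subprocess.check_output(call, shell=True, text=True).strip()
# --parsable prints "<id>" (or "<id>;<cluster>" for multicluster
# submissions), so the jobid is always the first semicolon-separated field.
slurm_jobid = out.split(";")[0]
print(slurm_jobid)
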
@@ -260,26 +318,37 @@ async def check_active_jobs(
             # Assume job is still running
             yield job_info

+
     def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
-        """Cancel all active Slurm jobs."""
+        # Cancel all active jobs.
+        # This method is called when Snakemake is interrupted.
         if active_jobs:
+            # TODO chunk jobids in order to avoid too long command lines
             jobids = " ".join([job_info.external_jobid for job_info in active_jobs])
             try:
-                scancel_command = f"scancel {jobids}"
+                # timeout set to 60, because a scheduler cycle usually is
+                # about 30 sec, but can be longer in extreme cases.
+                # Under 'normal' circumstances, 'scancel' is executed in
+                # virtually no time.
+                scancel_command = f"scancel {jobids} --clusters=all"
+
                 subprocess.check_output(
                     scancel_command,
                     text=True,
                     shell=True,
                     timeout=60,
                     stderr=subprocess.PIPE,
                 )
-                self.logger.info(f"Successfully canceled jobs: {jobids}")
             except subprocess.TimeoutExpired:
-                self.logger.warning("Unable to cancel jobs within the timeout period.")
+                self.logger.warning("Unable to cancel jobs within a minute.")
             except subprocess.CalledProcessError as e:
                 msg = e.stderr.strip()
-                self.logger.error(f"Failed to cancel jobs: {msg}")
-
+                if msg:
+                    msg = f": {msg}"
+                raise WorkflowError(
+                    "Unable to cancel jobs with scancel "
+                    f"(exit code {e.returncode}){msg}"
+                ) from e

     def get_partition_arg(self, job: JobExecutorInterface):
         """
