Skip to content

Commit 19ac45a

Browse files
committed
feat: support for job manager features
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 8be8e1a commit 19ac45a

File tree

10 files changed

+131
-51
lines changed

10 files changed

+131
-51
lines changed

fractale/transformer/base.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -78,13 +78,13 @@ def add(self, name: str, value=None):
7878

7979
# Determine if it's a short (-n) or long (--tasks) option
8080
prefix = "-" if len(name) == 1 else "--"
81-
self.script_lines.append(f"{self.directive}: {prefix}{name}={value}")
81+
self.script_lines.append(f"{self.directive} {prefix}{name}={value}")
8282

8383
def add_flag(self, name: str):
8484
"""
8585
Add a boolean flag (e.g., --exclusive).
8686
"""
87-
self.script_lines.append(f"{self.directive}: --{name}")
87+
self.script_lines.append(f"{self.directive} --{name}")
8888

8989
def render(self) -> str:
9090
"""

fractale/transformer/cobalt/transform.py

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -154,9 +154,12 @@ def convert(self, spec):
154154
)
155155
qsub_cmd.extend(["--dependencies", dep_str])
156156

157-
# Node constraints are handled by --attrs
158-
if spec.constraints:
159-
qsub_cmd.extend(["--attrs", ":".join(spec.constraints)])
157+
# Node constraints and GPU type are handled by --attrs
158+
attrs = list(spec.constraints)
159+
if spec.gpu_type:
160+
attrs.append(f"gpu_type={spec.gpu_type}")
161+
if attrs:
162+
qsub_cmd.extend(["--attrs", ":".join(attrs)])
160163

161164
# -O sets the prefix for output/error files, which is derived from the job name.
162165
qsub_cmd.extend(["-O", job_name])
@@ -284,7 +287,11 @@ def _parse(self, content, return_unhandled=False):
284287
elif key == "dependencies":
285288
spec.depends_on = value.split(":")
286289
elif key == "attrs":
287-
spec.constraints = value.split(":")
290+
for attr in value.split(":"):
291+
if attr.startswith("gpu_type="):
292+
spec.gpu_type = attr.split("=", 1)[1]
293+
else:
294+
spec.constraints.append(attr)
288295
elif key == "M":
289296
spec.mail_user = value
290297
elif key == "notify" and value == "user":

fractale/transformer/common.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
import sys
21
from dataclasses import dataclass, field
32
from typing import Dict, List, Optional, Union
43

@@ -32,6 +31,7 @@ class JobSpec:
3231
cpus_per_task: int = 1
3332
mem_per_task: Optional[str] = None
3433
gpus_per_task: int = 0
34+
gpu_type: Optional[str] = None
3535

3636
# Scheduling and Constraints
3737
wall_time: Optional[int] = None

fractale/transformer/flux/transform.py

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,7 @@
1+
import re
2+
import shlex
3+
from typing import Optional
4+
15
from fractale.logger.generate import JobNamer
26
from fractale.transformer.base import Script, TransformerBase
37
from fractale.transformer.flux.validate import Validator
@@ -15,22 +19,15 @@ def priority_to_flux_priority(class_name):
1519
numerical priority value. This is the reverse of map_numeric_priority_to_class_name.
1620
"""
1721
# Define the mapping from the string class back to a representative number.
18-
mapping = {
19-
"low": 15,
20-
"normal": 16,
21-
"high": 50,
22-
"urgent": 100,
23-
}
22+
mapping = {"low": 15, "normal": 16, "high": 50, "urgent": 100}
23+
2424
# If we don't get it, default to Flux's default
2525
return mapping.get(class_name, 16)
2626

2727

2828
class FluxTransformer(TransformerBase):
2929
"""
30-
A Flux Transformer is a very manual way to transform a subsystem into
31-
a batch script. I am not even using jinja templates, I'm just
32-
parsing the subsystems in a sort of manual way. This a filler,
33-
and assuming that we will have an LLM that can replace this.
30+
A Flux Transformer for converting a generic JobSpec into a Flux batch script.
3431
"""
3532

3633
def _parse(self, jobspec, return_unhandled=False):
@@ -97,6 +94,10 @@ def convert(self, spec):
9794
script.add("c", spec.cpus_per_task if spec.cpus_per_task > 1 else None)
9895
script.add("gpus-per-task", spec.gpus_per_task if spec.gpus_per_task > 0 else None)
9996

97+
# Add a constraint for the specific GPU type, if provided
98+
# We could probably add gpu_type to requires if an admin configures it,
99+
# but it's too risky.
100+
100101
# Scheduling Directives
101102
if spec.exclusive_access:
102103
script.add_flag("exclusive")
@@ -106,7 +107,7 @@ def convert(self, spec):
106107
script.add("t", spec.wall_time)
107108

108109
flux_prio = priority_to_flux_priority(spec.priority)
109-
if flux_prio != 0:
110+
if flux_prio != 16:
110111
script.add("urgency", flux_prio)
111112
script.newline()
112113

fractale/transformer/kubernetes/transform.py

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ def normalize_memory_request(mem_str):
3737
return mem_str
3838

3939

40-
def parse_memory(self, mem_str: str) -> str:
40+
def parse_memory(mem_str: str) -> str:
4141
"""
4242
Converts K8s memory (e.g., 1Gi) to JobSpec format (e.g., 1G).
4343
"""
@@ -53,7 +53,7 @@ def parse_memory(self, mem_str: str) -> str:
5353
return mem_str
5454

5555

56-
def parse_cpu(self, cpu_str: str) -> int:
56+
def parse_cpu(cpu_str: str) -> int:
5757
"""
5858
Converts K8s CPU string to an integer. Assumes no millicores.
5959
"""
@@ -125,12 +125,14 @@ def convert(self, spec):
125125
if spec.environment:
126126
container["env"] = [{"name": k, "value": v} for k, v in spec.environment.items()]
127127

128+
# This is the spec for the pod template
129+
template_pod_spec = {"containers": [container], "restartPolicy": "Never"}
128130
pod_spec = {
129131
"apiVersion": "batch/v1",
130132
"kind": "Job",
131133
"metadata": {"name": job_name},
132134
"spec": {
133-
"template": {"spec": {"containers": [container], "restartPolicy": "Never"}},
135+
"template": {"spec": template_pod_spec},
134136
"backoffLimit": 0,
135137
},
136138
}
@@ -158,7 +160,7 @@ def convert(self, spec):
158160
job_spec = {
159161
"parallelism": spec.num_nodes,
160162
"completions": spec.num_nodes,
161-
"backoffLimit": 4, # A sensible default
163+
"backoffLimit": 4,
162164
"template": {"metadata": {"labels": {"job-name": spec.job_name}}, "spec": pod_spec},
163165
}
164166

fractale/transformer/lsf/transform.py

Lines changed: 17 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -160,8 +160,13 @@ def convert(self, spec):
160160

161161
# Build the complex -R "select[...] span[...] rusage[...]" string
162162
r_parts = []
163-
if spec.constraints:
164-
r_parts.append(f'select[{":".join(spec.constraints)}]')
163+
164+
# Handle select criteria, including GPU type
165+
select_criteria = list(spec.constraints)
166+
if spec.gpu_type:
167+
select_criteria.append(spec.gpu_type)
168+
if select_criteria:
169+
r_parts.append(f'select[{":".join(select_criteria)}]')
165170

166171
if spec.num_nodes > 1 and spec.num_tasks > 0:
167172
tasks_per_node = spec.num_tasks // spec.num_nodes
@@ -252,6 +257,9 @@ def _parse(self, content, return_unhandled=False):
252257
command_lines = []
253258
not_handled = set()
254259

260+
# Heuristic list of common GPU names to identify as gpu_type
261+
known_gpu_types = {"a100", "v100", "h100", "a30", "a40", "mi250"}
262+
255263
for line in content.splitlines():
256264
if not line.strip():
257265
continue
@@ -314,7 +322,13 @@ def _parse(self, content, return_unhandled=False):
314322
if spec.num_tasks > 0 and tasks_per_node > 0:
315323
spec.num_nodes = spec.num_tasks // tasks_per_node
316324
if select_match:
317-
spec.constraints.extend(select_match.group(1).split(":"))
325+
criteria = select_match.group(1).split(":")
326+
for criterion in criteria:
327+
# If a criterion is a known GPU type, set it and move on
328+
if criterion.lower() in known_gpu_types:
329+
spec.gpu_type = criterion
330+
else:
331+
spec.constraints.append(criterion)
318332
else:
319333
not_handled.add(key)
320334
continue

fractale/transformer/moab/transform.py

Lines changed: 27 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import re
22
import shlex
3-
from datetime import timedelta
3+
from datetime import datetime, timedelta
44

55
import fractale.utils as utils
66
from fractale.logger.generate import JobNamer
@@ -169,8 +169,19 @@ def convert(self, spec) -> str:
169169

170170
# Resource Requests
171171
resource_parts = []
172-
if spec.num_nodes and spec.cpus_per_task:
173-
resource_parts.append(f"nodes={spec.num_nodes}:ppn={spec.cpus_per_task}")
172+
node_spec = []
173+
if spec.num_nodes:
174+
node_spec.append(f"nodes={spec.num_nodes}")
175+
if spec.cpus_per_task:
176+
node_spec.append(f"ppn={spec.cpus_per_task}")
177+
if spec.gpus_per_task > 0:
178+
node_spec.append(f"gpus={spec.gpus_per_task}")
179+
if spec.gpu_type:
180+
# Add gpu type as a feature request
181+
node_spec.append(spec.gpu_type)
182+
183+
if node_spec:
184+
resource_parts.append(":".join(node_spec))
174185

175186
if spec.generic_resources:
176187
resource_parts.append(f"gres={spec.generic_resources}")
@@ -307,15 +318,26 @@ def _parse(self, filename, return_unhandled=False):
307318
for part in shlex.split(full_l_string):
308319

309320
# Split combined node:ppn requests first
310-
if "nodes" in part and ":" in part:
321+
if ":" in part:
322+
node_features = []
311323
for subpart in part.split(":"):
312324
if "=" not in subpart:
325+
# This is a feature request, like "gtx1080"
326+
node_features.append(subpart)
313327
continue
328+
314329
k, v = subpart.split("=", 1)
315330
if k == "nodes":
316331
spec.num_nodes = int(v)
317332
elif k == "ppn":
318333
spec.cpus_per_task = int(v)
334+
elif k == "gpus":
335+
spec.gpus_per_task = int(v)
336+
337+
# Heuristic: If we found GPUs and other features, assume the first
338+
# other feature is the gpu_type.
339+
if spec.gpus_per_task > 0 and node_features:
340+
spec.gpu_type = node_features[0]
319341

320342
elif "=" in part:
321343
k, v = part.split("=", 1)
@@ -329,7 +351,7 @@ def _parse(self, filename, return_unhandled=False):
329351
spec.num_tasks = int(v)
330352
elif k == "mem":
331353
spec.mem_per_task = v
332-
elif k == "gres" or k == "gpus":
354+
elif k == "gres":
333355
spec.generic_resources = v
334356
elif k == "depend":
335357
spec.depends_on = v

fractale/transformer/oar/transform.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,10 @@ def convert(self, spec):
182182
# This requests nodes that *each* have at least this many GPUs.
183183
l_parts.append(f"/gpunum={spec.gpus_per_task}")
184184

185+
# Add the specific GPU type as a resource property
186+
if spec.gpu_type:
187+
l_parts.append(f"/gpu_model='{spec.gpu_type}'")
188+
185189
resource_str = "".join(l_parts)
186190

187191
# Node constraints are added as properties to the resource string.
@@ -311,6 +315,8 @@ def _parse(self, content, return_unhandled=False):
311315
spec.cpus_per_task = int(v)
312316
elif k == "gpunum":
313317
spec.gpus_per_task = int(v)
318+
elif k == "gpu_model":
319+
spec.gpu_type = v.strip("'")
314320
else:
315321
# Assume parts without '=' are constraints
316322
spec.constraints.append(part.strip().strip("'"))

fractale/transformer/pbs/transform.py

Lines changed: 16 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -148,16 +148,19 @@ def convert(self, spec):
148148
# Resource Selection (-l)
149149
select_parts = []
150150
if spec.num_nodes > 0:
151-
select_parts.append(f"select={spec.num_nodes}")
152-
153-
# mpiprocs is often used to specify total tasks, which works well with our spec
154-
if spec.num_tasks > 1:
155-
select_parts.append(f"mpiprocs={spec.num_tasks}")
151+
# Build the select statement parts
152+
node_spec = [f"select={spec.num_nodes}"]
153+
if spec.cpus_per_task > 1:
154+
node_spec.append(f"ncpus={spec.cpus_per_task}")
155+
if spec.gpus_per_task > 0:
156+
node_spec.append(f"ngpus={spec.gpus_per_task}")
157+
if spec.gpu_type:
158+
node_spec.append(f"gpu_type={spec.gpu_type}")
159+
# mpiprocs is often used to specify total tasks, which works well with our spec
160+
if spec.num_tasks > 1:
161+
node_spec.append(f"mpiprocs={spec.num_tasks}")
156162

157-
if spec.cpus_per_task > 1:
158-
select_parts.append(f"ncpus={spec.cpus_per_task}")
159-
if spec.gpus_per_task > 0:
160-
select_parts.append(f"ngpus={spec.gpus_per_task}")
163+
select_parts.append(":".join(node_spec))
161164

162165
# PBS memory format often includes units like gb or mb
163166
if spec.mem_per_task:
@@ -167,7 +170,7 @@ def convert(self, spec):
167170
mem_val += "b"
168171
select_parts.append(f"mem={mem_val}")
169172

170-
resource_str = ":".join(select_parts)
173+
resource_str = ",".join(select_parts)
171174

172175
wt = seconds_to_pbs(spec.wall_time)
173176
if wt:
@@ -289,7 +292,7 @@ def _parse(self, content, return_unhandled=False):
289292
if k == "walltime":
290293
spec.wall_time = pbs_time_to_seconds(v)
291294
elif k == "select":
292-
# select=N:ncpus=C:mpiprocs=T...
295+
# select=N:ncpus=C:mpiprocs=T:gpu_type=a100...
293296
select_parts = v.split(":")
294297
spec.num_nodes = int(select_parts[0])
295298
for sp in select_parts[1:]:
@@ -298,6 +301,8 @@ def _parse(self, content, return_unhandled=False):
298301
spec.cpus_per_task = int(sv)
299302
elif sk == "ngpus":
300303
spec.gpus_per_task = int(sv)
304+
elif sk == "gpu_type":
305+
spec.gpu_type = sv
301306
elif sk == "mem":
302307
spec.mem_per_task = sv.upper().replace("B", "")
303308
elif sk == "mpiprocs":

0 commit comments

Comments (0)