Skip to content

Commit f61f834

Browse files
committed
Change to --until and make the mechanism more general.
You can still specify multiple steps to re-run *from* and can now also specify multiple steps to re-run *to*.
1 parent ef82012 commit f61f834

File tree

2 files changed

+65
-76
lines changed

2 files changed

+65
-76
lines changed

metaflow/cli_components/run_cmds.py

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -192,23 +192,22 @@ def wrapper(*args, **kwargs):
192192
help="If specified, it identifies the task that started this resume call. It is in the form of {step_name}-{task_id}",
193193
)
194194
@click.option(
195-
"--step-only/--no-step-only",
196-
default=False,
195+
"--until",
196+
default=None,
197197
show_default=True,
198-
help="If specified, runs up to the specified step(s) (inclusive) and stops. "
199-
"If the steps are not reachable in the cloned run (ie: parent steps were "
200-
"not executed), an error will be raised.",
198+
help="If specified, runs up to the specified step(s) (exclusive) and stops. "
199+
"Multiple steps can be specified using a comma-separated list.",
201200
)
202-
@click.argument("steps-to-rerun", required=False, nargs=-1)
201+
@click.argument("step-to-rerun", required=False, nargs=-1)
203202
@click.command(help="Resume execution of a previous run of this flow.")
204203
@tracing.cli("cli/resume")
205204
@common_run_options
206205
@click.pass_obj
207206
def resume(
208207
obj,
209208
tags=None,
210-
steps_to_rerun=None,
211-
step_only=False,
209+
step_to_rerun=None,
210+
until=None,
212211
origin_run_id=None,
213212
run_id=None,
214213
clone_only=False,
@@ -230,18 +229,16 @@ def resume(
230229
"A previous run id was not found. Specify --origin-run-id."
231230
)
232231

233-
if steps_to_rerun is None:
232+
if step_to_rerun is None:
234233
steps_to_rerun = set()
235-
if step_only:
236-
raise CommandException(
237-
"Cannot step-only resume without specifying at least one step to execute"
238-
)
239234
else:
235+
if clone_only:
236+
raise CommandException("Cannot specify both --clone-only and --until")
240237
# validate step name
241-
for step_to_rerun in steps_to_rerun:
242-
if step_to_rerun not in obj.graph.nodes:
238+
for s in step_to_rerun:
239+
if s not in obj.graph.nodes:
243240
raise CommandException(
244-
"invalid step name {0} specified, must be step present in "
241+
"Invalid step name {0} specified, must be step present in "
245242
"current form of execution graph. Valid step names include: {1}".format(
246243
step_to_rerun, ",".join(list(obj.graph.nodes.keys()))
247244
)
@@ -259,10 +256,21 @@ def resume(
259256
# f"part of the original execution path for run '{origin_run_id}'."
260257
# )
261258

262-
steps_to_rerun = set(steps_to_rerun)
259+
steps_to_rerun = set(step_to_rerun)
263260

264-
if step_only:
265-
clone_only = False
261+
if clone_only and until is not None:
262+
raise CommandException("Cannot specify both --clone-only and --until")
263+
264+
if until is not None:
265+
until_steps = set(until.split(","))
266+
for step in until_steps:
267+
if step not in obj.graph.nodes:
268+
raise CommandException(
269+
"Invalid until step name {0} specified, must be step present in "
270+
"current form of execution graph. Valid step names include: {1}".format(
271+
step, ",".join(list(obj.graph.nodes.keys()))
272+
)
273+
)
266274
if run_id:
267275
# Run-ids that are provided by the metadata service are always integers.
268276
# External providers or run-ids (like external schedulers) always need to
@@ -290,14 +298,14 @@ def resume(
290298
clone_only=clone_only,
291299
reentrant=reentrant,
292300
steps_to_rerun=steps_to_rerun,
293-
step_only=step_only,
301+
until_steps=until_steps,
294302
max_workers=max_workers,
295303
max_num_splits=max_num_splits,
296304
max_log_size=max_log_size * 1024 * 1024,
297305
resume_identifier=resume_identifier,
298306
)
299307
write_file(run_id_file, runtime.run_id)
300-
if step_only:
308+
if until is not None:
301309
write_latest_run_id(obj, runtime.run_id)
302310

303311
runtime.print_workflow_info()

metaflow/runtime.py

Lines changed: 36 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from contextlib import contextmanager
2727

2828
from . import get_namespace
29-
from .metadata_provider import MetaDatum
3029
from .metaflow_config import FEAT_ALWAYS_UPLOAD_CODE_PACKAGE, MAX_ATTEMPTS, UI_URL
3130
from .exception import (
3231
MetaflowException,
@@ -44,7 +43,6 @@
4443
from .unbounded_foreach import (
4544
CONTROL_TASK_TAG,
4645
UBF_CONTROL,
47-
UBF_TASK,
4846
)
4947

5048
from .user_configs.config_options import ConfigInput
@@ -103,7 +101,7 @@ def __init__(
103101
clone_only=False,
104102
reentrant=False,
105103
steps_to_rerun=None,
106-
step_only=False,
104+
until_steps=None,
107105
max_workers=MAX_WORKERS,
108106
max_num_splits=MAX_NUM_SPLITS,
109107
max_log_size=MAX_LOG_SIZE,
@@ -148,49 +146,31 @@ def __init__(
148146
# If steps_to_rerun is specified, we will not clone them in resume mode.
149147
self._steps_to_rerun = steps_to_rerun or set()
150148
self._steps_can_clone = set()
151-
self._steps_ran = set()
152-
self._step_only = step_only
149+
self._steps_no_run = until_steps or set()
150+
153151
all_steps = set()
154-
cannot_clone_steps = set(self._steps_to_rerun)
155-
# sorted_nodes are in topological order already, so we only need to
156-
# iterate through the nodes once to get a stable set of rerun steps.
157-
# A few modes:
158-
# - no steps_to_rerun:
159-
# - not clone_only and not step_only: clone all previously executed steps and
160-
# continue execution.
161-
# - clone_only and not step_only: clone all steps that have previously executed
162-
# and stop
163-
# - not clone_only and step_only: NOT possible (requires a steps_to_rerun)
164-
# - clone_only and step_only: NOT possible (requires a steps_to_rerun)
165-
# => in all these cases, _steps_to_rerun is empty and so _steps_can_clone is
166-
# all_steps
167-
# - steps_to_rerun:
168-
# - not clone_only and not step_only: clone all previously executed steps *except*
169-
# any of the steps in steps_to_rerun and the subsequent steps. Continue execution.
170-
# => _steps_to_rerun contains the steps to rerun and all descendants. _steps_can_clone
171-
# contains all other steps
172-
# - clone_only and not step_only: clone all steps that have previously executed
173-
# up to (but not including) any of the steps in steps_to_rerun and
174-
# subsequent steps.
175-
# => same as above but steps_to_rerun is not used to run anything
176-
# - not clone_only and step_only: clone all steps that have previously executed
177-
# up to (but not including) any of the steps in steps_to_rerun and
178-
# subsequent steps. Execute *only* the steps in steps_to_rerun if possible
179-
# and stop.
180-
# - clone_only and step_only: NOT possible (if step_only is specified, we turn
181-
# off clone_only -- clone_only implies no further execution since task
182-
# objects will not be generated).
183-
# => _steps_to_rerun contains *only* the initially passed steps to run and
184-
# _steps_can_clone contains the same as in the other cases.
152+
# If clone_only is specified, we should have no until_steps and no steps_to_rerun
153+
# so the computation below results in cloning all the steps that we previously
154+
# executed.
155+
# In the other cases, we will allow the cloning of steps up to but not
156+
# inclusive of anything in steps_to_rerun and at the end, steps_to_rerun
157+
# will contain all steps up to but not inclusive of anything in _steps_no_run.
185158
for step_name in self._graph.sorted_nodes:
186159
all_steps.add(step_name)
187-
if step_name in cannot_clone_steps:
188-
out_funcs = self._graph[step_name].out_funcs or []
160+
out_funcs = self._graph[step_name].out_funcs or []
161+
if step_name in self._steps_no_run:
162+
for next_step in out_funcs:
163+
self._steps_no_run.add(next_step)
164+
elif step_name in self._steps_to_rerun:
189165
for next_step in out_funcs:
190-
cannot_clone_steps.add(next_step)
191-
self._steps_can_clone = all_steps - cannot_clone_steps
192-
if not self._step_only:
193-
self._steps_to_rerun = cannot_clone_steps
166+
# We may add things that are in steps_no_run but
167+
# we will remove them later.
168+
self._steps_to_rerun.add(next_step)
169+
self._steps_to_rerun = self._steps_to_rerun - self._steps_no_run
170+
self._steps_can_clone = all_steps - self._steps_to_rerun - self._steps_no_run
171+
print(f"steps_to_rerun: {self._steps_to_rerun}")
172+
print(f"steps_no_run: {self._steps_no_run}")
173+
print(f"steps_can_clone: {self._steps_can_clone}")
194174

195175
self._origin_ds_set = None
196176
if clone_run_id:
@@ -715,17 +695,19 @@ def execute(self):
715695
system_msg=True,
716696
)
717697
self._params_task.mark_resume_done()
718-
elif self._step_only:
719-
# Check that we ran all the steps in self._steps_to_rerun
720-
steps_missing = self._steps_to_rerun - self._steps_ran
721-
if steps_missing:
722-
raise MetaflowInternalError(
723-
"The following steps were not executed: {0}".format(
724-
", ".join(steps_missing)
725-
)
726-
)
698+
elif self._steps_no_run:
699+
# Ran a subset of the graph
700+
count_cloned = -1 # Account for _parameters task
701+
count_reexec = 0
702+
for t in self._is_cloned.values():
703+
if t:
704+
count_cloned += 1
705+
else:
706+
count_reexec += 1
707+
727708
self._logger(
728-
"Step-only resume complete -- all specified steps were executed!",
709+
f"Partial resume complete -- cloned {count_cloned} step(s) and "
710+
f"executed {count_reexec} step(s)",
729711
system_msg=True,
730712
)
731713
else:
@@ -1125,7 +1107,6 @@ def _queue_tasks(self, finished_tasks):
11251107
# finished tasks include only successful tasks
11261108
for task in finished_tasks:
11271109
step_name, _, _ = task.finished_id
1128-
self._steps_ran.add(step_name)
11291110
self._finished[task.finished_id] = task.path
11301111
self._is_cloned[task.path] = task.is_cloned
11311112

@@ -1190,11 +1171,11 @@ def _queue_tasks(self, finished_tasks):
11901171
)
11911172
)
11921173

1193-
if self._step_only:
1174+
if self._steps_no_run:
11941175
# We need to filter next_steps to exclude any steps that are in
11951176
# self._steps_no_run
11961177
next_steps = [
1197-
step for step in next_steps if step in self._steps_to_rerun
1178+
step for step in next_steps if step not in self._steps_no_run
11981179
]
11991180
if not next_steps:
12001181
# No steps to execute, so we can stop

0 commit comments

Comments
 (0)