Skip to content

Commit 8c1d46e

Browse files
authored
checkpoints: completely remove checkpoint outs on exp run --reset (#5586)
* checkpoints: remove checkpoint outs on --reset
* exp run: no longer prune/reset lockfiles
* update tests
* make --queue imply --reset unless --rev is provided
1 parent 11b0581 commit 8c1d46e

File tree

7 files changed

+32
-70
lines changed

7 files changed

+32
-70
lines changed

dvc/command/experiments.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,11 @@ class CmdExperimentsRun(CmdRepro):
514514
def run(self):
515515
from dvc.command.metrics import _show_metrics
516516

517+
if self.args.reset and self.args.checkpoint_resume:
518+
raise InvalidArgumentError(
519+
"--reset and --rev are mutually exclusive."
520+
)
521+
517522
if self.args.reset:
518523
logger.info("Any existing checkpoints will be reset and re-run.")
519524

dvc/output/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,7 @@ def checkout(
408408
relink=False,
409409
filter_info=None,
410410
allow_missing=False,
411+
checkpoint_reset=False,
411412
**kwargs,
412413
):
413414
if not self.use_cache:
@@ -422,6 +423,11 @@ def checkout(
422423
# backward compatibility
423424
return None
424425

426+
if self.checkpoint and checkpoint_reset:
427+
if self.exists:
428+
self.remove()
429+
return None
430+
425431
added = not self.exists
426432

427433
try:

dvc/repo/experiments/__init__.py

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,6 @@ def _stash_exp(
182182
)
183183
self.scm.reset()
184184

185-
self._prune_lockfiles()
186-
187185
# update experiment params from command line
188186
if params:
189187
self._update_params(params)
@@ -244,34 +242,6 @@ def _stash_exp(
244242

245243
return stash_rev
246244

247-
def _prune_lockfiles(self):
248-
from dvc.dvcfile import is_lock_file
249-
250-
# NOTE: dirty DVC lock files must be restored to index state to
251-
# avoid checking out incorrect persist or checkpoint outs
252-
fs = self.scm.get_fs("HEAD")
253-
lock_files = [
254-
str(fname)
255-
for fname in fs.walk_files(self.scm.root_dir)
256-
if is_lock_file(fname)
257-
]
258-
if lock_files:
259-
260-
self.scm.reset(paths=lock_files)
261-
self.scm.checkout_index(paths=lock_files, force=True)
262-
263-
def _prune_untracked_lockfiles(self):
264-
from dvc.dvcfile import is_lock_file
265-
from dvc.utils.fs import remove
266-
267-
untracked = [
268-
fname
269-
for fname in self.scm.untracked_files()
270-
if is_lock_file(fname)
271-
]
272-
for fname in untracked:
273-
remove(fname)
274-
275245
def _stash_msg(
276246
self,
277247
rev: str,
@@ -345,9 +315,17 @@ def reproduce_one(
345315
queue: bool = False,
346316
tmp_dir: bool = False,
347317
checkpoint_resume: Optional[str] = None,
318+
reset: bool = False,
348319
**kwargs,
349320
):
350321
"""Reproduce and checkout a single experiment."""
322+
if queue and not checkpoint_resume:
323+
reset = True
324+
325+
if reset:
326+
self.reset_checkpoints()
327+
kwargs["force"] = True
328+
351329
if not (queue or tmp_dir):
352330
staged, _, _ = self.scm.status()
353331
if staged:
@@ -370,7 +348,9 @@ def reproduce_one(
370348
else:
371349
checkpoint_resume = self._workspace_resume_rev()
372350

373-
stash_rev = self.new(checkpoint_resume=checkpoint_resume, **kwargs)
351+
stash_rev = self.new(
352+
checkpoint_resume=checkpoint_resume, reset=reset, **kwargs
353+
)
374354
if queue:
375355
logger.info(
376356
"Queued experiment '%s' for future execution.", stash_rev[:7],
@@ -709,7 +689,6 @@ def _workspace_repro(self) -> Mapping[str, str]:
709689
# result in conflict between workspace params and stashed CLI params).
710690
self.scm.reset(hard=True)
711691
with self.scm.detach_head(entry.rev):
712-
self._prune_untracked_lockfiles()
713692
rev = self.stash.pop()
714693
self.scm.set_ref(EXEC_BASELINE, entry.baseline_rev)
715694
if entry.branch:

dvc/repo/experiments/executor/base.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -259,24 +259,27 @@ def filter_pipeline(stages):
259259
"Executor repro with force = '%s'", str(repro_force)
260260
)
261261

262-
# NOTE: for checkpoint experiments we handle persist outs slightly
263-
# differently than normal:
262+
# NOTE: checkpoint outs are handled as a special type of persist
263+
# out:
264264
#
265265
# - checkpoint out may not yet exist if this is the first time this
266266
# experiment has been run, this is not an error condition for
267267
# experiments
268-
# - at the start of a repro run, we need to remove the persist out
269-
# and restore it to its last known (committed) state (which may
270-
# be removed/does not yet exist) so that our executor workspace
271-
# is not polluted with the (persistent) out from an unrelated
272-
# experiment run
268+
# - if experiment was run with --reset, the checkpoint out will be
269+
# removed at the start of the experiment (regardless of any
270+
# dvc.lock entry for the checkpoint out)
271+
# - if run without --reset, the checkpoint out will be checked out
272+
# using any hash present in dvc.lock (or removed if no entry
273+
# exists in dvc.lock)
274+
checkpoint_reset = kwargs.pop("reset", False)
273275
dvc_checkout(
274276
dvc,
275277
targets=targets,
276278
with_deps=targets is not None,
277279
force=True,
278280
quiet=True,
279281
allow_missing=True,
282+
checkpoint_reset=checkpoint_reset,
280283
)
281284

282285
checkpoint_func = partial(

dvc/repo/experiments/run.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ def run(
1515
run_all: bool = False,
1616
jobs: int = 1,
1717
tmp_dir: bool = False,
18-
reset: bool = False,
1918
**kwargs,
2019
) -> dict:
2120
"""Reproduce the specified targets as an experiment.
@@ -25,10 +24,6 @@ def run(
2524
Returns a dict mapping new experiment SHAs to the results
2625
of `repro` for that experiment.
2726
"""
28-
if reset:
29-
repo.experiments.reset_checkpoints()
30-
kwargs["force"] = True
31-
3227
if run_all:
3328
return repo.experiments.reproduce_queued(jobs=jobs)
3429

tests/func/experiments/test_checkpoints.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,6 @@ def test_reset_checkpoint(
8888
checkpoint_stage.addressing, name="foo", tmp_dir=not workspace,
8989
)
9090

91-
if workspace:
92-
scm.reset(hard=True)
93-
scm.gitpython.repo.git.clean(force=True)
94-
9591
results = dvc.experiments.run(
9692
checkpoint_stage.addressing,
9793
params=["foo=2"],

tests/func/experiments/test_experiments.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -418,28 +418,6 @@ def test_untracked(tmp_dir, scm, dvc, caplog, workspace):
418418
assert fobj.read().strip() == "foo: 2"
419419

420420

421-
@pytest.mark.parametrize("workspace", [True, False])
422-
def test_dirty_lockfile(tmp_dir, scm, dvc, exp_stage, workspace):
423-
from dvc.dvcfile import LockfileCorruptedError
424-
425-
tmp_dir.gen("dvc.lock", "foo")
426-
427-
with pytest.raises(LockfileCorruptedError):
428-
dvc.reproduce(exp_stage.addressing)
429-
430-
results = dvc.experiments.run(
431-
exp_stage.addressing, params=["foo=2"], tmp_dir=not workspace
432-
)
433-
exp = first(results)
434-
435-
fs = scm.get_fs(exp)
436-
with fs.open(tmp_dir / "metrics.yaml") as fobj:
437-
assert fobj.read().strip() == "foo: 2"
438-
439-
if not workspace:
440-
assert (tmp_dir / "dvc.lock").read_text() == "foo"
441-
442-
443421
def test_packed_args_exists(tmp_dir, scm, dvc, exp_stage, caplog):
444422
from dvc.repo.experiments.executor.base import BaseExecutor
445423

0 commit comments

Comments
 (0)