Skip to content

Commit

Permalink
Fix intermittent failures in test_dfencoder_distributed_e2e test (nv-morpheus#1113)
Browse files Browse the repository at this point in the history

* Call the `manual_seed` method from within the subprocess, this ensures the subprocess runs deterministically.
* Add a sleep to a busy-loop in `morpheus/models/dfencoder/multiprocessing.py`
* Misc pylint fixes

fixes nv-morpheus#1021

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Christopher Harris (https://github.com/cwharris)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#1113
  • Loading branch information
dagardner-nv committed Aug 22, 2023
1 parent aba421e commit 0853e82
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
3 changes: 2 additions & 1 deletion morpheus/models/dfencoder/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import multiprocessing
import signal
import sys
import time


def _wrap(fn, i, args, error_queue):
Expand Down Expand Up @@ -62,4 +63,4 @@ def start_processes(fn, args=(), nprocs=1, join=True, daemon=False, start_method

# Loop on join until it returns True or raises an exception.
while not context.join():
pass
time.sleep(0.1)
16 changes: 10 additions & 6 deletions tests/dfencoder/test_dfencoder_distributed_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# This must come before torch
# isort: off
import cudf # noqa: F401
import cudf # noqa: F401 pylint: disable=unused-import
# isort: on

import json
Expand Down Expand Up @@ -106,7 +106,6 @@ def cleanup_dist():


@pytest.mark.slow
@pytest.mark.usefixtures("manual_seed")
def test_dfencoder_distributed_e2e():

world_size = 1
Expand All @@ -117,14 +116,19 @@ def test_dfencoder_distributed_e2e():


def _run_test(rank, world_size):
from morpheus.utils import seed as seed_utils
seed_utils.manual_seed(42)

import torch
torch.cuda.set_device(rank)

setup_dist(rank, world_size)

preset_cats = json.load(open(PRESET_CATS_FILEPATH, 'r'))
preset_numerical_scaler_params = json.load(open(PRESET_NUMERICAL_SCALER_PARAMS_FILEPATH, 'r'))
with open(PRESET_CATS_FILEPATH, 'r', encoding='utf-8') as fh:
preset_cats = json.load(fh)

with open(PRESET_NUMERICAL_SCALER_PARAMS_FILEPATH, 'r', encoding='utf-8') as fh:
preset_numerical_scaler_params = json.load(fh)

# Initializing model
model = AutoEncoder(
Expand Down Expand Up @@ -171,9 +175,9 @@ def _run_test(rank, world_size):
# Make sure model converges (low loss)
for loss_type in LOSS_TYPES:
ft_losses = getattr(model.logger, f"{loss_type}_fts")
for ft, losses_l in ft_losses.items():
for feature, losses_l in ft_losses.items():
losses = losses_l[1]
assert min(losses) < LOSS_TARGETS[loss_type][ft] * LOSS_TOLERANCE_RATIO
assert min(losses) < LOSS_TARGETS[loss_type][feature] * LOSS_TOLERANCE_RATIO

# Inference
inf_dataset = DatasetFromPath(
Expand Down

0 comments on commit 0853e82

Please sign in to comment.