5 changes: 4 additions & 1 deletion dfanalyzer/__init__.py
@@ -45,6 +45,8 @@ def analyze_trace(
view_types: Optional[List[ViewType]] = None,
extra_columns: Optional[Dict[str, str]] = None,
extra_columns_fn: Optional[Callable[[dict], dict]] = None,
trace_path: Optional[str] = None,
checkpoint_dir: Optional[str] = None
):
"""Analyze the trace using the configured analyzer."""
return self.analyzer.analyze_trace(
@@ -53,9 +55,10 @@
extra_columns_fn=extra_columns_fn,
logical_view_types=self.hydra_config.logical_view_types,
metric_boundaries=OmegaConf.to_object(self.hydra_config.metric_boundaries),
-            trace_path=self.hydra_config.trace_path,
+            trace_path=trace_path or self.hydra_config.trace_path,
unoverlapped_posix_only=self.hydra_config.unoverlapped_posix_only,
view_types=self.hydra_config.view_types if not view_types else view_types,
checkpoint_dir=checkpoint_dir,
)

def shutdown(self):
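For the `analyze_trace` change above, a minimal usage sketch (the `dfa` instance and the paths are hypothetical; omitted arguments fall back to the Hydra config exactly as before):

# Hypothetical usage: `dfa` is an instance of the wrapper class above.
result = dfa.analyze_trace(
    trace_path="/tmp/run1/traces",    # now overrides hydra_config.trace_path
    checkpoint_dir="/tmp/run1/ckpt",  # forwarded to the underlying analyzer
)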
38 changes: 38 additions & 0 deletions dfanalyzer/ai_dftracer.py
@@ -0,0 +1,38 @@
from typing import Callable, Dict, List, Optional

import pandas as pd

from .constants import (
COL_EPOCH,
COL_FUNC_NAME,
COL_PROC_NAME,
COL_TIME_RANGE,
AIDFTracer,
)
from .types import (
ViewMetricBoundaries,
ViewType,
)
from .dftracer import DFTracerAnalyzer


class AIDFTracerAnalyzer(DFTracerAnalyzer):
    def postread_trace(self, traces, view_types):
        traces = super().postread_trace(traces, view_types)
        # Collect each process's epoch events and number them 1..N in order.
        epochs = (
            traces.query(AIDFTracer.get_epoch_query())
            .groupby([COL_PROC_NAME, COL_FUNC_NAME])
            .agg({COL_TIME_RANGE: list})
        )
        epochs[COL_EPOCH] = epochs[COL_TIME_RANGE].apply(lambda x: list(range(1, len(x) + 1)))
        # Keep one global boundary per epoch index: the earliest time range
        # observed across all processes.
        epochs = (
            epochs.explode([COL_TIME_RANGE, COL_EPOCH])
            .groupby(COL_EPOCH)
            .min()
            .reset_index()
            .astype('uint64[pyarrow]')
        )
        # Tag every event with its epoch, then convert the 0 sentinel
        # (events outside any epoch) to NA.
        traces = traces.map_partitions(self._set_epochs, epochs=epochs)
        traces[COL_EPOCH] = traces[COL_EPOCH].replace({0: pd.NA}).astype('uint64[pyarrow]')

        return traces
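To see what this epoch derivation computes, a self-contained pandas sketch with toy data (literal column names stand in for the COL_* constants; the Dask map_partitions assignment step is omitted):

import pandas as pd

# Toy trace: two processes, each emitting two 'epoch' events.
df = pd.DataFrame({
    'proc_name': ['p0', 'p0', 'p1', 'p1'],
    'func_name': ['epoch'] * 4,
    'time_range': [10, 20, 11, 21],
})
epochs = df.groupby(['proc_name', 'func_name']).agg({'time_range': list})
# Number each process's epoch events 1..N.
epochs['epoch'] = epochs['time_range'].apply(lambda x: list(range(1, len(x) + 1)))
# Explode the (time_range, epoch) pairs and keep the earliest time range
# per epoch index as the global boundary.
epochs = epochs.explode(['time_range', 'epoch']).groupby('epoch').min().reset_index()
print(epochs)  # epoch 1 -> time_range 10, epoch 2 -> time_range 20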
133 changes: 130 additions & 3 deletions dfanalyzer/config.py
@@ -6,7 +6,7 @@
from omegaconf import MISSING
from typing import Any, Dict, List, Optional

-from .constants import COL_TIME_RANGE, VIEW_TYPES
+from .constants import COL_TIME_RANGE, VIEW_TYPES, AIDFTracer
from .types import ViewMetricBoundaries
from .utils.env_utils import get_bool_env_var, get_int_env_var

@@ -24,6 +24,8 @@
'stat': 'io_cat == 3 and func_name.str.contains("stat")',
'other': 'io_cat == 6',
'sync': 'io_cat == 7',
'nondata': 'io_cat != 1 and io_cat != 2',
'all': 'io_cat == 1 or io_cat == 2 or io_cat == 3 or io_cat == 6 or io_cat == 7',
}
HASH_CHECKPOINT_NAMES = get_bool_env_var("DFANALYZER_HASH_CHECKPOINT_NAMES", False)
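The two new entries extend the per-category POSIX filters: 'nondata' keeps everything except reads and writes, and 'all' keeps every categorized event. A toy illustration of how these query strings select rows (reading io_cat codes 1 and 2 as read and write is an assumption, consistent with the filters above):

import pandas as pd

# Assumed io_cat codes: 1 = read, 2 = write, 3 = metadata, 6 = other, 7 = sync.
events = pd.DataFrame({
    'io_cat': [1, 2, 3, 6, 7],
    'func_name': ['read', 'write', 'fstat', 'ioctl', 'fsync'],
})
nondata = events.query('io_cat != 1 and io_cat != 2')  # the new 'nondata' filter
print(nondata['func_name'].tolist())  # ['fstat', 'ioctl', 'fsync']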

Expand Down Expand Up @@ -126,8 +128,6 @@ class AnalyzerPresetConfigDLIO(AnalyzerPresetConfig):
'checkpoint_posix_lustre': 'cat.str.contains("posix|stdio") & cat.str.contains("_checkpoint_lustre")',
'checkpoint_posix_ssd': 'cat.str.contains("posix|stdio") & cat.str.contains("_checkpoint_ssd")',
'other_posix': 'cat.isin(["posix", "stdio"])',
-            # 'other_posix_lustre': 'cat.isin(["posix_lustre", "stdio_lustre"])',
-            # 'other_posix_ssd': 'cat.isin(["posix_ssd", "stdio_ssd"])',
}
)
layer_deps: Optional[Dict[str, Optional[str]]] = dc.field(
@@ -173,6 +173,127 @@ class AnalyzerPresetConfigDLIO(AnalyzerPresetConfig):
)


@dc.dataclass
class AnalyzerPresetConfigAIDFtracer(AnalyzerPresetConfig):
additional_metrics: Optional[Dict[str, Optional[str]]] = dc.field(
default_factory=lambda: {
'compute_util': 'compute_{time_metric}.fillna(0).astype("float") / (epoch_{time_metric}.astype("float") + {epsilon})',
'data_loader_fetch_util': 'data_loader_fetch_{time_metric}.fillna(0).astype("float") / (epoch_{time_metric}.astype("float") + {epsilon})',
'checkpoint_util': 'checkpoint_{time_metric}.fillna(0).astype("float") / (epoch_{time_metric}.astype("float") + {epsilon})',
}
)
derived_metrics: Optional[Dict[str, Dict[str, str]]] = dc.field(
default_factory=lambda: {
'app': {},
'training': {},
'epoch': {},
'comm': {
"send": f'func_name == "{AIDFTracer.Communication.SEND}"',
"receive": f'func_name == "{AIDFTracer.Communication.RECEIVE}"',
"barrier": f'func_name == "{AIDFTracer.Communication.BARRIER}"',
"bcast": f'func_name == "{AIDFTracer.Communication.BCAST}"',
"reduce": f'func_name == "{AIDFTracer.Communication.REDUCE}"',
"all_reduce": f'func_name == "{AIDFTracer.Communication.ALL_REDUCE}"',
"gather": f'func_name == "{AIDFTracer.Communication.GATHER}"',
"all_gather": f'func_name == "{AIDFTracer.Communication.ALL_GATHER}"',
"scatter": f'func_name == "{AIDFTracer.Communication.SCATTER}"',
"reduce_scatter": f'func_name == "{AIDFTracer.Communication.REDUCE_SCATTER}"',
"all_to_all": f'func_name == "{AIDFTracer.Communication.ALL_TO_ALL}"',
},
'device': {
'transfer': f'func_name == "{AIDFTracer.Device.TRANSFER}"',
},
'compute': {
'step': f'func_name == "{AIDFTracer.Compute.STEP}" | func_name == "{AIDFTracer.Category.COMPUTE}"',
'forward': f'func_name == "{AIDFTracer.Compute.FORWARD}"',
'backward': f'func_name == "{AIDFTracer.Compute.BACKWARD}"',
},
'data_loader': {
'init': f'func_name == "{AIDFTracer.get_init(AIDFTracer.Category.DATALOADER)}"',
'fetch': f'func_name == "{AIDFTracer.get_iter(AIDFTracer.DataLoader.FETCH)}"',
},
'data_loader_worker': {
'fork': 'func_name == "fork"',
'spawn': 'func_name == "spawn"',
},
'data': {
'init': f'func_name == "{AIDFTracer.get_init(AIDFTracer.Category.DATA)}"',
'item': f'func_name == "{AIDFTracer.Data.ITEM}"',
'preprocess': f'func_name == "{AIDFTracer.Data.PREPROCESS}"',
},
'data_posix': DERIVED_POSIX_METRICS,
'checkpoint': {
"capture": f'func_name == "{AIDFTracer.Checkpoint.CAPTURE}"',
"restart": f'func_name == "{AIDFTracer.Checkpoint.RESTART}"',
},
'checkpoint_posix': DERIVED_POSIX_METRICS,
'posix': DERIVED_POSIX_METRICS,
}
)
layer_defs: Dict[str, Optional[str]] = dc.field(
default_factory=lambda: {
'app': f'cat == "{AIDFTracer.ROOT_CAT}" & func_name == "{AIDFTracer.ROOT_NAME}"',
'training': f'cat == "{AIDFTracer.Category.PIPELINE}" & func_name == "{AIDFTracer.Pipeline.TRAIN}"',
'epoch': AIDFTracer.get_epoch_query(),
'device': f'cat == "{AIDFTracer.Category.DEVICE}"',
'compute': f'cat == "{AIDFTracer.Category.COMPUTE}"',
'data_loader': f'cat == "{AIDFTracer.Category.DATALOADER}"',
'comm': f'cat == "{AIDFTracer.Category.COMM}"',
'data_loader_worker': 'cat == "posix" & func_name.isin(["fork", "spawn"])',
'data': f'cat == "{AIDFTracer.Category.DATA}"',
'data_posix': 'cat.str.contains("posix|stdio") & cat.str.contains("_reader")',
'checkpoint': f'cat == "{AIDFTracer.Category.CHECKPOINT}"',
'checkpoint_posix': 'cat.str.contains("posix|stdio") & cat.str.contains("_checkpoint")',
'posix': 'cat.str.contains("posix|stdio")',
}
)
layer_deps: Optional[Dict[str, Optional[str]]] = dc.field(
default_factory=lambda: {
'app': None,
'training': 'app',
'epoch': 'training',
'comm': 'epoch',
'device': 'epoch',
'compute': 'epoch',
'data_loader': 'epoch',
'data_loader_worker': 'data_loader',
'data': 'epoch',
'data_posix': 'data',
'checkpoint': 'training',
'checkpoint_posix': 'checkpoint',
'posix': None,
}
)
logical_views: Optional[Dict[str, Dict[str, Optional[str]]]] = dc.field(
default_factory=lambda: {
'file_name': {
'file_dir': None,
'file_pattern': None,
},
'proc_name': {
'host_name': 'proc_name.str.split("#").str[1]',
'proc_id': 'proc_name.str.split("#").str[2]',
'thread_id': 'proc_name.str.split("#").str[3]',
},
}
)
name: str = "ai_dftracer"
threaded_layers: Optional[List[str]] = dc.field(
default_factory=lambda: [
'data_loader',
'data_loader_worker',
'data',
'data_posix',
]
)
unscored_metrics: Optional[List[str]] = dc.field(
default_factory=lambda: [
'consumer_rate',
'producer_rate',
]
)
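The {time_metric} and {epsilon} placeholders in additional_metrics suggest the expressions are expanded before evaluation; purely as an illustration (the actual expansion mechanism is internal to dfanalyzer), a plain str.format substitution with assumed values would yield:

# Illustration only: expanding the compute_util template with assumed values.
template = (
    'compute_{time_metric}.fillna(0).astype("float")'
    ' / (epoch_{time_metric}.astype("float") + {epsilon})'
)
print(template.format(time_metric='time', epsilon=1e-9))
# compute_time.fillna(0).astype("float") / (epoch_time.astype("float") + 1e-09)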

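The proc_name logical views split on '#'; the layout implied by the indices is '<app>#<host_name>#<proc_id>#<thread_id>', which is an inference from the split positions rather than a documented format. A quick check:

import pandas as pd

# Assumed proc_name layout: '<app>#<host_name>#<proc_id>#<thread_id>'.
s = pd.Series(['train#node01#4242#7'])
print(s.str.split('#').str[1].iloc[0])  # host_name -> 'node01'
print(s.str.split('#').str[2].iloc[0])  # proc_id   -> '4242'
print(s.str.split('#').str[3].iloc[0])  # thread_id -> '7'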
@dc.dataclass
class AnalyzerConfig:
checkpoint: Optional[bool] = True
@@ -198,6 +319,10 @@ class DFTracerAnalyzerConfig(AnalyzerConfig):
time_granularity: Optional[float] = 1
time_resolution: Optional[float] = 1e6

@dc.dataclass
class AIDFTracerAnalyzerConfig(DFTracerAnalyzerConfig):
_target_: str = "dfanalyzer.ai_dftracer.AIDFTracerAnalyzer"


@dc.dataclass
class RecorderAnalyzerConfig(AnalyzerConfig):
@@ -358,9 +483,11 @@ def init_hydra_config_store() -> ConfigStore:
cs.store(name="config", node=Config)
cs.store(group="analyzer", name="darshan", node=DarshanAnalyzerConfig)
cs.store(group="analyzer", name="dftracer", node=DFTracerAnalyzerConfig)
cs.store(group="analyzer", name="ai_dftracer", node=AIDFTracerAnalyzerConfig)
cs.store(group="analyzer", name="recorder", node=RecorderAnalyzerConfig)
cs.store(group="analyzer/preset", name="posix", node=AnalyzerPresetConfigPOSIX)
cs.store(group="analyzer/preset", name="dlio", node=AnalyzerPresetConfigDLIO)
cs.store(group="analyzer/preset", name="ai_dftracer", node=AnalyzerPresetConfigAIDFtracer)
cs.store(group="cluster", name="external", node=ExternalClusterConfig)
cs.store(group="cluster", name="local", node=LocalClusterConfig)
cs.store(group="cluster", name="lsf", node=LSFClusterConfig)
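With the new store entries registered, the analyzer and preset can be selected by name. A sketch using Hydra's compose API (this assumes the root Config declares these groups in its defaults list; if not, the overrides need a '+' prefix):

from hydra import compose, initialize
from dfanalyzer.config import init_hydra_config_store

init_hydra_config_store()
with initialize(version_base=None, config_path=None):
    cfg = compose(
        config_name="config",
        overrides=["analyzer=ai_dftracer", "analyzer/preset=ai_dftracer"],
    )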
87 changes: 85 additions & 2 deletions dfanalyzer/constants.py
@@ -76,8 +76,6 @@ class Layer(StrEnum):
('proc_name', 'thread_id'),
]
VIEW_TYPES = [
-    'file_name',
-    'proc_name',
'time_range',
]

@@ -304,3 +302,88 @@
EVENT_DET_CHAR = 'detect_characteristics'
EVENT_READ_TRACES = 'read_traces'
EVENT_SAVE_VIEWS = 'save_views'

#########################################################
# CONSTANTS
# Copied from dftracer/logger/ai.py
#########################################################

# namespacing for AI DFTracer
class AIDFTracer:
ROOT_NAME = "ai_root"
ROOT_CAT = "ai_root"
    ITER_COUNT_NAME = "count"
    INIT_NAME = "init"
BLOCK_NAME = "block"
ITER_NAME = "iter"
CTX_SEPARATOR = "."

class Category(StrEnum):
COMPUTE = "compute"
DATA = "data"
DATALOADER = "dataloader"
COMM = "comm"
DEVICE = "device"
CHECKPOINT = "checkpoint"
PIPELINE = "pipeline"

class Compute(StrEnum):
FORWARD = "forward"
BACKWARD = "backward"
STEP = "step"

class Data(StrEnum):
PREPROCESS = "preprocess"
ITEM = "item"

class DataLoader(StrEnum):
FETCH = "fetch"

class Communication(StrEnum):
SEND = "send"
RECEIVE = "receive"
BARRIER = "barrier"
BCAST = "bcast"
REDUCE = "reduce"
ALL_REDUCE = "all_reduce"
GATHER = "gather"
ALL_GATHER = "all_gather"
SCATTER = "scatter"
REDUCE_SCATTER = "reduce_scatter"
ALL_TO_ALL = "all_to_all"

class Device(StrEnum):
TRANSFER = "transfer"

class Checkpoint(StrEnum):
CAPTURE = "capture"
RESTART = "restart"

class Pipeline(StrEnum):
EPOCH = "epoch"
TRAIN = "train"
EVALUATE = "evaluate"
TEST = "test"

@staticmethod
def get_block(func_name: str) -> str:
return f"{func_name}{AIDFTracer.CTX_SEPARATOR}{AIDFTracer.BLOCK_NAME}"

@staticmethod
def get_iter(func_name: str) -> str:
return f"{func_name}{AIDFTracer.CTX_SEPARATOR}{AIDFTracer.ITER_NAME}"

@staticmethod
def get_init(func_name: str) -> str:
return f"{func_name}{AIDFTracer.CTX_SEPARATOR}{AIDFTracer.INIT_NAME}"

@staticmethod
def get_epoch_query() -> str:
return f'(cat == "{AIDFTracer.Category.PIPELINE}" & (func_name == "{AIDFTracer.get_block(AIDFTracer.Pipeline.EPOCH)}" | func_name == "{AIDFTracer.Pipeline.EPOCH}"))'


#########################################################


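A quick sanity check of the naming helpers; the expected values follow directly from the constants above:

from dfanalyzer.constants import AIDFTracer

assert AIDFTracer.get_block(AIDFTracer.Pipeline.EPOCH) == "epoch.block"
assert AIDFTracer.get_iter(AIDFTracer.DataLoader.FETCH) == "fetch.iter"
assert AIDFTracer.get_init(AIDFTracer.Category.DATALOADER) == "dataloader.init"
print(AIDFTracer.get_epoch_query())
# (cat == "pipeline" & (func_name == "epoch.block" | func_name == "epoch"))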
1 change: 1 addition & 0 deletions dfanalyzer/meson.build
@@ -2,6 +2,7 @@ py.install_sources(
[
'__init__.py',
'__main__.py',
'ai_dftracer.py',
'analysis.py',
'analysis_utils.py',
'analyzer.py',