Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging for influence modules #567

Merged
merged 9 commits into from
May 7, 2024
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,30 @@

### Added

- Add progress bars to the computation of `LazyChunkSequence` and
`NestedLazyChunkSequence`
[PR #567](https://github.com/aai-institute/pyDVL/pull/567)
- Add a device fixture for `pytest`, which depending on the availability and
user input (`pytest --with-cuda`) resolves to cuda device
[PR #574](https://github.com/aai-institute/pyDVL/pull/574)

### Fixed

- Fixed logging issue in decorator `log_duration`
[PR #567](https://github.com/aai-institute/pyDVL/pull/567)
- Fixed missing move of tensors to model device in `EkfacInfluence`
implementation [PR #570](https://github.com/aai-institute/pyDVL/pull/570)
- Missing move to device of `preconditioner` in `CgInfluence` implementation
[PR #572](https://github.com/aai-institute/pyDVL/pull/572)

### Changed

- Changed logging behavior of iterative methods `LissaInfluence` and
`CgInfluence` to warn on not achieving desired tolerance within `maxiter`,
add parameter `warn_on_max_iteration` to set the level for this information
to `logging.DEBUG`
[PR #567](https://github.com/aai-institute/pyDVL/pull/567)

## 0.9.1 - Bug fixes, logging improvement

### Fixed
Expand Down
113 changes: 86 additions & 27 deletions src/pydvl/influence/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,25 @@
(chunked in one resp. two dimensions), with support for efficient storage and retrieval
using the Zarr library.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import Callable, Generator, Generic, List, Optional, Tuple, Union
from typing import (
Callable,
Generator,
Generic,
Iterator,
List,
Optional,
Tuple,
Union,
cast,
)

import zarr
from numpy.typing import NDArray
from tqdm import tqdm
from zarr.storage import StoreLike

from ..utils import log_duration
Expand All @@ -35,9 +47,12 @@ def from_numpy(self, x: NDArray) -> TensorType:

class SequenceAggregator(Generic[TensorType], ABC):
@abstractmethod
def __call__(self, tensor_generator: Generator[TensorType, None, None]):
def __call__(
self,
tensor_sequence: LazyChunkSequence,
):
"""
Aggregates tensors from a generator.
Aggregates tensors from a sequence.

Implement this method to define how a sequence of tensors, provided by a
generator, should be combined.
Expand All @@ -46,31 +61,37 @@ def __call__(self, tensor_generator: Generator[TensorType, None, None]):

class ListAggregator(SequenceAggregator):
def __call__(
self, tensor_generator: Generator[TensorType, None, None]
self,
tensor_sequence: LazyChunkSequence,
) -> List[TensorType]:
"""
Aggregates tensors from a single-level generator into a list. This method simply
collects each tensor emitted by the generator into a single list.

Args:
tensor_generator: A generator that yields TensorType objects.
tensor_sequence: Object wrapping a generator that yields `TensorType`
objects.

Returns:
A list containing all the tensors provided by the tensor_generator.
"""
return [t for t in tensor_generator]

gen = cast(Iterator[TensorType], tensor_sequence.generator_factory())

if tensor_sequence.len_generator is not None:
gen = cast(
Iterator[TensorType],
tqdm(gen, total=tensor_sequence.len_generator, desc="Blocks"),
)

return [t for t in gen]


class NestedSequenceAggregator(Generic[TensorType], ABC):
@abstractmethod
def __call__(
self,
nested_generators_of_tensors: Generator[
Generator[TensorType, None, None], None, None
],
):
def __call__(self, nested_sequence_of_tensors: NestedLazyChunkSequence):
"""
Aggregates tensors from a generator of generators.
Aggregates tensors from a nested sequence of tensors.

Implement this method to specify how tensors, nested in two layers of
generators, should be combined. Useful for complex data structures where tensors
Expand All @@ -81,27 +102,36 @@ def __call__(
class NestedListAggregator(NestedSequenceAggregator):
def __call__(
self,
nested_generators_of_tensors: Generator[
Generator[TensorType, None, None], None, None
],
nested_sequence_of_tensors: NestedLazyChunkSequence,
) -> List[List[TensorType]]:
"""
Aggregates tensors from a nested generator structure into a list of lists.
Each inner generator is converted into a list of tensors, resulting in a nested
list structure.

Args:
nested_generators_of_tensors: A generator of generators, where each inner
generator yields TensorType objects.
nested_sequence_of_tensors: Object wrapping a generator of generators,
where each inner generator yields TensorType objects.

Returns:
A list of lists, where each inner list contains tensors returned from one
of the inner generators.
"""
return [list(tensor_gen) for tensor_gen in nested_generators_of_tensors]
outer_gen = cast(
Iterator[Iterator[TensorType]],
nested_sequence_of_tensors.generator_factory(),
)
len_outer_gen = nested_sequence_of_tensors.len_outer_generator
if len_outer_gen is not None:
outer_gen = cast(
Iterator[Iterator[TensorType]],
tqdm(outer_gen, total=len_outer_gen, desc="Row blocks"),
)

return [list(tensor_gen) for tensor_gen in outer_gen]

class LazyChunkSequence:

class LazyChunkSequence(Generic[TensorType]):
"""
A class representing a chunked, and lazily evaluated array,
where the chunking is restricted to the first dimension
Expand All @@ -114,12 +144,18 @@ class LazyChunkSequence:
Attributes:
generator_factory: A factory function that returns
a generator. This generator yields chunks of the large array when called.
len_generator: if the number of elements from the generator is
known from the context, this optional parameter can be used to improve
logging by adding a progressbar.
"""

def __init__(
self, generator_factory: Callable[[], Generator[TensorType, None, None]]
self,
generator_factory: Callable[[], Generator[TensorType, None, None]],
len_generator: Optional[int] = None,
):
self.generator_factory = generator_factory
self.len_generator = len_generator

@log_duration(log_level=logging.INFO)
def compute(self, aggregator: Optional[SequenceAggregator] = None):
Expand All @@ -140,7 +176,7 @@ def compute(self, aggregator: Optional[SequenceAggregator] = None):
"""
if aggregator is None:
aggregator = ListAggregator()
return aggregator(self.generator_factory())
return aggregator(self)

@log_duration(log_level=logging.INFO)
def to_zarr(
Expand Down Expand Up @@ -171,7 +207,15 @@ def to_zarr(
"""
row_idx = 0
z = None
for block in self.generator_factory():

gen = cast(Iterator[TensorType], self.generator_factory())

if self.len_generator is not None:
gen = cast(
Iterator[TensorType], tqdm(gen, total=self.len_generator, desc="Blocks")
)

for block in gen:
numpy_block = converter.to_numpy(block)

if z is None:
Expand Down Expand Up @@ -204,7 +248,7 @@ def _initialize_zarr_array(block: NDArray, path_or_url: str, overwrite: bool):
)


class NestedLazyChunkSequence:
class NestedLazyChunkSequence(Generic[TensorType]):
"""
A class representing chunked, and lazily evaluated array, where the chunking is
restricted to the first two dimensions.
Expand All @@ -216,16 +260,21 @@ class NestedLazyChunkSequence:

Attributes:
generator_factory: A factory function that returns a generator of generators.
Each inner generator yields chunks.
Each inner generator yields chunks
len_outer_generator: if the number of elements from the outer generator is
known from the context, this optional parameter can be used to improve
logging by adding a progressbar.
"""

def __init__(
self,
generator_factory: Callable[
[], Generator[Generator[TensorType, None, None], None, None]
],
len_outer_generator: Optional[int] = None,
):
self.generator_factory = generator_factory
self.len_outer_generator = len_outer_generator

@log_duration(log_level=logging.INFO)
def compute(self, aggregator: Optional[NestedSequenceAggregator] = None):
Expand All @@ -247,7 +296,7 @@ def compute(self, aggregator: Optional[NestedSequenceAggregator] = None):
"""
if aggregator is None:
aggregator = NestedListAggregator()
return aggregator(self.generator_factory())
return aggregator(self)

@log_duration(log_level=logging.INFO)
def to_zarr(
Expand Down Expand Up @@ -280,7 +329,17 @@ def to_zarr(
row_idx = 0
z = None
numpy_block = None
for row_blocks in self.generator_factory():
block_generator = cast(Iterator[Iterator[TensorType]], self.generator_factory())

if self.len_outer_generator is not None:
block_generator = cast(
Iterator[Iterator[TensorType]],
tqdm(
block_generator, total=self.len_outer_generator, desc="Row blocks"
),
)

for row_blocks in block_generator:
col_idx = 0
for block in row_blocks:
numpy_block = converter.to_numpy(block)
Expand Down
2 changes: 2 additions & 0 deletions src/pydvl/influence/base_influence_function_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from enum import Enum
from typing import Collection, Generic, Iterable, Optional, Type, TypeVar

__all__ = ["InfluenceMode"]


class InfluenceMode(str, Enum):
"""
Expand Down
31 changes: 27 additions & 4 deletions src/pydvl/influence/influence_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import logging
from functools import partial
from typing import Generator, Iterable, Optional, Tuple, Type, Union
from typing import Generator, Iterable, Optional, Sized, Tuple, Type, Union, cast

import distributed
from dask import array as da
Expand Down Expand Up @@ -619,8 +619,14 @@ def influence_factors(
Returns:
A lazy data structure representing the chunks of the resulting tensor
"""
try:
len_iterable = len(cast(Sized, data_iterable))
except Exception as e:
logger.debug(f"Failed to retrieve len of data iterable: {e}")
len_iterable = None

tensors_gen_factory = partial(self._influence_factors_gen, data_iterable)
return LazyChunkSequence(tensors_gen_factory)
return LazyChunkSequence(tensors_gen_factory, len_generator=len_iterable)

def _influences_gen(
self,
Expand Down Expand Up @@ -677,7 +683,15 @@ def influences(
mode,
)

return NestedLazyChunkSequence(nested_tensor_gen_factory)
try:
len_iterable = len(cast(Sized, test_data_iterable))
except Exception as e:
logger.debug(f"Failed to retrieve len of test data iterable: {e}")
len_iterable = None

return NestedLazyChunkSequence(
nested_tensor_gen_factory, len_outer_generator=len_iterable
)

def _influences_from_factors_gen(
self,
Expand Down Expand Up @@ -735,4 +749,13 @@ def influences_from_factors(
train_data_iterable,
mode,
)
return NestedLazyChunkSequence(nested_tensor_gen)

try:
len_iterable = len(cast(Sized, z_test_factors))
except Exception as e:
logger.debug(f"Failed to retrieve len of factors iterable: {e}")
len_iterable = None

return NestedLazyChunkSequence(
nested_tensor_gen, len_outer_generator=len_iterable
)
Loading
Loading