Skip to content

Fix relative cav #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions tcav/activation_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import PIL.Image
import six
import tensorflow as tf
from .utils import CONCEPT_SEPARATOR

class ActivationGeneratorInterface(six.with_metaclass(ABCMeta, object)):
"""Interface for an activation generator for a model"""

@abstractmethod
def process_and_load_activations(self, bottleneck_names, concepts):
def process_and_load_activations(self, bottleneck_names, concepts, is_relative_tcav=False):
pass

@abstractmethod
Expand All @@ -50,43 +51,44 @@ def get_model(self):
return self.model

@abstractmethod
def get_examples_for_concept(self, concept, is_relative_tcav=False):
    """Return the examples that belong to `concept`.

    Abstract hook: concrete activation generators must override it.

    Args:
      concept: name of the concept whose examples are requested.
      is_relative_tcav: flag forwarded unchanged by
        get_activations_for_concept(); defaults to False so existing
        two-argument callers keep working.
    """
    pass

def get_activations_for_concept(self, concept, bottleneck, is_relative_tcav=False):
    """Load the examples of `concept` and return their activations.

    Args:
      concept: concept name, passed to get_examples_for_concept().
      bottleneck: bottleneck name, passed to get_activations_for_examples().
      is_relative_tcav: forwarded to get_examples_for_concept(); defaults to
        False so existing two-argument callers keep working.

    Returns:
      Whatever get_activations_for_examples() returns for the loaded examples.
    """
    examples = self.get_examples_for_concept(concept, is_relative_tcav)
    return self.get_activations_for_examples(examples, bottleneck)

def get_activations_for_examples(self, examples, bottleneck):
    """Run `examples` through the model at `bottleneck` and return the result.

    The raw model output is passed through the model's reshape_activations()
    and then squeezed, so length-1 axes are dropped from the returned array.
    """
    model = self.model
    raw_acts = model.run_examples(examples, bottleneck)
    reshaped_acts = model.reshape_activations(raw_acts)
    return reshaped_acts.squeeze()

def process_and_load_activations(self, bottleneck_names, concepts, is_relative_tcav=False):
    """Return activations for every (concept, bottleneck) pair, using a cache.

    When `self.acts_dir` is set, activations are cached in files named
    'acts_<concept>_<bottleneck>' inside it: an existing file is loaded,
    otherwise the activations are computed and then written to that file.
    When `self.acts_dir` is falsy nothing is read from or written to disk.

    Args:
      bottleneck_names: iterable of bottleneck names.
      concepts: iterable of concept names.
      is_relative_tcav: forwarded to get_activations_for_concept() for
        activations that have to be computed.

    Returns:
      Nested dict such that acts[concept][bottleneck_name] is the activation
      array for that pair.
    """
    acts = {}
    if self.acts_dir and not tf.io.gfile.exists(self.acts_dir):
        tf.io.gfile.makedirs(self.acts_dir)

    for concept in concepts:
        concept_acts = acts.setdefault(concept, {})

        for bottleneck_name in bottleneck_names:
            acts_path = os.path.join(
                self.acts_dir, f'acts_{concept}_{bottleneck_name}'
            ) if self.acts_dir else None

            if acts_path and tf.io.gfile.exists(acts_path):
                # load activations from file
                # NOTE(review): allow_pickle=True unpickles whatever is in the
                # cache file; files written below use allow_pickle=False, so
                # this could likely be tightened -- confirm no legacy caches
                # rely on pickling before changing it.
                with tf.io.gfile.GFile(acts_path, 'rb') as f:
                    concept_acts[bottleneck_name] = np.load(f, allow_pickle=True).squeeze()
                tf.compat.v1.logging.info(
                    f'Loaded {acts_path} shape {concept_acts[bottleneck_name].shape}')
                continue

            # compute and save activations
            concept_acts[bottleneck_name] = self.get_activations_for_concept(
                concept, bottleneck_name, is_relative_tcav)

            if acts_path:
                tf.compat.v1.logging.info(f'{acts_path} does not exist, Making one...')
                tf.io.gfile.mkdir(os.path.dirname(acts_path))
                with tf.io.gfile.GFile(acts_path, 'w') as f:
                    np.save(f, concept_acts[bottleneck_name], allow_pickle=False)

    return acts


Expand All @@ -110,13 +112,22 @@ def __init__(self,
super(ImageActivationGenerator, self).__init__(model, acts_dir,
max_examples)

def get_examples_for_concept(self, concept, is_relative_tcav=False):
    """Load the images of one concept, or of several joined concepts.

    Args:
      concept: name of a sub-directory of `self.source_dir`. When
        `is_relative_tcav` is True it may hold several such names joined by
        CONCEPT_SEPARATOR; the images of all of them are loaded together.
      is_relative_tcav: whether `concept` should be split on
        CONCEPT_SEPARATOR.

    Returns:
      The result of load_images_from_files() for the collected paths, limited
      to `self.max_examples` and using the model's first two image-shape
      dimensions as the target shape.
    """
    if is_relative_tcav:
        concepts = concept.split(CONCEPT_SEPARATOR)
    else:
        concepts = [concept]

    # Build one flat list of paths. Stacking the per-concept lists with
    # np.asarray(...).flatten() only flattens when every concept directory
    # holds the same number of files; otherwise it produces a ragged array.
    img_paths = []
    for concept_name in concepts:
        concept_dir = os.path.join(self.source_dir, concept_name)
        img_paths.extend(
            os.path.join(concept_dir, file_name)
            for file_name in tf.io.gfile.listdir(concept_dir)
        )

    image_shape = self.model.get_image_shape()[:2]
    return self.load_images_from_files(
        img_paths, self.max_examples, shape=image_shape)

def load_image_from_file(self, filename, shape):
Expand Down Expand Up @@ -219,7 +230,7 @@ def __init__(self, model, source_dir, acts_dir, max_examples):
super(DiscreteActivationGeneratorBase, self).__init__(
model=model, acts_dir=acts_dir, max_examples=max_examples)

def get_examples_for_concept(self, concept):
def get_examples_for_concept(self, concept, is_relative_tcav=False):
"""Extracts examples for a concept and transforms them to the desired format.

Args:
Expand All @@ -233,6 +244,10 @@ def get_examples_for_concept(self, concept):
load_data() and transform_data() functions.

"""
if is_relative_tcav:
# Needs to be implemented
raise NotImplementedError()

data = self.load_data(concept)
data_parsed = self.transform_data(data)
return data_parsed
Expand Down
241 changes: 241 additions & 0 deletions tcav/activation_generator_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import os
import tempfile
import numpy as np
from tensorflow.python.platform import googletest
from PIL import Image
import weakref as _weakref
import sys

from tcav.activation_generator import ImageActivationGenerator
from tcav.tcav_examples.discrete.kdd99_activation_generator import KDD99DiscreteActivationGenerator
from tcav.tcav import TCAV
from tcav.utils import CONCEPT_SEPARATOR

# Shape of every generated test image (rank 3, e.g. height x width x channels).
IMG_SHAPE = (28, 28, 3)
# Each test image is filled with one uint8 value derived from its index, so
# the number of distinguishable images per directory is capped.
MAX_TEST_IMAGES = 255


class TemporaryDirectory(tempfile.TemporaryDirectory):
    """Temporary directory whose name can be fixed for 'random500_*' prefixes.

    Behaves like tempfile.TemporaryDirectory, except that when `prefix` starts
    with 'random500_' no unique component is added to the directory name: the
    directory is created as exactly prefix + suffix inside `dir`.

    The fixed-name case keeps its own bookkeeping instead of copying the base
    class's private attributes, so it does not depend on the internals of a
    particular Python version.
    """

    _FIXED_NAME_PREFIX = 'random500_'

    def __init__(self, suffix=None, prefix=None, dir=None):
        """Create the directory.

        Args:
          suffix: optional name suffix (as in tempfile.TemporaryDirectory).
          prefix: optional name prefix; a prefix starting with 'random500_'
            selects the fixed-name behaviour.
          dir: parent directory; defaults to the system temp directory.

        Raises:
          FileExistsError: in the fixed-name case, if the directory exists.
        """
        self._fixed_name = (
            prefix is not None and prefix.startswith(self._FIXED_NAME_PREFIX))
        if not self._fixed_name:
            # Regular case: let the base class do all of its own set-up.
            super().__init__(suffix=suffix, prefix=prefix, dir=dir)
            return

        import shutil

        parent = tempfile.gettempdir() if dir is None else dir
        self.name = os.path.join(parent, prefix + (suffix or ''))
        sys.audit("tempfile.mkdtemp", self.name)
        os.mkdir(self.name, 0o700)
        # Remove the directory when this object is garbage collected (or at
        # interpreter exit) if cleanup() was never called explicitly.
        self._fixed_finalizer = _weakref.finalize(
            self, shutil.rmtree, self.name, ignore_errors=True)

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove the directory when leaving a `with` block."""
        self.cleanup()

    def cleanup(self):
        """Remove the directory and its contents; safe to call repeatedly."""
        if not self._fixed_name:
            super().cleanup()
            return

        import shutil

        if self._fixed_finalizer.detach() or os.path.exists(self.name):
            shutil.rmtree(self.name, ignore_errors=True)


def _create_random_test_image_files(dst_dir, img_shape, count, start_offset, prefix):
    """Write `count` constant-valued JPEG test images into `dst_dir`.

    Despite the name, the images are deterministic: image number i is filled
    entirely with the value i + start_offset, so a loaded image can be traced
    back to the file it came from.

    Args:
      dst_dir: existing directory to write into.
      img_shape: shape of each image array.
      count: number of images to create; zero or less creates nothing.
      start_offset: pixel value of the first image.
      prefix: file-name prefix; files are named '<prefix>_testfile_<i>.jpg'
        with <i> zero-padded to a common width.

    Returns:
      List of the created file paths, in creation order.

    Raises:
      ValueError: if count + start_offset exceeds MAX_TEST_IMAGES.
    """
    if (count + start_offset) > MAX_TEST_IMAGES:
        raise ValueError(f"Cannot create more than '{MAX_TEST_IMAGES}' in one directory")
    if count <= 0:
        return []

    # Pad every index to the width of the largest one (count - 1) so the file
    # names sort in creation order.
    index_width = len(str(count - 1))
    test_file_paths = []
    for i in range(count):
        # The pixel value identifies the file: zeros for index 0 (plus the
        # offset), ones for index 1, ... -- hence the uint8 limit above.
        values = np.full(img_shape, i + start_offset, dtype=np.uint8)
        img = Image.fromarray(values)
        # save to disk
        test_file = os.path.join(dst_dir, f"{prefix}_testfile_{i:0>{index_width}d}.jpg")
        img.save(test_file)
        test_file_paths.append(test_file)

    return test_file_paths


def _create_concept_dirs(dst_dir, concept_settings, image_shape, test_file_count, test_file_prefix):
    """Create one image-filled temporary directory per concept.

    For every key of `concept_settings` a TemporaryDirectory prefixed with the
    concept name is created inside `dst_dir` and populated with
    `test_file_count` test images, whose values start at that concept's
    'start_offset' setting.

    Returns:
      List of the created TemporaryDirectory objects, in `concept_settings`
      order. The caller is responsible for cleaning them up.
    """
    created_dirs = []
    for concept_name, settings in concept_settings.items():
        tmp_dir = TemporaryDirectory(prefix=concept_name, dir=dst_dir)
        _create_random_test_image_files(
            tmp_dir.name,
            image_shape,
            test_file_count,
            settings.get('start_offset'),
            test_file_prefix,
        )
        created_dirs.append(tmp_dir)
    return created_dirs


def _get_image_value(image):
# return first pixel value
return image[0, 0, 0] # this works only for image len(shape)==3, e.g., for HxWxC


class MockTestModel:
    """Minimal stand-in for a model: it only knows its name and image shape."""

    def __init__(self, image_shape):
        """Remember `image_shape` and set the fixed model name."""
        self.image_shape = image_shape
        self.model_name = 'test_model'

    def get_image_shape(self):
        """Return the image shape given at construction time."""
        shape = self.image_shape
        return shape


class ActivationGeneratorTest(googletest.TestCase):
    """Checks which example images activation generators load per concept.

    Every concept directory is filled with images whose pixel value identifies
    the file, so the set of values loaded for a concept shows exactly which
    files were read.
    """

    def setUp(self):
        self.concept_image_dir = tempfile.TemporaryDirectory()
        # Registered immediately so the directory is removed even when the rest
        # of setUp fails; cleanups run after tearDown, i.e. after the concept
        # directories inside it have been removed.
        self.addCleanup(self.concept_image_dir.cleanup)

        self.image_test_file_prefix = 'img'
        self.image_test_file_count = 7

        # Consecutive start offsets give every concept its own range of
        # image_test_file_count pixel values.
        self.concept_settings = {
            'concept-1': {'start_offset': 0},
            'concept-2': {'start_offset': self.image_test_file_count},
            'concept-3': {'start_offset': self.image_test_file_count*2},
            'random500_0': {'start_offset': self.image_test_file_count*3},
            'random500_1': {'start_offset': self.image_test_file_count*4},
            'random500_2': {'start_offset': self.image_test_file_count*5},
        }
        self.concept_dirs = _create_concept_dirs(
            self.concept_image_dir.name,
            self.concept_settings,
            IMG_SHAPE,
            self.image_test_file_count,
            self.image_test_file_prefix)
        self.concepts = [
            os.path.basename(concept_dir.name) for concept_dir in self.concept_dirs
        ]

        self.model = MockTestModel(IMG_SHAPE)
        self.max_examples = 1000
        self.img_act_gen = ImageActivationGenerator(
            model=self.model,
            source_dir=self.concept_image_dir.name,
            acts_dir=None,
            max_examples=self.max_examples,
            normalize_image=False)

        self.target = 't0'
        self.bottleneck = 'bn'
        self.hparams = {'model_type': 'linear', 'alpha': .01}
        self.num_random_exp = 2
        self.normal_tcav = TCAV(sess=None,
                                target=self.target,
                                concepts=self.concepts,
                                bottlenecks=[self.bottleneck],
                                activation_generator=self.img_act_gen,
                                alphas=[self.hparams['alpha']],
                                num_random_exp=self.num_random_exp)
        self.relative_tcav = TCAV(sess=None,
                                  target=self.target,
                                  concepts=self.concepts,
                                  bottlenecks=[self.bottleneck],
                                  activation_generator=self.img_act_gen,
                                  alphas=[self.hparams['alpha']],
                                  random_concepts=self.concepts)  # this makes it relative_tcav

    def tearDown(self):
        # Remove the concept directories first; their parent directory is
        # removed afterwards by the cleanup registered in setUp.
        for concept_dir in self.concept_dirs:
            concept_dir.cleanup()

    def _get_concept_setting(self, concept_dir_name):
        """Return the settings whose concept name prefixes `concept_dir_name`.

        Prefix matching is used because a created directory name may carry
        extra characters after the concept name. Returns None if nothing
        matches.
        """
        for concept, settings in self.concept_settings.items():
            if concept_dir_name.startswith(concept):
                return settings
        return None

    def _get_expected_values_by_concept(self, concept_name, is_relative_tcav=False):
        """Return the set of pixel values expected for `concept_name`.

        With `is_relative_tcav`, `concept_name` is split on CONCEPT_SEPARATOR
        and the values of all resulting concepts are combined.
        """
        if is_relative_tcav:
            concepts = concept_name.split(CONCEPT_SEPARATOR)
        else:
            concepts = [concept_name]

        expected_set_image_values = set()
        for concept in concepts:
            concept_settings = self._get_concept_setting(concept)
            self.assertIsNotNone(concept_settings)

            start = concept_settings.get('start_offset')
            end = start + self.image_test_file_count
            expected_set_image_values.update(np.arange(start, end, dtype=np.float32))

        return expected_set_image_values

    def _assert_loaded_examples_match(self, concept, is_relative_tcav=False):
        """Assert the generator loads exactly the images expected for `concept`."""
        images = self.img_act_gen.get_examples_for_concept(concept, is_relative_tcav)
        actual_values = [_get_image_value(img) for img in images]
        expected_values = self._get_expected_values_by_concept(concept, is_relative_tcav)

        # Compare the counts before de-duplicating, so that an image loaded
        # twice is not hidden by the set comparison.
        self.assertEqual(len(expected_values), len(actual_values))
        self.assertEqual(expected_values, set(actual_values))

    def test_get_examples_for_concept(self):
        # (target, [positive-concept, negative-concept])
        concept_pairs = sorted(self.normal_tcav.pairs_to_test)
        _, (pos_concept, neg_concept) = concept_pairs[0]

        self._assert_loaded_examples_match(pos_concept)
        self._assert_loaded_examples_match(neg_concept)

    def test_get_examples_for_concept_relative_tcav(self):
        # (target, [positive-concept, negative-concept1+negative-concept2+...])
        concept_pairs = sorted(self.relative_tcav.pairs_to_test)

        # test if tcav object is relative tcav
        is_relative_tcav = self.relative_tcav.is_relative_tcav
        self.assertTrue(is_relative_tcav)

        _, (pos_concept, neg_concept) = concept_pairs[0]

        self._assert_loaded_examples_match(pos_concept, is_relative_tcav)
        self._assert_loaded_examples_match(neg_concept, is_relative_tcav)

    def test_get_examples_for_concept_discrete(self):
        discrete_act_gen = KDD99DiscreteActivationGenerator(
            model=None,
            source_dir=None,
            acts_dir=None,
            max_examples=0
        )
        concept = None
        is_relative = True
        self.assertRaises(
            NotImplementedError,
            discrete_act_gen.get_examples_for_concept, concept, is_relative)


# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    googletest.main()
Loading