chore: fixing lint and style
fracarfer5 committed Jul 30, 2024
1 parent 4526a9d commit 9f172a2
Showing 3 changed files with 38 additions and 56 deletions.
@@ -9,6 +9,7 @@
def x_array_to_df(x_arr, feature_names):
return pd.DataFrame(x_arr, columns=feature_names)

+
def x_to_nd_array(x: pd.DataFrame):
return np.array(x)

@@ -36,6 +37,7 @@ def to_categorical(labels: Union[np.ndarray, list[float]], nb_classes: Optional[
categorical[np.arange(labels.shape[0]), np.squeeze(labels)] = 1
return categorical

+
def format_function_predict_proba(learning_task, predict_proba_fn):
"""
Format the predict_proba function based on the learning task.
@@ -54,10 +56,12 @@ def format_function_predict_proba(learning_task, predict_proba_fn):
"""
match learning_task:
case "binary_classification":
+
def forward(x: np.ndarray, feature_names: list[str]):
x_df = x_array_to_df(x, feature_names=feature_names)
score = np.array(predict_proba_fn(x_df))
if score.ndim == 2:
return score
-return np.stack([1-score, score], axis=1)
+return np.stack([1 - score, score], axis=1)
+
return forward
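For context, a minimal usage sketch of the reformatted binary-classification branch above, assuming format_function_predict_proba is in scope and a scikit-learn-style model whose predict_proba already returns an (n, 2) array (the model and data here are illustrative, not part of the commit):

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

X = pd.DataFrame({"a": [0.1, 0.9, 0.4], "b": [1.0, 0.2, 0.5]})
y = np.array([0, 1, 0])
model = LogisticRegression().fit(X, y)

# Wrap the model so downstream attackers can call it on raw arrays.
forward = format_function_predict_proba("binary_classification", model.predict_proba)
scores = forward(np.array(X), feature_names=list(X.columns))
print(scores.shape)  # (3, 2): score.ndim == 2, so it is returned unchanged
```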
54 changes: 18 additions & 36 deletions src/holisticai/robustness/attackers/classification/hop_skip_jump.py
@@ -4,6 +4,7 @@
| Paper link: https://arxiv.org/abs/1904.02144
"""
+
from __future__ import annotations

from typing import Callable, Literal, Optional, Union
@@ -57,7 +58,8 @@ class HopSkipJump(BaseModel):
----------
.. [1] Chen, J., Jordan, M. I., & Wainwright, M. J. (2019). HopSkipJumpAttack: A query-efficient decision-based attack. In 2020 IEEE Symposium on Security and Privacy (SP) (pp. 1277-1294). IEEE.
"""
-name : Literal["HSJ"] = "HSJ"
+
+name: Literal["HSJ"] = "HSJ"

batch_size: int = 64
targeted: bool = False
@@ -67,12 +69,12 @@
init_eval: int = 100
init_size: int = 100
verbose: bool = True
-predictor: Callable[[NDArray], NDArray|ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
-clip_values : tuple =()
-input_shape : list =[]
-input_size : int = 0
-theta : float = 0.0
-feature_names: list =[]
+predictor: Callable[[NDArray], NDArray | ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
+clip_values: tuple = ()
+input_shape: list = []
+input_size: int = 0
+theta: float = 0.0
+feature_names: list = []
curr_iter: int = 0

def __init__(self, **kwargs):
@@ -103,9 +105,7 @@ def predict(self, x: np.ndarray):

return np.array(self.predictor(x_df))

-def generate(
-    self, x_df: pd.DataFrame, y: Optional[np.ndarray] = None, **kwargs
-) -> pd.DataFrame:
+def generate(self, x_df: pd.DataFrame, y: Optional[np.ndarray] = None, **kwargs) -> pd.DataFrame:
"""
Generate adversarial samples and return them in an array.
@@ -129,9 +129,7 @@ def generate
if y is None:
# Throw error if attack is targeted, but no targets are provided
if self.targeted: # pragma: no cover
-raise ValueError(
-    "Target labels `y` need to be provided for a targeted attack."
-)
+raise ValueError("Target labels `y` need to be provided for a targeted attack.")

# Use model predictions as correct outputs
y = self.predict(x)
@@ -175,7 +173,6 @@ def generate(

# Generate the adversarial samples
for ind, val in enumerate(x_adv):
-
self.curr_iter = start

if self.targeted:
@@ -244,18 +241,14 @@ def _perturb(
An adversarial example.
"""
# First, create an initial adversarial sample
-initial_sample = self._init_sample(
-    x, y, y_p, init_pred, adv_init, mask, clip_min, clip_max
-)
+initial_sample = self._init_sample(x, y, y_p, init_pred, adv_init, mask, clip_min, clip_max)

# If an initial adversarial example is not found, then return the original image
if initial_sample is None:
return x

# If an initial adversarial example found, then go with HopSkipJump attack
-x_adv = self._attack(
-    initial_sample[0], x, initial_sample[1], mask, clip_min, clip_max
-)
+x_adv = self._attack(initial_sample[0], x, initial_sample[1], mask, clip_min, clip_max)

return x_adv

@@ -312,9 +305,7 @@ def _init_sample(

# Attack unsatisfied yet and the initial image unsatisfied
for _ in range(self.init_size):
-random_img = nprd.uniform(clip_min, clip_max, size=x.shape).astype(
-    x.dtype
-)
+random_img = nprd.uniform(clip_min, clip_max, size=x.shape).astype(x.dtype)

if mask is not None:
random_img = random_img * mask + x * (1 - mask)
@@ -343,9 +334,7 @@ def _init_sample(

# The initial image unsatisfied
for _ in range(self.init_size):
-random_img = nprd.uniform(clip_min, clip_max, size=x.shape).astype(
-    x.dtype
-)
+random_img = nprd.uniform(clip_min, clip_max, size=x.shape).astype(x.dtype)

if mask is not None:
random_img = random_img * mask + x * (1 - mask)
@@ -426,9 +415,7 @@ def _attack(
)

# Next compute the number of evaluations and compute the update
-num_eval = min(
-    int(self.init_eval * np.sqrt(self.curr_iter + 1)), self.max_eval
-)
+num_eval = min(int(self.init_eval * np.sqrt(self.curr_iter + 1)), self.max_eval)

update = self._compute_update(
current_sample=current_sample,
@@ -655,10 +642,7 @@ def _compute_update(
satisfied = self._adversarial_satisfactory(
samples=eval_samples, target=target, clip_min=clip_min, clip_max=clip_max
)
-f_val = (
-    2 * satisfied.reshape([num_eval] + [1] * len(self.input_shape))
-    - 1.0
-)
+f_val = 2 * satisfied.reshape([num_eval] + [1] * len(self.input_shape)) - 1.0

if np.mean(f_val) == 1.0:
grad = np.mean(rnd_noise, axis=0)
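For context, the f_val line above feeds HopSkipJump's Monte Carlo estimate of the gradient direction. A self-contained sketch of that estimate (shapes and the success probe are illustrative; the variance-reducing baseline branch follows the HopSkipJumpAttack paper, since the corresponding lines are folded out of this hunk):

```python
import numpy as np

num_eval, input_shape = 100, (5,)
rnd_noise = np.random.randn(num_eval, *input_shape)
satisfied = np.random.rand(num_eval) < 0.5  # stand-in for the adversarial check
f_val = 2 * satisfied.reshape([num_eval] + [1] * len(input_shape)) - 1.0

if np.mean(f_val) == 1.0:        # every probe stayed adversarial
    grad = np.mean(rnd_noise, axis=0)
elif np.mean(f_val) == -1.0:     # no probe stayed adversarial
    grad = -np.mean(rnd_noise, axis=0)
else:
    f_val -= np.mean(f_val)      # baseline correction reduces variance
    grad = np.mean(f_val * rnd_noise, axis=0)
```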
@@ -731,8 +715,6 @@ def _interpolate(
if norm == 2:
result = (1 - alpha) * original_sample + alpha * current_sample
else:
-result = np.clip(
-    current_sample, original_sample - alpha, original_sample + alpha
-)
+result = np.clip(current_sample, original_sample - alpha, original_sample + alpha)

return result
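For context, a standalone sketch of the two _interpolate branches visible in the hunk above (argument names mirror the diff; the sample values are illustrative):

```python
import numpy as np

def interpolate(current_sample, original_sample, alpha, norm):
    if norm == 2:
        # L2: convex combination that slides the sample toward the original
        return (1 - alpha) * original_sample + alpha * current_sample
    # L-infinity: clip the sample into an alpha-box around the original
    return np.clip(current_sample, original_sample - alpha, original_sample + alpha)

x0 = np.zeros(3)
x_adv = np.array([2.0, -2.0, 0.5])
print(interpolate(x_adv, x0, 0.5, norm=2))       # [ 1.   -1.    0.25]
print(interpolate(x_adv, x0, 0.5, norm=np.inf))  # [ 0.5 -0.5  0.5]
```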
34 changes: 15 additions & 19 deletions src/holisticai/robustness/attackers/classification/zoo.py
@@ -5,6 +5,7 @@
| Paper link: https://arxiv.org/abs/1708.03999
"""
+
from __future__ import annotations

from typing import Any, Callable, Literal, Optional
@@ -78,7 +79,8 @@ class ZooAttack(BaseModel):
black-box attacks to deep neural networks without training substitute models." In Proceedings of the 10th\\
ACM Workshop on Artificial Intelligence and Security, pp. 15-26. 2017.
"""
-name : Literal["Zoo"] = "Zoo"
+
+name: Literal["Zoo"] = "Zoo"

model_config = ConfigDict(arbitrary_types_allowed=True)
confidence: float = 0.0
@@ -94,16 +96,16 @@ class ZooAttack(BaseModel):
batch_size: int = 1
variable_h: float = 0.2
verbose: bool = True
-input_is_feature_vector : bool =False
-clip_values : tuple = (0,1)
-predict_proba_fn : Callable[[NDArray, list], NDArray|ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
-predict_proba: Callable[[NDArray], NDArray|ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
-input_shape: tuple =()
-input_size : int = 0
-nb_classes: int =2
-adam_mean: Optional[NDArray|ArrayLike|None]=None
-adam_var: Optional[NDArray|ArrayLike|None]=None
-adam_epochs: Optional[NDArray|ArrayLike|None]=None
+input_is_feature_vector: bool = False
+clip_values: tuple = (0, 1)
+predict_proba_fn: Callable[[NDArray, list], NDArray | ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
+predict_proba: Callable[[NDArray], NDArray | ArrayLike] = lambda x: np.ndarray([]) # noqa: ARG005
+input_shape: tuple = ()
+input_size: int = 0
+nb_classes: int = 2
+adam_mean: Optional[NDArray | ArrayLike | None] = None
+adam_var: Optional[NDArray | ArrayLike | None] = None
+adam_epochs: Optional[NDArray | ArrayLike | None] = None

def __init__(self, **kwargs):
super().__init__(**kwargs)
@@ -139,9 +141,6 @@ def __init__(self, **kwargs):
self._current_noise = np.zeros((self.batch_size, *self.input_shape), dtype=np.float32)
self._sample_prob = np.ones(self._current_noise.size, dtype=np.float32) / self._current_noise.size

-
-
-
def _loss(
self, x: np.ndarray, x_adv: np.ndarray, target: np.ndarray, c_weight: np.ndarray
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
@@ -165,9 +164,7 @@ def _loss(
The predictions, the L2 distances, and the loss values.
"""
l2dist = np.sum(np.square(x - x_adv).reshape(x_adv.shape[0], -1), axis=1)
-ratios = [1.0] + [
-    int(new_size) / int(old_size) for new_size, old_size in zip(self.input_shape, x.shape[1:])
-]
+ratios = [1.0] + [int(new_size) / int(old_size) for new_size, old_size in zip(self.input_shape, x.shape[1:])]
preds = self.predict_proba(np.array(zoom(x_adv, zoom=ratios)))
z_target = np.sum(preds * target, axis=1)
z_other = np.max(
@@ -394,7 +391,6 @@ def _generate_bss(
best_attack = np.array([x_adv[i] for i in range(x_adv.shape[0])])

for iter_ in range(self.max_iter):
-
# Upscaling for very large number of iterations
if self.use_resize:
if iter_ == 2000:
@@ -601,7 +597,7 @@ def _optimizer_adam_coordinate(

# ADAM update
mean[index] = beta1 * mean[index] + (1 - beta1) * grads
-var[index] = beta2 * var[index] + (1 - beta2) * grads ** 2
+var[index] = beta2 * var[index] + (1 - beta2) * grads**2

corr = (np.sqrt(1 - np.power(beta2, adam_epochs[index]))) / (1 - np.power(beta1, adam_epochs[index]))
orig_shape = current_noise.shape
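For context, a sketch of the coordinate-wise ADAM step that the mean/var/corr lines above belong to (the learning rate, epsilon, and final update line are assumptions from the standard ZOO formulation; only the moment updates and bias correction are visible in this hunk):

```python
import numpy as np

def adam_coordinate_step(noise, grads, mean, var, adam_epochs, index,
                         learning_rate=0.01, beta1=0.9, beta2=0.999, eps=1e-8):
    # First/second moment estimates, updated only at the sampled coordinates
    mean[index] = beta1 * mean[index] + (1 - beta1) * grads
    var[index] = beta2 * var[index] + (1 - beta2) * grads**2
    # Bias correction, as in the visible corr line
    corr = np.sqrt(1 - np.power(beta2, adam_epochs[index])) / (1 - np.power(beta1, adam_epochs[index]))
    noise[index] -= learning_rate * corr * mean[index] / (np.sqrt(var[index]) + eps)
    adam_epochs[index] += 1
    return noise

noise, mean, var = np.zeros(4), np.zeros(4), np.zeros(4)
adam_epochs, index = np.ones(4), np.array([0, 2])
print(adam_coordinate_step(noise, np.array([0.5, -1.0]), mean, var, adam_epochs, index))
```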
