1
+ import torch
2
+ import numpy as np
3
+ import torch
4
+ import torch .nn as nn
5
+ import torch .nn .functional as F
6
+ from tqdm import tqdm
7
+ import os
8
+ import matplotlib .pyplot as plt
9
+ from advertorch .attacks import LinfPGDAttack , GradientSignAttack , CarliniWagnerL2Attack , LinfBasicIterativeAttack
10
+ # import seaborn as sns
11
+
12
+ import sys
13
+ sys .path .insert (0 ,'/raid/sdas_ma/Adversarial_CapsNet_Pytorch/' )
14
+ from model .net import *
15
+ from model .cnn_net import *
16
+ from utils .training import *
17
+ from data .data import *
18
+
19
+ from advertorch .attacks .base import Attack , LabelMixin
20
+ from advertorch .utils import clamp
21
+ import matplotlib .pyplot as plt
22
+ from advertorch .attacks import LinfPGDAttack , GradientSignAttack , CarliniWagnerL2Attack , LinfBasicIterativeAttack
23
+
24
+
25
+
26
+ base_path = '/raid/sdas_ma/Adversarial_CapsNet_Pytorch/'
27
+ model_path = "/raid/sdas_ma/Adversarial_CapsNet_Pytorch/weights/" #os.path.join(os.getcwd(), "weights")
28
+
29
+ Caps_args = {
30
+ 'DATASET_NAME' :'mnist' ,
31
+ 'num_classes' :10 ,
32
+
33
+ 'USE_CUDA' : True if torch .cuda .is_available () else False ,
34
+ 'BATCH_SIZE' : 512 ,
35
+
36
+ ##For Decoder
37
+ 'num_features' :160 ,
38
+ 'LReLU_negative_slope' :0.1 ,
39
+ 'input_height' :28 ,
40
+ 'input_width' :28 ,
41
+ 'input_channel' :1 ,
42
+ }
43
+
44
+ CNN_args = {
45
+ 'DATASET_NAME' :'mnist' ,
46
+ 'num_classes' :10 ,
47
+
48
+ 'USE_CUDA' : True if torch .cuda .is_available () else False ,
49
+ 'BATCH_SIZE' : 256 ,
50
+ #For Decoder
51
+ 'num_features' :160 ,
52
+ 'LReLU_negative_slope' :0.1 ,
53
+ 'input_height' :28 ,
54
+ 'input_width' :28 ,
55
+ 'input_channel' :1 ,
56
+ 'type' :'plusCR' ,
57
+ }
58
+
59
+ class Caps_Config :
60
+ def __init__ (self , dataset = 'mnist' ):
61
+ # CNN (cnn)
62
+ self .cnn_in_channels = 1
63
+ self .cnn_out_channels = 12
64
+ self .cnn_kernel_size = 15
65
+
66
+ # Primary Capsule (pc)
67
+ self .pc_num_capsules = 1
68
+ self .pc_in_channels = 12
69
+ self .pc_out_channels = 16
70
+ self .pc_kernel_size = 8
71
+ self .pc_num_routes = 7 * 7
72
+
73
+ # Digit Capsule 1 (dc)
74
+ self .dc_num_capsules = 49
75
+ self .dc_num_routes = 7 * 7
76
+ self .dc_in_channels = 16
77
+ self .dc_out_channels = 16 #1
78
+
79
+ # Digit Capsule 2 (dc)
80
+ self .dc_2_num_capsules = 10
81
+ self .dc_2_num_routes = 7 * 7
82
+ self .dc_2_in_channels = 16 #1
83
+ self .dc_2_out_channels = 16
84
+
85
+ # Decoder
86
+ self .input_width = 28
87
+ self .input_height = 28
88
+
89
+ class CNN_Config :
90
+ def __init__ (self , dataset = 'mnist' ):
91
+ # CONV1
92
+ self .conv1_in = 1
93
+ self .conv1_out = 12
94
+ self .conv1_kernel_size = 15
95
+
96
+ # CONV2
97
+ self .conv2_in = 12
98
+ self .conv2_out = 16
99
+ self .conv2_kernel_size = 8
100
+
101
+ # FC1
102
+ self .fc1_in = 7 * 7 * 16
103
+ self .fc1_out = 784
104
+
105
+ # FC1
106
+ self .fc2_in = 784
107
+ self .fc2_out = 160
108
+
109
+ torch .manual_seed (1 )
110
+
111
+ class Model_for_Adversary_Caps (nn .Module ):
112
+ def __init__ (self , net ):
113
+ super (Model_for_Adversary_Caps , self ).__init__ ()
114
+ self .net = net
115
+
116
+ def forward (self , x ):
117
+ output , recons , masked = self .net (x )
118
+ classes = torch .sqrt ((output ** 2 ).sum (2 )).squeeze ()
119
+ return classes
120
+
121
+ class Model_for_Adversary_CNN (nn .Module ):
122
+ def __init__ (self , net ):
123
+ super (Model_for_Adversary_CNN , self ).__init__ ()
124
+ self .net = net
125
+
126
+ def forward (self , x ):
127
+ output , recons , masked = self .net (x )
128
+ classes = output .sum (2 )
129
+ return classes
130
+
131
+ def WhiteBox_Attacks_Targeted (net , dataloader , adversary_dict , args ):
132
+ net .eval ()
133
+ n_batch = len (dataloader )
134
+ Success_Rate = {key :0.0 for key in adversary_dict .keys ()}
135
+ Undetected_Rate = {key :0.0 for key in adversary_dict .keys ()}
136
+ Und_l2 = {key :torch .tensor ([],dtype = torch .int16 ).cuda () for key in adversary_dict .keys ()}
137
+ for adversary in adversary_dict .keys ():
138
+ for batch_id , (data , labels ) in enumerate (tqdm (dataloader )):
139
+ if (args ['USE_CUDA' ]):
140
+ data , labels = data .cuda (), labels .cuda ()
141
+ target = torch .randint (0 ,10 ,size = (labels .size (0 ),), dtype = labels .dtype ).cuda ()
142
+ while (torch .sum (target == labels )/ target .size (0 )> 0.0001 ):
143
+ target [target == labels ] = torch .randint (0 ,10 , size = (torch .sum (target == labels ),), dtype = labels .dtype ).cuda ()
144
+ adv_data = adversary_dict [adversary ].perturb (data , target )
145
+ with torch .no_grad ():
146
+ output , reconstructions , max_length_indices = net (adv_data )
147
+ unnormalized_data = net .decoder .unnormalize (adv_data )
148
+ l2_distances = ((reconstructions .view (adv_data .size (0 ),- 1 )- unnormalized_data .view (adv_data .size (0 ), - 1 ))** 2 ).sum (1 ).squeeze ().detach ()
149
+ theta = np .percentile (l2_distances .cpu ().numpy (), 95 )
150
+ if (adversary == 'Clean' ):
151
+ Und_l2 [adversary ] = torch .cat ((Und_l2 [adversary ],l2_distances ))
152
+ else :
153
+ Und_l2 [adversary ] = torch .cat ((Und_l2 [adversary ],l2_distances [max_length_indices == target ]))
154
+ Success_Rate [adversary ]+= torch .sum (max_length_indices == target ).item ()
155
+ Undetected_Rate [adversary ]+= torch .sum (l2_distances [max_length_indices == target ]<= theta ).item ()
156
+
157
+ Und_l2 [adversary ] = Und_l2 [adversary ].cpu ().numpy ()
158
+ Success_Rate [adversary ]/= 100
159
+ Undetected_Rate [adversary ]/= 100
160
+ return Success_Rate , Undetected_Rate , Und_l2
161
+
162
+ def WhiteBox_Attacks_Untargeted (net , dataloader , adversary_dict , args ):
163
+ net .eval ()
164
+ n_batch = len (dataloader )
165
+ Success_Rate = {key :0.0 for key in adversary_dict .keys ()}
166
+ Undetected_Rate = {key :0.0 for key in adversary_dict .keys ()}
167
+ Und_l2 = {key :torch .tensor ([],dtype = torch .int16 ).cuda () for key in adversary_dict .keys ()}
168
+ for adversary in adversary_dict .keys ():
169
+ for batch_id , (data , labels ) in enumerate (tqdm (dataloader )):
170
+ if (args ['USE_CUDA' ]):
171
+ data , labels = data .cuda (), labels .cuda ()
172
+
173
+ adv_data = adversary_dict [adversary ].perturb (data )
174
+ with torch .no_grad ():
175
+ output , reconstructions , max_length_indices = net (adv_data )
176
+ unnormalized_data = net .decoder .unnormalize (adv_data )
177
+ l2_distances = ((reconstructions .view (adv_data .size (0 ),- 1 )- unnormalized_data .view (adv_data .size (0 ), - 1 ))** 2 ).sum (1 ).squeeze ().detach ()
178
+ theta = np .percentile (l2_distances .cpu ().numpy (), 95 )
179
+ if (adversary == 'Clean' ):
180
+ Und_l2 [adversary ] = torch .cat ((Und_l2 [adversary ],l2_distances ))
181
+ else :
182
+ Und_l2 [adversary ] = torch .cat ((Und_l2 [adversary ],l2_distances [max_length_indices != labels ]))
183
+ Success_Rate [adversary ]+= torch .sum (max_length_indices != labels ).item ()
184
+ Undetected_Rate [adversary ]+= torch .sum (l2_distances [max_length_indices != labels ]<= theta ).item ()
185
+ # print(Success_Rate[adversary])
186
+ # print(Undetected_Rate[adversary])
187
+ # print(theta)
188
+ Und_l2 [adversary ] = Und_l2 [adversary ].cpu ().numpy ()
189
+ Success_Rate [adversary ]/= 100
190
+ Undetected_Rate [adversary ]/= 100
191
+ return Success_Rate , Undetected_Rate , Und_l2
192
+
193
+ class CleanAttack (Attack , LabelMixin ):
194
+ def __init__ (self , clip_min = 0. , clip_max = 1. ):
195
+ super (CleanAttack , self ).__init__ (None ,None ,clip_min , clip_max )
196
+
197
+ def perturb (self , x , y = None ):
198
+ return x
199
+
200
+ def make_adversary_dict (model , model_name , targetted = False ):
201
+ if (model_name == "capsnet" ):
202
+ model_for_adversary = Model_for_Adversary_Caps (model )
203
+ else :
204
+ model_for_adversary = Model_for_Adversary_CNN (model )
205
+
206
+ linf_eps = 0.3
207
+ fgsm_step = 0.05
208
+ bim_pgd_step = 0.01
209
+
210
+ adversary_dict = {}
211
+ adversary_dict ['Clean' ] = CleanAttack (clip_min = - 0.4242 , clip_max = 2.8215 )
212
+ adversary_dict ['PGD' ] = LinfPGDAttack (
213
+ model_for_adversary , loss_fn = nn .CrossEntropyLoss (reduction = "sum" ), eps = (linf_eps / 0.3081 ),
214
+ nb_iter = 100 , eps_iter = (bim_pgd_step / 0.3081 ), rand_init = True , clip_min = - 0.4242 , clip_max = 2.8215 ,
215
+ targeted = targetted )
216
+
217
+ adversary_dict ['FGSM' ] = GradientSignAttack (model_for_adversary , loss_fn = nn .CrossEntropyLoss (reduction = "sum" ), eps = (fgsm_step / 0.3081 ), clip_min = - 0.4242 , clip_max = 2.8215 , targeted = targetted )
218
+ adversary_dict ['BIM' ] = LinfBasicIterativeAttack (model_for_adversary , loss_fn = nn .CrossEntropyLoss (reduction = "sum" ), eps = (linf_eps / 0.3081 ), nb_iter = 100 , eps_iter = (bim_pgd_step / 0.3081 ), clip_min = - 0.4242 , clip_max = 2.8215 , targeted = targetted )
219
+
220
+ return adversary_dict
221
+
222
+ def capsnet ():
223
+ config = Caps_Config ()
224
+ net = CapsNet (Caps_args , config )
225
+ # capsule_net = torch.nn.DataParallel(capsule_net)
226
+ if Caps_args ['USE_CUDA' ]:
227
+ net = net .cuda ()
228
+ net .load_state_dict (torch .load (os .path .join (model_path , 'CapsNet_mnist.pth' ), map_location = 'cpu' ))
229
+ return net , Caps_args
230
+
231
+ def CNN (model_type ):
232
+ CNN_args ['type' ] = model_type
233
+ config = CNN_Config ()
234
+ net = CNNnet (CNN_args , config )
235
+ net .load_state_dict (torch .load (os .path .join (model_path , 'CNN' + model_type + '_mnist.pth' ), map_location = 'cpu' ))
236
+ if CNN_args ['USE_CUDA' ]:
237
+ net = net .cuda ()
238
+ return net , CNN_args
239
+
240
+ def load_model (model_name ):
241
+ if (model_name == "capsnet" ):
242
+ net , args = capsnet ()
243
+ return net , args
244
+ else :
245
+ net , args = CNN (model_name )
246
+ return net , args
247
+
248
+
249
+ _ , testloader = dataset (Caps_args )
250
+
251
+ model_name_list = ["plusCR" , "plusR" , "capsnet" ]
252
+ types = [True , False ]
253
+ funcs = [ WhiteBox_Attacks_Targeted , WhiteBox_Attacks_Untargeted ]
254
+ for model_name in model_name_list :
255
+ net , args = load_model (model_name )
256
+ for i ,func in enumerate (funcs ):
257
+ adversary_dict = make_adversary_dict (net , model_name , targetted = types [i ])
258
+ _ , _ , Und_l2 = func (net , testloader , adversary_dict , args )
259
+ for attack in adversary_dict .keys ():
260
+ print (Und_l2 [attack ].shape [0 ],": Success Rate for" ,model_name ,attack )
261
+ print (np .sum (Und_l2 [attack ]< 45 ),": Undetected Rate for" ,model_name ,attack )
262
+ if (types [i ] == True ):
263
+ targetted = "Targeted"
264
+ else :
265
+ targetted = "Untargeted"
266
+ if (model_name == "capsnet" ):
267
+ mod_name = "CapsNet"
268
+ elif (model_name == "plusCR" ):
269
+ mod_name = "plusCR"
270
+ else :
271
+ mod_name = "plusR"
272
+ np .save (os .path .join (base_path , "results" ,str ("Und_L2_" + targetted + "_" + mod_name + ".npy" )),Und_l2 )
0 commit comments