|
- from __future__ import print_function, division, absolute_import
- import argparse
- from gettext import install
- from importlib.metadata import requires
- import os
- from PIL import Image
- os.environ['CUDA_VISIBLE_DEVICES'] = '1'
-
- import torch
- import torchvision.transforms as transforms
- import torchvision.utils as vutils
- import cv2
- import sys
-
- import pandas
- import torch_fidelity
- import numpy
- from tqdm import tqdm
- from shutil import copy, rmtree
- import random
- import traceback
-
- sys.path.append('.')
-
- from model import efficientnetv2_self as create_model
-
-
-
# Run on the first visible GPU when available; CUDA_VISIBLE_DEVICES is set to
# '1' above, so "cuda:0" maps to physical GPU 1.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')
# Architecture tag; only used to build output file names in this script.
parser.add_argument('--arch',
                    '-a',
                    metavar='ARCH',
                    default='efficientV2',
                    type=str,)
# Maximum perturbation magnitude passed to the attacks.
parser.add_argument('--eps',
                    '-e',
                    type=float,
                    default=0.4,
                    help='Disturbance ratio')

# 0 = process class folders in listed order; any other value = reverse order
# (lets two concurrent jobs split the work from both ends).
parser.add_argument('--order',
                    type=int,
                    default=0)
-
- # data_transformBack = transforms.Normalize([-1, -1, -1], [2, 2, 2])
-
-
# Patch-based perturbation (constrains the size of the patch)
-
def getFileName(str):
    """Return the portion of *str* after the last '/', i.e. the file name.

    An input with no '/' is returned unchanged; a trailing '/' yields ''.
    NOTE: the parameter shadows the builtin ``str``; the name is kept for
    backward compatibility with keyword callers.
    """
    return str.rsplit('/', 1)[-1]
-
-
def min(a, b):
    """Return the smaller of *a* and *b* (*b* when they are equal).

    NOTE(review): shadows the builtin ``min`` for the rest of this module.
    """
    return a if a < b else b
-
-
def max(a, b):
    """Return the larger of *a* and *b* (*b* when they are equal).

    NOTE(review): shadows the builtin ``max`` for the rest of this module.
    """
    return a if a > b else b
-
-
def mk_file(file_path: str):
    """Create directory *file_path* (and parents) if it does not exist.

    Uses ``exist_ok=True`` so a concurrent creation between the old
    exists-check and makedirs can no longer raise.
    """
    os.makedirs(file_path, exist_ok=True)
-
-
-
def makeInitialName(name, modelName):
    """Insert "_initial" before the last '.' in *name*.

    E.g. "dir/img.png" -> "dir/img_initial.png".  A *name* without any
    '.' is returned unchanged (matching the original scan behavior).
    *modelName* is unused but kept for backward compatibility.
    """
    if '.' not in name:
        return name
    stem, ext = name.rsplit('.', 1)
    return stem + "_initial." + ext
-
-
def makeAttackName(name, modelName, attack):
    """Insert "_<attack>" before the last '.' in *name*.

    E.g. ("dir/img.png", ..., "FGSM") -> "dir/img_FGSM.png".  A *name*
    without any '.' is returned unchanged (matching the original scan
    behavior).  *modelName* is unused but kept for backward compatibility.
    """
    if '.' not in name:
        return name
    stem, ext = name.rsplit('.', 1)
    return stem + "_" + attack + "." + ext
-
-
def getKey():
    """Load ImageNet label metadata from the ``data/`` directory.

    Returns ``(key_to_classname, class_id_to_key)`` where
    ``key_to_classname`` maps a synset key (first whitespace-delimited
    token per line of imagenet_synsets.txt) to its human-readable name,
    and ``class_id_to_key`` is the ordered list of synset keys from
    imagenet_classes.txt.
    """
    with open('data/imagenet_synsets.txt', 'r') as f:
        entries = [line.strip() for line in f]

    key_to_classname = {}
    for entry in entries:
        key, _, label = entry.partition(' ')
        key_to_classname[key] = label

    with open('data/imagenet_classes.txt', 'r') as f:
        class_id_to_key = [line.strip() for line in f]

    return (key_to_classname, class_id_to_key)
-
-
def prediction(model, input, class_id_to_key, key_to_classname, arch, path_img,
               savePath, out):
    """Classify *input* with *model*, optionally saving the image.

    Args:
        model: callable returning class logits for a batched input.
        input: batched image tensor (batch of 1 — the squeeze/max below
            assumes a single image; TODO confirm with callers).
        class_id_to_key: list mapping class index -> synset key.
        key_to_classname: dict mapping synset key -> readable name.
        arch, path_img: identification strings (only used by the
            commented-out debug print originally; kept for compatibility).
        savePath: where to save the image, or None to skip saving.
        out: when truthy (and savePath is set), save the image.

    Returns:
        The predicted class index (int).
    """
    output = model(input)  # logits, e.g. size (1, num_classes)
    # Highest logit and its index for the single image in the batch.
    cost, argmax = output.data.squeeze().max(0)

    class_id = argmax.item()
    class_key = class_id_to_key[class_id]
    classnames = key_to_classname[class_key]
    # Short name = text before the first comma; kept for debugging parity.
    classname = classnames.split(',', 1)[0]

    if out and savePath is not None:
        # Persist the (possibly attacked) image without padding.  The old
        # code also re-read the file with cv2 into an unused variable (the
        # putText overlay was commented out) — that dead I/O is removed.
        vutils.save_image(input, savePath, padding=0)
    return class_id
-
-
def getClassName(name):
    """Return *name* truncated at the first comma (the primary class name)."""
    return name.split(',', 1)[0]
-
-
def FGSM_attack(model, input_data, eps):
    """One-step Fast Gradient Sign Method.

    Steps *against* the sign of the gradient of the top logit w.r.t. the
    input (decreasing the currently-predicted class score) and clamps the
    result to [0, 1].

    Args:
        model: differentiable callable mapping the input tensor to logits.
        input_data: image tensor in [0, 1].
        eps: L-inf perturbation magnitude.

    Returns:
        The perturbed input tensor, clamped to [0, 1].
    """
    # torch.autograd.Variable is deprecated; build a fresh leaf tensor.
    input = input_data.clone().detach().requires_grad_(True)

    output = model(input)
    # Max logit = score of the currently predicted class.
    loss = torch.max(output)
    loss.backward()

    disturbance = -input.grad.sign()
    attack_input = input + eps * disturbance
    return torch.clamp(attack_input, 0, 1)
-
-
def FGM_attack(model, input_data, eps):
    """One-step Fast Gradient Method (L2-normalised step).

    Moves the input against the gradient of the top logit, with the step
    scaled by ``200 * eps`` and divided by the gradient's L2 norm, then
    clamps to [0, 1].
    """
    input = torch.autograd.Variable(input_data, requires_grad=True)

    loss = torch.max(model(input))
    loss.backward()

    grad = input.grad
    # Step against the gradient; 200 is an empirical step-size factor.
    perturbed = input - 200 * eps * grad / grad.norm(2)
    return torch.clamp(perturbed, 0, 1)
-
-
def BIM_attack(model, input_data, eps, iterations, theta):
    """Basic Iterative Method (iterated FGSM).

    Locks onto the class predicted on the first forward pass, then for
    *iterations* steps moves against the sign of that logit's gradient
    with step size ``eps / 10``, clamping pixels to [0, 1] and projecting
    back into the L-inf ball of radius *theta* around the original image
    (via the module-level ``clip``) after every step.
    """
    target_class = -1
    current = torch.autograd.Variable(input_data, requires_grad=True)

    for _ in range(iterations):
        logits = model(current)
        if target_class == -1:
            # First pass: lock onto the initially predicted class.
            loss = torch.max(logits)
            target_class = torch.argmax(logits)
        else:
            loss = logits.view(-1)[target_class]
        loss.backward()

        step = -(eps / 10) * current.grad.sign()
        perturbed = torch.clamp(current + step, 0, 1)
        # Re-project and start the next step from a fresh leaf tensor.
        current = torch.autograd.Variable(
            clip(input_data, perturbed, theta), requires_grad=True)

    return current
-
-
def ILCM_attack(model, input_data, eps, iterations, theta):
    """Iterative Least-likely Class Method.

    Locks onto the *least* likely class of the first forward pass, then
    for *iterations* steps moves *along* the sign of that logit's gradient
    (raising its score) with step size ``eps / 10``, clamping pixels to
    [0, 1] and projecting back into the L-inf ball of radius *theta*
    around the original image (via the module-level ``clip``) each step.
    """
    target_class = -1
    current = torch.autograd.Variable(input_data, requires_grad=True)

    for _ in range(iterations):
        logits = model(current)
        if target_class == -1:
            # First pass: pick the least-likely class as the target.
            loss = torch.min(logits)
            target_class = torch.argmin(logits)
        else:
            loss = logits.view(-1)[target_class]
        loss.backward()

        step = (eps / 10) * current.grad.sign()
        perturbed = torch.clamp(current + step, 0, 1)
        # Re-project and start the next step from a fresh leaf tensor.
        current = torch.autograd.Variable(
            clip(input_data, perturbed, theta), requires_grad=True)

    return current
-
def PGD_attack(model, input_data, eps, iterations, theta):
    """Projected Gradient Descent attack with a random start.

    Starts from a randomly perturbed copy of the input, then iterates
    BIM-style steps of size ``eps / 10`` against the gradient of the
    initially-predicted class logit, projecting into the L-inf ball of
    radius *theta* (via the module-level ``clip``) after every step.

    NOTE(review): the random start is scaled by 255 even though the attack
    clamps pixels to [0, 1]; after clip() this places the start at the
    upper edge of the theta-ball almost everywhere — confirm the 255
    factor is intended.
    """
    target_class = -1
    random_start = input_data + 255 * torch.rand(input_data.size()).to(device)
    current = torch.autograd.Variable(clip(input_data, random_start, theta),
                                      requires_grad=True)

    for _ in range(iterations):
        logits = model(current)
        if target_class == -1:
            # First pass: lock onto the initially predicted class.
            loss = torch.max(logits)
            target_class = torch.argmax(logits)
        else:
            loss = logits.view(-1)[target_class]
        loss.backward()

        step = -(eps / 10) * current.grad.sign()
        perturbed = torch.clamp(current + step, 0, 1)
        # Re-project and start the next step from a fresh leaf tensor.
        current = torch.autograd.Variable(
            clip(input_data, perturbed, theta), requires_grad=True)

    return current
-
-
def clip(inputImage, disturbedImage, theta):
    """Project *disturbedImage* into the L-inf ball of radius *theta*
    around *inputImage*, then into the pixel range [0, 255]."""
    bounded = disturbedImage.clamp(inputImage - theta, inputImage + theta)
    return bounded.clamp(0, 255)
-
-
def main():
    """Build an adversarial-example dataset.

    Copies every image of every class folder into the output tree and, for
    a reproducible random 60% subset per class, additionally saves
    FGSM/FGM/PGD/BIM/ILCM attacked versions generated against the loaded
    EfficientNetV2 classifier.  Paths are hard-coded for the training box.
    """
    random.seed(0)  # fixed seed so the attacked subset is reproducible
    # train_size, val_size per EfficientNetV2 variant.
    img_size = {"s": [300, 384],
                "m": [384, 480],
                "l": [384, 480]}
    num_model = "l"

    data_transform = transforms.Compose(
        [transforms.Resize(img_size[num_model][1]),
         transforms.CenterCrop(img_size[num_model][1]),
         transforms.ToTensor()])

    global args
    args = parser.parse_args()
    arch = args.arch

    # Source dataset: one sub-folder per class.
    data_root = os.path.join("/root/autodl-tmp/classification_data")
    origin_object_path = os.path.join(data_root, "classification_photos")
    assert os.path.exists(origin_object_path), "path '{}' does not exist.".format(origin_object_path)

    object_class = [cla for cla in os.listdir(origin_object_path)
                    if os.path.isdir(os.path.join(origin_object_path, cla))]

    # Output tree mirrors the class folders of the source dataset.
    train_root = os.path.join("/root/autodl-tmp/attacked_data", "attacked_classification_photos")
    mk_file(train_root)
    for cla in object_class:
        mk_file(os.path.join(train_root, cla))

    model = create_model(num_classes=20).to(device)

    # Load the trained classifier weights.
    model_weight_path = "/root/autodl-tmp/torch_efficientnetv2/saved_weights/model-9422.pth"
    assert os.path.exists(
        model_weight_path), f"file: '{model_weight_path}' dose not exist."
    model.load_state_dict(torch.load(model_weight_path, map_location=device))
    model.eval()

    eps = args.eps
    bim_theta = 0.3        # L-inf radius for the iterative attacks
    bim_iterations = 20    # steps for BIM/ILCM/PGD
    split_rate = 0.6       # fraction of each class that gets attacked

    # Pre-compute per-class image lists and attacked subsets up front so a
    # forward run and a reverse run (--order) agree on the random split.
    imagesList = []
    evalList = []
    for cla in object_class:
        images = os.listdir(os.path.join(origin_object_path, cla))
        imagesList.append(images)
        evalList.append(random.sample(images, k=int(len(images) * split_rate)))

    def _attack_and_save(attack_path, label, err_msg, attack_fn):
        # Run one attack and save its image, unless the output already exists.
        try:
            out_path = makeAttackName(attack_path, arch, label)
            if not os.path.exists(out_path):
                vutils.save_image(attack_fn(), out_path, padding=0)
        except Exception:
            # Report the real failure.  The old handlers printed
            # `Exception.__class__.__name__`, which is always "type", or the
            # bare class object — neither identified the actual error.
            traceback.print_exc()
            print(err_msg)

    for i in range(len(object_class)):
        # --order != 0 walks the classes in reverse so two concurrent jobs
        # can split the work from both ends.
        idx = i if args.order == 0 else len(object_class) - 1 - i
        cla = object_class[idx]
        tqdmbar = tqdm(imagesList[idx])
        eval_index = evalList[idx]

        cla_path = os.path.join(origin_object_path, cla)
        for image in tqdmbar:
            tqdmbar.set_description("class: {}".format(cla))
            path_img = os.path.join(cla_path, image)
            attack_path = os.path.join(train_root, cla, image)
            if not os.path.exists(attack_path):
                copy(path_img, attack_path)

            if image not in eval_index:
                continue

            # NOTE(review): the image is not converted to RGB, so
            # grayscale/RGBA files may only fail later inside the attacks
            # and be skipped by the per-attack handler — confirm intended.
            img = Image.open(path_img)
            try:
                img = data_transform(img)  # -> [C, H, W] in [0, 1]
            except Exception:
                # Skip images the transform rejects.  (Was a bare `except:`,
                # which also swallowed KeyboardInterrupt/SystemExit.)
                continue
            # Expand batch dimension -> [1, C, H, W].
            input_data = torch.unsqueeze(img, dim=0).to(device)

            _attack_and_save(attack_path, "FGSM", 'FGSM error',
                             lambda: FGSM_attack(model, input_data, eps))
            _attack_and_save(attack_path, "FGM", 'FGM error',
                             lambda: FGM_attack(model, input_data, eps))
            _attack_and_save(attack_path, "PGD", 'PGD error',
                             lambda: PGD_attack(model, input_data, eps,
                                                bim_iterations, bim_theta))
            _attack_and_save(attack_path, "BIM", 'BIM error',
                             lambda: BIM_attack(model, input_data, eps,
                                                bim_iterations, bim_theta))
            _attack_and_save(attack_path, "ILCM", 'ilcm error',
                             lambda: ILCM_attack(model, input_data, eps,
                                                 bim_iterations, bim_theta))


if __name__ == '__main__':
    main()
|