|
- # Copyright 2022 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
"""Dataset utilities for DeepMAR training and evaluation on the PETA pedestrian-attribute dataset."""
-
- import os
- import math
- import random
- import numpy as np
- from PIL import Image
- import mindspore
- import mindspore.common.dtype as mstype
- import mindspore.dataset.vision.c_transforms as C
- import mindspore.dataset.transforms.c_transforms as C2
- from scipy.io import loadmat
-
-
# Fix the global RNG seeds so shuffling/augmentation decisions are reproducible.
np.random.seed(0)
random.seed(0)

# Fixed per-attribute loss weights, one per selected PETA attribute (35 values,
# matching `selected_attribute` in generate_data_description). AttDataset
# broadcasts these to every training sample. Presumably precomputed offline
# from attribute positive rates (exp-weighted) — TODO confirm provenance.
new_weight = [1.6577705942628165, 1.9569133706913606, 2.453397219303324, 2.5511042784138787,
              2.234931382642667, 2.2346961390891087, 1.1512428584451406, 1.1603675891347502,
              2.3646536876713378, 2.3726322669984423, 2.455205651858246, 2.5371784866683735,
              2.0040254740364105, 2.020547311604474, 2.6100475016963145, 2.1506916190382617,
              1.566662199124304, 2.0233141637611203, 2.505331478757186, 1.2855130440514353,
              2.0552984172896176, 2.652144136746138, 2.5188173194587176, 2.66782399742227,
              1.8903024488309481, 2.6218881780191023, 2.35422250465764, 2.6015443856359055,
              2.181013056518632, 2.6723209605343974, 2.6365564328356172, 1.622039341816392,
              2.4982211718819127, 1.717271754249473, 2.6861401189973217]
-
-
def generate_data_description(peta_dataset_mat_dir):
    """
    Build a dataset description dict from the PETA annotation .mat file.

    Args:
        peta_dataset_mat_dir (str): path to the PETA annotation .mat file.

    Returns:
        dict: with keys
            'description': dataset name ('peta'),
            'image': image file names ('00001.png', ...), one per sample,
            'att': per-sample attribute rows (columns 4+ of the raw matrix),
            'att_name': attribute names read from the .mat file,
            'selected_attribute': indices of the 35 attributes used downstream.
    """
    dataset = dict()
    dataset['description'] = 'peta'
    dataset['image'] = []
    dataset['att'] = []
    dataset['att_name'] = []
    dataset['selected_attribute'] = range(35)
    # Load PETA.MAT: data['peta'][0][0] is a MATLAB struct whose field 0 is the
    # raw label matrix and field 1 is the cell array of attribute names.
    data = loadmat(peta_dataset_mat_dir)
    raw_labels = data['peta'][0][0][0]
    att_names = data['peta'][0][0][1]
    # Iterate over the actual array sizes instead of hard-coding the official
    # PETA counts (105 names, 19000 images) so trimmed/extended files also work.
    for idx in range(att_names.shape[0]):
        dataset['att_name'].append(att_names[idx, 0][0])

    for idx in range(raw_labels.shape[0]):
        # Images are named by 1-based, zero-padded sample index.
        dataset['image'].append('%05d.png' % (idx + 1))
        # The first 4 columns are metadata; attribute labels start at column 4.
        dataset['att'].append(raw_labels[idx, 4:].tolist())
    return dataset
-
-
def create_trainvaltest_split(peta_dataset_mat_dir):
    """
    Build train/val/test index partitions from the PETA annotation .mat file.

    Args:
        peta_dataset_mat_dir (str): path to the PETA annotation .mat file.

    Returns:
        dict: with keys 'train', 'val', 'trainval', 'test' — lists of 0-based
        sample-index lists, one entry per partition — plus 'weight_train' and
        'weight_trainval': per-attribute positive rates over those splits.
    """
    partition = dict()
    partition['trainval'] = []
    partition['train'] = []
    partition['val'] = []
    partition['test'] = []
    partition['weight_trainval'] = []
    partition['weight_train'] = []
    # Load PETA.MAT: field 0 of the struct is the raw label matrix, field 3 is
    # the cell array of official partitions (normally 5 entries).
    data = loadmat(peta_dataset_mat_dir)
    raw_labels = data['peta'][0][0][0]
    partitions = data['peta'][0][0][3]
    # Iterate over however many partitions the file provides instead of
    # hard-coding the official count of 5.
    for idx in range(partitions.shape[0]):
        # Each partition is a struct with fields (train, val, test); index
        # columns hold 1-based MATLAB indices, so shift to 0-based.
        split = partitions[idx][0][0][0]
        train = (split[0][:, 0] - 1).tolist()
        val = (split[1][:, 0] - 1).tolist()
        test = (split[2][:, 0] - 1).tolist()
        trainval = train + val
        partition['train'].append(train)
        partition['val'].append(val)
        partition['trainval'].append(trainval)
        partition['test'].append(test)
        # Per-attribute fraction of positive (== 1) labels, used as weights.
        weight_trainval = np.mean(raw_labels[trainval, 4:].astype('float32') == 1, axis=0).tolist()
        weight_train = np.mean(raw_labels[train, 4:].astype('float32') == 1, axis=0).tolist()
        partition['weight_trainval'].append(weight_trainval)
        partition['weight_train'].append(weight_train)
    return partition
-
-
class AttDataset():
    """
    Pedestrian-attribute dataset backed by the PETA annotations.

    Yields (image, target) pairs for 'val'/'test' splits and
    (image, target, weight) triples for 'train'/'trainval' splits; used as the
    source of a mindspore GeneratorDataset in create_dataset().
    """

    def __init__(self, peta_dataset_mat, split='train', image_path=None, partition_idx=0):
        """
        Args:
            peta_dataset_mat (str): path to the PETA annotation .mat file.
            split (str): 'train', 'val', 'trainval' or 'test'.
            image_path (str): directory containing the PETA .png images.
            partition_idx (int): which official partition to use.

        Raises:
            ValueError: if partition_idx does not exist for the chosen split.
        """
        self.dataset = generate_data_description(peta_dataset_mat)
        self.partition = create_trainvaltest_split(peta_dataset_mat)

        if partition_idx > len(self.partition[split]) - 1:
            raise ValueError('partition_idx is out of range in partition.')

        self.root = image_path

        # Names of the 35 attributes actually used for training/evaluation.
        self.att_name = [self.dataset['att_name'][i] for i in
                         self.dataset['selected_attribute']]
        self.image = []
        self.label = []
        for idx in self.partition[split][partition_idx]:
            self.image.append(self.dataset['image'][idx])
            label_tmp = np.array(self.dataset['att'][idx])[
                self.dataset['selected_attribute']].tolist()
            self.label.append(label_tmp)

        self.split = split
        if self.split in ('train', 'trainval'):
            # Every training sample shares the same fixed per-attribute loss
            # weights (module-level `new_weight`). The exp-weighted rates that
            # used to be computed here were discarded (dead code) — removed.
            self.weights = np.zeros((len(self.label), len(self.label[0]))).astype(np.float32)
            self.weights[:, :] = new_weight

    def __getitem__(self, index):
        """Return (img, target) — plus the weight row for training splits."""
        imgname, target = self.image[index], self.label[index]
        imgname = os.path.join(self.root, imgname)
        img = Image.open(imgname)

        # Raw PETA labels take values in {0, 1, 2}; map 2 -> 0 so targets are
        # binary. (The former `target[target == 0] = 0` was a no-op — removed.)
        target = np.array(target).astype(np.float32)
        target[target == 2] = 0

        if self.split in ('train', 'trainval'):
            weight = self.weights[index]
            output = (img, target, weight)
        else:
            output = (img, target)
        return output

    def __len__(self):
        """Number of samples in the selected split/partition."""
        return len(self.image)
-
-
def create_dataset(cfg, rank_size=None, rank_id=None):
    """
    Build the MindSpore data pipeline for training or evaluation.

    Args:
        cfg: config object; fields read here: train_mode ('train' or 'test'),
            peta_dataset_mat_dir, split, image_path, partition_idx,
            image_resize, batch_size, num_work, is_distributed.
        rank_size (int): number of shards for distributed training.
        rank_id (int): shard id of this process.

    Returns:
        For 'train': the batched training dataset.
        For 'test': a tuple (test_dataset, test_labels, num_samples).

    Raises:
        ValueError: if cfg.train_mode is neither 'train' nor 'test'.
    """
    horizontal_flip_op = C.RandomHorizontalFlip(prob=0.5)
    resize_op = C.Resize(cfg.image_resize)
    # Scale pixel values to [-1, 1]: (x - 0.5*255) / (0.5*255).
    normalize_op = C.Normalize(mean=[0.5 * 255, 0.5 * 255, 0.5 * 255], std=[0.5 * 255, 0.5 * 255, 0.5 * 255])
    random_crop_op = C.RandomCrop(cfg.image_resize)
    change_swap_op = C.HWC2CHW()

    trans_train = [resize_op, horizontal_flip_op, normalize_op, random_crop_op, change_swap_op]
    trans_valtest = [resize_op, normalize_op, change_swap_op]
    type_cast_op = C2.TypeCast(mstype.float32)

    # Get the training dataset
    if cfg.train_mode == "train":
        train_source = AttDataset(cfg.peta_dataset_mat_dir, cfg.split, cfg.image_path, cfg.partition_idx)
        if cfg.is_distributed:
            train_dataset = mindspore.dataset.GeneratorDataset(source=train_source,
                                                               column_names=["data", "label", "weights"],
                                                               shuffle=True, num_shards=rank_size, shard_id=rank_id)
        else:
            train_dataset = mindspore.dataset.GeneratorDataset(source=train_source,
                                                               column_names=["data", "label", "weights"],
                                                               shuffle=True)

        train_dataset = train_dataset.map(operations=trans_train, input_columns="data",
                                          num_parallel_workers=cfg.num_work)
        train_dataset = train_dataset.map(operations=type_cast_op, input_columns="label",
                                          num_parallel_workers=cfg.num_work)
        train_dataset = train_dataset.map(operations=type_cast_op, input_columns="weights",
                                          num_parallel_workers=cfg.num_work)

        train_dataset = train_dataset.project(columns=["data", "label", "weights"])

        train_dataset = train_dataset.shuffle(buffer_size=1000)
        train_dataset = train_dataset.batch(cfg.batch_size, drop_remainder=True)
        train_dataset = train_dataset.repeat(1)

        outputs = train_dataset

    # Get the test dataset; labels and length are returned alongside so the
    # caller can run evaluation without re-reading the annotations.
    elif cfg.train_mode == "test":
        test_source = AttDataset(cfg.peta_dataset_mat_dir, cfg.split, cfg.image_path, cfg.partition_idx)
        test_label = test_source.label
        test_len = len(test_source)

        test_dataset = mindspore.dataset.GeneratorDataset(source=test_source, column_names=["data", "label"],
                                                          shuffle=False)
        test_dataset = test_dataset.map(operations=trans_valtest, input_columns="data",
                                        num_parallel_workers=cfg.num_work)
        test_dataset = test_dataset.map(operations=type_cast_op, input_columns="label",
                                        num_parallel_workers=cfg.num_work)

        test_dataset = test_dataset.project(columns=["data", "label"])
        test_dataset = test_dataset.batch(cfg.batch_size, drop_remainder=False)
        test_dataset = test_dataset.repeat(1)

        outputs = test_dataset, test_label, test_len

    else:
        # ValueError subclasses Exception, so existing broad handlers still match.
        raise ValueError("Please enter the correct training mode keywords!")

    return outputs
|