|
- #! /usr/bin/env python
- # coding=utf-8
-
- import os
- import cv2
- import random
- import numpy as np
- import tensorflow as tf
- import core.utils as utils
- from core.config import cfg
-
-
class Dataset(object):
    """Iterable that yields batches of images and YOLO training targets.

    Annotations are read once at construction time from the file named by
    ``cfg.TRAIN.ANNOT_PATH`` or ``cfg.TEST.ANNOT_PATH``. Two on-disk layouts
    are understood (selected by ``dataset_type``):

    * ``"converted_coco"``: each line is
      ``image_path x1,y1,x2,y2,cls [x1,y1,x2,y2,cls ...]`` with integer
      fields.
    * ``"yolo"``: each line is an image path; the boxes live in a sibling
      file with the same stem and a ``.txt`` extension, one
      ``cls cx cy w h`` row per box. The rows are multiplied by the image
      width/height when parsed, i.e. they are treated as fractions of the
      image size.

    Iterating the object yields ``len(self)`` batches, then raises
    ``StopIteration``, reshuffles the annotations and resets so that it can
    be iterated again for the next epoch.
    """

    def __init__(self, FLAGS, is_training: bool, dataset_type: str = "converted_coco"):
        """Read the configuration and load the annotation list.

        Args:
            FLAGS: object exposing ``tiny``; it is also handed to
                ``utils.load_config``.
            is_training: pick the ``cfg.TRAIN`` settings when true, the
                ``cfg.TEST`` settings otherwise.
            dataset_type: ``"converted_coco"`` or ``"yolo"``.

        Raises:
            ValueError: if ``dataset_type`` is not a supported layout.
        """
        self.tiny = FLAGS.tiny
        # load_config returns four values; the last two (class count and
        # XYSCALE in the original code) are not needed by the dataset.
        self.strides, self.anchors, _, _ = utils.load_config(FLAGS)
        self.dataset_type = dataset_type

        split_cfg = cfg.TRAIN if is_training else cfg.TEST
        self.annot_path = split_cfg.ANNOT_PATH
        self.input_sizes = split_cfg.INPUT_SIZE
        self.batch_size = split_cfg.BATCH_SIZE
        self.data_aug = split_cfg.DATA_AUG

        self.train_input_sizes = cfg.TRAIN.INPUT_SIZE
        self.classes = utils.read_class_names(cfg.YOLO.CLASSES)
        self.num_classes = len(self.classes)
        self.anchor_per_scale = cfg.YOLO.ANCHOR_PER_SCALE
        # Capacity of the per-scale list of ground-truth boxes.
        self.max_bbox_per_scale = 150

        self.annotations = self.load_annotations()
        self.num_samples = len(self.annotations)
        self.num_batchs = int(np.ceil(self.num_samples / self.batch_size))
        self.batch_count = 0

    def load_annotations(self):
        """Return the shuffled list of annotation strings.

        Every returned entry has the form
        ``image_path x1,y1,x2,y2,cls [x1,y1,x2,y2,cls ...]``. Entries
        without any box are dropped for both layouts, and blank lines in the
        list file or in a YOLO label file are ignored.

        Raises:
            ValueError: if ``self.dataset_type`` is not a supported layout.
        """
        if self.dataset_type not in ("converted_coco", "yolo"):
            raise ValueError(
                "unsupported dataset_type: %r" % (self.dataset_type,)
            )

        with open(self.annot_path, "r") as f:
            lines = [line.strip() for line in f]

        if self.dataset_type == "converted_coco":
            # Keep only lines that carry at least one box after the path.
            annotations = [line for line in lines if len(line.split()) > 1]
        else:
            annotations = []
            for image_path in lines:
                if not image_path:
                    continue
                root, _ = os.path.splitext(image_path)
                with open(root + ".txt") as fd:
                    rows = [row.split() for row in fd]

                boxes = []
                for fields in rows:
                    if not fields:
                        continue
                    class_num = int(fields[0])
                    center_x = float(fields[1])
                    center_y = float(fields[2])
                    half_width = float(fields[3]) / 2
                    half_height = float(fields[4]) / 2
                    # Convert centre/size to corner form, class id last.
                    boxes.append(
                        "{},{},{},{},{}".format(
                            center_x - half_width,
                            center_y - half_height,
                            center_x + half_width,
                            center_y + half_height,
                            class_num,
                        )
                    )
                # An image without boxes cannot be turned into targets
                # (parse_annotation would fail on the empty array), so it
                # is skipped just like in the converted_coco layout.
                if boxes:
                    annotations.append(" ".join([image_path] + boxes))

        np.random.shuffle(annotations)
        return annotations

    def __iter__(self):
        """Return ``self``; the object is its own iterator."""
        return self

    def __next__(self):
        """Build and return the next batch.

        Returns:
            ``(batch_image, (small, medium, large))`` where ``batch_image``
            is a float32 array of shape
            ``(batch_size, input_size, input_size, 3)`` and each of the
            three targets is a ``(labels, boxes)`` pair: ``labels`` has
            shape ``(batch_size, out, out, anchor_per_scale,
            5 + num_classes)`` and ``boxes`` has shape
            ``(batch_size, max_bbox_per_scale, 4)``.

        Raises:
            StopIteration: once ``len(self)`` batches have been produced;
                the annotations are reshuffled and the counter reset first.
        """
        with tf.device("/cpu:0"):
            # NOTE(review): the TRAIN input size is used even when the
            # dataset was built with is_training=False -- confirm intended.
            self.train_input_size = cfg.TRAIN.INPUT_SIZE
            self.train_output_sizes = self.train_input_size // self.strides

            # Check for the end of the epoch before allocating any buffers.
            if self.batch_count >= self.num_batchs:
                self.batch_count = 0
                np.random.shuffle(self.annotations)
                raise StopIteration

            batch_image = np.zeros(
                (
                    self.batch_size,
                    self.train_input_size,
                    self.train_input_size,
                    3,
                ),
                dtype=np.float32,
            )
            batch_labels = [
                np.zeros(
                    (
                        self.batch_size,
                        self.train_output_sizes[scale],
                        self.train_output_sizes[scale],
                        self.anchor_per_scale,
                        5 + self.num_classes,
                    ),
                    dtype=np.float32,
                )
                for scale in range(3)
            ]
            batch_boxes = [
                np.zeros(
                    (self.batch_size, self.max_bbox_per_scale, 4),
                    dtype=np.float32,
                )
                for _ in range(3)
            ]

            first = self.batch_count * self.batch_size
            for num in range(self.batch_size):
                # The last batch wraps around to the start of the list;
                # modulo keeps the index valid even when batch_size is
                # several times larger than the number of samples.
                index = (first + num) % self.num_samples
                image, bboxes = self.parse_annotation(self.annotations[index])
                targets = self.preprocess_true_boxes(bboxes)

                batch_image[num] = image
                for scale in range(3):
                    batch_labels[scale][num] = targets[scale]
                    batch_boxes[scale][num] = targets[3 + scale]

            self.batch_count += 1
            batch_smaller_target = batch_labels[0], batch_boxes[0]
            batch_medium_target = batch_labels[1], batch_boxes[1]
            batch_larger_target = batch_labels[2], batch_boxes[2]

            return (
                batch_image,
                (
                    batch_smaller_target,
                    batch_medium_target,
                    batch_larger_target,
                ),
            )

    def random_horizontal_flip(self, image, bboxes):
        """Mirror the image and its boxes left-right with probability 0.5.

        ``bboxes`` rows are ``x1, y1, x2, y2, ...``; the array is modified
        in place when the flip is applied.
        """
        if random.random() < 0.5:
            _, w, _ = image.shape
            image = image[:, ::-1, :]
            # Swapping the columns keeps x1 <= x2 after mirroring.
            bboxes[:, [0, 2]] = w - bboxes[:, [2, 0]]

        return image, bboxes

    def random_crop(self, image, bboxes):
        """Crop the image with probability 0.5 without cutting any box.

        Each crop edge is drawn between the image border and the matching
        edge of the union of all boxes, then clamped to the image. The box
        coordinates are shifted into the cropped frame (in place).
        """
        if random.random() < 0.5:
            h, w, _ = image.shape
            # Smallest rectangle that contains every box.
            max_bbox = np.concatenate(
                [
                    np.min(bboxes[:, 0:2], axis=0),
                    np.max(bboxes[:, 2:4], axis=0),
                ],
                axis=-1,
            )

            max_l_trans = max_bbox[0]
            max_u_trans = max_bbox[1]
            max_r_trans = w - max_bbox[2]
            max_d_trans = h - max_bbox[3]

            crop_xmin = max(
                0, int(max_bbox[0] - random.uniform(0, max_l_trans))
            )
            crop_ymin = max(
                0, int(max_bbox[1] - random.uniform(0, max_u_trans))
            )
            # Clamp to the image size with min(); max() here would pin the
            # right/bottom edge to the border and never trim those sides.
            crop_xmax = min(
                w, int(max_bbox[2] + random.uniform(0, max_r_trans))
            )
            crop_ymax = min(
                h, int(max_bbox[3] + random.uniform(0, max_d_trans))
            )

            image = image[crop_ymin:crop_ymax, crop_xmin:crop_xmax]

            bboxes[:, [0, 2]] = bboxes[:, [0, 2]] - crop_xmin
            bboxes[:, [1, 3]] = bboxes[:, [1, 3]] - crop_ymin

        return image, bboxes

    def random_translate(self, image, bboxes):
        """Shift the image with probability 0.5, keeping the boxes inside.

        The offset is drawn so that the union of all boxes stays at least
        one pixel away from every border. The image keeps its size and the
        box coordinates are shifted by the same offset (in place).
        """
        if random.random() < 0.5:
            h, w, _ = image.shape
            max_bbox = np.concatenate(
                [
                    np.min(bboxes[:, 0:2], axis=0),
                    np.max(bboxes[:, 2:4], axis=0),
                ],
                axis=-1,
            )

            max_l_trans = max_bbox[0]
            max_u_trans = max_bbox[1]
            max_r_trans = w - max_bbox[2]
            max_d_trans = h - max_bbox[3]

            tx = random.uniform(-(max_l_trans - 1), (max_r_trans - 1))
            ty = random.uniform(-(max_u_trans - 1), (max_d_trans - 1))

            # 2x3 affine matrix of a pure translation by (tx, ty).
            M = np.array([[1, 0, tx], [0, 1, ty]])
            image = cv2.warpAffine(image, M, (w, h))

            bboxes[:, [0, 2]] = bboxes[:, [0, 2]] + tx
            bboxes[:, [1, 3]] = bboxes[:, [1, 3]] + ty

        return image, bboxes

    def parse_annotation(self, annotation):
        """Load the image and boxes described by one annotation string.

        Args:
            annotation: ``image_path x1,y1,x2,y2,cls ...`` as produced by
                ``load_annotations``.

        Returns:
            ``(image, bboxes)`` as returned by ``utils.image_preprocess``
            for the RGB image and a square target of
            ``self.train_input_size`` (set by ``__next__``).

        Raises:
            KeyError: if the image path does not exist.
            ValueError: if the image cannot be decoded or
                ``self.dataset_type`` is not a supported layout.
        """
        line = annotation.split()
        image_path = line[0]
        if not os.path.exists(image_path):
            raise KeyError("%s does not exist ... " % image_path)
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            raise ValueError("failed to read image %s" % image_path)

        if self.dataset_type == "converted_coco":
            bboxes = np.array(
                [list(map(int, box.split(","))) for box in line[1:]]
            )
        elif self.dataset_type == "yolo":
            height, width, _ = image.shape
            bboxes = np.array(
                [list(map(float, box.split(","))) for box in line[1:]]
            )
            # Scale the fractional corners to pixels; the class id column
            # is multiplied by 1 and so left untouched.
            bboxes = bboxes * np.array([width, height, width, height, 1])
            bboxes = bboxes.astype(np.int64)
        else:
            raise ValueError(
                "unsupported dataset_type: %r" % (self.dataset_type,)
            )

        if self.data_aug:
            image, bboxes = self.random_horizontal_flip(
                np.copy(image), np.copy(bboxes)
            )
            image, bboxes = self.random_crop(np.copy(image), np.copy(bboxes))
            image, bboxes = self.random_translate(
                np.copy(image), np.copy(bboxes)
            )

        # cv2.imread yields BGR channel order; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image, bboxes = utils.image_preprocess(
            np.copy(image),
            [self.train_input_size, self.train_input_size],
            np.copy(bboxes),
        )
        return image, bboxes

    def preprocess_true_boxes(self, bboxes):
        """Turn ground-truth boxes into per-scale training targets.

        Args:
            bboxes: iterable of rows ``x1, y1, x2, y2, class_index``.

        Returns:
            ``(label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes,
            lbboxes)``. Each label array has shape
            ``(out, out, anchor_per_scale, 5 + num_classes)`` holding, per
            assigned anchor, the box as centre-x, centre-y, width, height,
            a ``1.0`` presence flag and the smoothed one-hot class vector.
            Each ``*bboxes`` array has shape ``(max_bbox_per_scale, 4)`` and
            lists the boxes assigned to that scale; once full, the oldest
            entries are overwritten.
        """
        label = [
            np.zeros(
                (
                    self.train_output_sizes[i],
                    self.train_output_sizes[i],
                    self.anchor_per_scale,
                    5 + self.num_classes,
                )
            )
            for i in range(3)
        ]
        bboxes_xywh = [np.zeros((self.max_bbox_per_scale, 4)) for _ in range(3)]
        bbox_count = [0, 0, 0]

        # Label smoothing: blend the one-hot vector with a uniform one.
        uniform_distribution = np.full(
            self.num_classes, 1.0 / self.num_classes
        )
        deta = 0.01

        for bbox in bboxes:
            bbox_coor = bbox[:4]
            bbox_class_ind = int(bbox[4])

            # np.float was removed from NumPy; float64 is what it aliased.
            onehot = np.zeros(self.num_classes, dtype=np.float64)
            onehot[bbox_class_ind] = 1.0
            smooth_onehot = onehot * (1 - deta) + deta * uniform_distribution

            # Corner form -> centre-x, centre-y, width, height.
            bbox_xywh = np.concatenate(
                [
                    (bbox_coor[2:] + bbox_coor[:2]) * 0.5,
                    bbox_coor[2:] - bbox_coor[:2],
                ],
                axis=-1,
            )
            # One row per scale: the box expressed in grid-cell units.
            bbox_xywh_scaled = (
                1.0 * bbox_xywh[np.newaxis, :] / self.strides[:, np.newaxis]
            )

            iou = []
            exist_positive = False
            for i in range(3):
                # Anchors are centred on the grid cell containing the box.
                anchors_xywh = np.zeros((self.anchor_per_scale, 4))
                anchors_xywh[:, 0:2] = (
                    np.floor(bbox_xywh_scaled[i, 0:2]).astype(np.int32) + 0.5
                )
                anchors_xywh[:, 2:4] = self.anchors[i]

                iou_scale = utils.bbox_iou(
                    bbox_xywh_scaled[i][np.newaxis, :], anchors_xywh
                )
                iou.append(iou_scale)
                iou_mask = iou_scale > 0.3

                if np.any(iou_mask):
                    xind, yind = np.floor(bbox_xywh_scaled[i, 0:2]).astype(
                        np.int32
                    )

                    label[i][yind, xind, iou_mask, :] = 0
                    label[i][yind, xind, iou_mask, 0:4] = bbox_xywh
                    label[i][yind, xind, iou_mask, 4:5] = 1.0
                    label[i][yind, xind, iou_mask, 5:] = smooth_onehot

                    bbox_ind = bbox_count[i] % self.max_bbox_per_scale
                    bboxes_xywh[i][bbox_ind, :4] = bbox_xywh
                    bbox_count[i] += 1

                    exist_positive = True

            if not exist_positive:
                # No anchor passed the threshold: fall back to the single
                # best anchor over all scales.
                best_anchor_ind = int(
                    np.argmax(np.array(iou).reshape(-1), axis=-1)
                )
                best_detect = best_anchor_ind // self.anchor_per_scale
                best_anchor = best_anchor_ind % self.anchor_per_scale
                xind, yind = np.floor(
                    bbox_xywh_scaled[best_detect, 0:2]
                ).astype(np.int32)

                label[best_detect][yind, xind, best_anchor, :] = 0
                label[best_detect][yind, xind, best_anchor, 0:4] = bbox_xywh
                label[best_detect][yind, xind, best_anchor, 4:5] = 1.0
                label[best_detect][yind, xind, best_anchor, 5:] = smooth_onehot

                bbox_ind = bbox_count[best_detect] % self.max_bbox_per_scale
                bboxes_xywh[best_detect][bbox_ind, :4] = bbox_xywh
                bbox_count[best_detect] += 1

        label_sbbox, label_mbbox, label_lbbox = label
        sbboxes, mbboxes, lbboxes = bboxes_xywh
        return label_sbbox, label_mbbox, label_lbbox, sbboxes, mbboxes, lbboxes

    def __len__(self):
        """Return the number of batches in one epoch."""
        return self.num_batchs
|