|
- import cv2
- import tqdm
- import torch
- import ctypes
- import numpy as np
- import matplotlib.pyplot as plt
- from scipy.ndimage import zoom as scizoom
- from wand.image import Image as WandImage
- from wand.api import library as wandlibrary
-
-
# Declare the C signature of MagickMotionBlurImage for ctypes so that the
# wand handle is passed as a pointer and the three numeric arguments are
# marshalled as C doubles.
wandlibrary.MagickMotionBlurImage.argtypes = (
    ctypes.c_void_p,  # wand
    ctypes.c_double,  # radius
    ctypes.c_double,  # sigma
    ctypes.c_double,  # angle
)
-
-
class MotionImage(WandImage):
    """Wand image that exposes the MagickMotionBlurImage C routine."""

    def motion_blur(self, radius=0.0, sigma=0.0, angle=0.0):
        """Call MagickMotionBlurImage on this image's wand.

        ``radius``, ``sigma`` and ``angle`` are forwarded unchanged to the
        C function; nothing is returned.
        """
        blur_fn = wandlibrary.MagickMotionBlurImage
        blur_fn(self.wand, radius, sigma, angle)
-
-
def clipped_zoom(img, zoom_factor):
    """Zoom into the centre of ``img`` and return an array of the same size.

    A centred window of ``ceil(dim / zoom_factor)`` pixels is cropped along
    each of the first two axes, enlarged with linear interpolation, and then
    trimmed back to the original height and width.

    Args:
        img: array whose first two axes are height and width; any trailing
            axes (e.g. channels) are left unscaled.
        zoom_factor: magnification applied to both spatial axes.
            NOTE(review): presumably expected to be >= 1 -- smaller values
            produce a crop window larger than the image; confirm with callers.

    Returns:
        The zoomed array with the same height and width as ``img``.
    """
    h, w = img.shape[:2]
    # Size of the centred window that fills the frame again once enlarged.
    ch = int(np.ceil(h / float(zoom_factor)))
    cw = int(np.ceil(w / float(zoom_factor)))

    top = (h - ch) // 2
    left = (w - cw) // 2
    # Scale only the two spatial axes; trailing axes keep a factor of 1.
    factors = (zoom_factor, zoom_factor) + (1,) * (img.ndim - 2)
    img = scizoom(img[top:top + ch, left:left + cw], factors, order=1)

    # The ceil above can leave a few extra pixels; trim them symmetrically.
    trim_top = (img.shape[0] - h) // 2
    trim_left = (img.shape[1] - w) // 2
    return img[trim_top:trim_top + h, trim_left:trim_left + w]
-
-
def disk(radius, alias_blur=0.1, dtype=np.float32):
    """Build a normalised disk-shaped kernel with Gaussian-softened edges.

    For ``radius <= 8`` the kernel grid spans -8..8 and a 3x3 Gaussian is
    used; otherwise the grid spans -radius..radius with a 5x5 Gaussian.

    Args:
        radius: disk radius in grid units.
        alias_blur: ``sigmaX`` passed to ``cv2.GaussianBlur``.
        dtype: dtype of the kernel array.

    Returns:
        The blurred kernel as returned by ``cv2.GaussianBlur``.
    """
    small = radius <= 8
    half_width = 8 if small else radius
    blur_ksize = (3, 3) if small else (5, 5)

    axis = np.arange(-half_width, half_width + 1)
    xx, yy = np.meshgrid(axis, axis)
    inside = (xx ** 2 + yy ** 2) <= radius ** 2

    # Hard-edged disk, scaled so its entries sum to one.
    kernel = np.array(inside, dtype=dtype)
    kernel /= np.sum(kernel)

    # A light Gaussian blur softens the jagged disk boundary.
    return cv2.GaussianBlur(kernel, ksize=blur_ksize, sigmaX=alias_blur)
-
-
def plasma_fractal(mapsize=256, wibbledecay=3):
    """Generate a heightmap using the diamond-square algorithm.

    Args:
        mapsize: side length of the square map; must be a power of two.
        wibbledecay: factor by which the random perturbation amplitude is
            divided after every refinement pass.

    Returns:
        A ``(mapsize, mapsize)`` float64 array shifted and scaled so that
        its minimum is 0 and its maximum is 1.
    """
    assert (mapsize & (mapsize - 1) == 0)
    # np.float64 rather than the np.float_ alias, which NumPy 2.0 removed.
    maparray = np.empty((mapsize, mapsize), dtype=np.float64)
    maparray[0, 0] = 0
    stepsize = mapsize
    wibble = 100

    def wibbledmean(array):
        # Mean of four accumulated neighbours plus a random perturbation
        # governed by the current wibble value.
        return array / 4 + wibble * np.random.uniform(-wibble, wibble, array.shape)

    def fillsquares():
        """For each square of points stepsize apart,
        calculate middle value as mean of points + wibble."""
        cornerref = maparray[0:mapsize:stepsize, 0:mapsize:stepsize]
        # np.roll wraps around, so the map tiles at its borders.
        squareaccum = cornerref + np.roll(cornerref, shift=-1, axis=0)
        squareaccum += np.roll(squareaccum, shift=-1, axis=1)
        maparray[stepsize // 2:mapsize:stepsize,
                 stepsize // 2:mapsize:stepsize] = wibbledmean(squareaccum)

    def filldiamonds():
        """For each diamond of points stepsize apart,
        calculate middle value as mean of points + wibble."""
        drgrid = maparray[stepsize // 2:mapsize:stepsize, stepsize // 2:mapsize:stepsize]
        ulgrid = maparray[0:mapsize:stepsize, 0:mapsize:stepsize]
        ldrsum = drgrid + np.roll(drgrid, 1, axis=0)
        lulsum = ulgrid + np.roll(ulgrid, -1, axis=1)
        ltsum = ldrsum + lulsum
        maparray[0:mapsize:stepsize, stepsize // 2:mapsize:stepsize] = wibbledmean(ltsum)
        tdrsum = drgrid + np.roll(drgrid, 1, axis=1)
        tulsum = ulgrid + np.roll(ulgrid, -1, axis=0)
        ttsum = tdrsum + tulsum
        maparray[stepsize // 2:mapsize:stepsize, 0:mapsize:stepsize] = wibbledmean(ttsum)

    # Refine from the coarsest grid down to single-pixel spacing, shrinking
    # the perturbation amplitude after each pass.
    while stepsize >= 2:
        fillsquares()
        filldiamonds()
        stepsize //= 2
        wibble /= wibbledecay

    maparray -= maparray.min()
    return maparray / maparray.max()
-
-
def _patch_origins(length, patch):
    """Return patch start offsets along one axis, ending flush with the edge."""
    if patch > length:
        raise ValueError(
            'patch size %d exceeds image dimension %d' % (patch, length))
    # Regular stride, plus one final offset so the last patch touches the
    # border (it may overlap the previous patch). When length == patch the
    # regular stride is empty and the single offset 0 remains.
    return np.append(np.arange(0, length - patch, patch), length - patch)


def local_shuffle(image, patch_size=(80, 80)):
    """Cut ``image`` into patches and paste them back in random order.

    Patches are taken on a regular grid with stride ``patch_size`` plus a
    final row/column aligned to the image border, shuffled with
    ``np.random.shuffle`` and written back to the same grid positions.

    Args:
        image: array of shape ``(height, width, channels)``.
        patch_size: ``(patch_height, patch_width)``.

    Returns:
        A float64 array with the same shape as ``image``.

    Raises:
        ValueError: if a patch dimension is larger than the image.
    """
    imhigh, imwidth, imch = image.shape
    patch_h, patch_w = patch_size[0], patch_size[1]
    range_y = _patch_origins(imhigh, patch_h)
    range_x = _patch_origins(imwidth, patch_w)
    slots = [(y, x) for y in range_y for x in range_x]

    patches = np.zeros((len(slots), patch_h, patch_w, imch))
    for index, (y, x) in enumerate(slots):
        patches[index] = image[y:y + patch_h, x:x + patch_w]
    np.random.shuffle(patches)

    new_img = np.zeros(image.shape)
    for (y, x), patch in zip(slots, patches):
        new_img[y:y + patch_h, x:x + patch_w] = patch
    return new_img
-
-
def mixup_data(x, y, alpha=1.0):
    """Blend a batch with a shuffled copy of itself.

    The mixing weight is drawn from ``Beta(alpha, alpha)`` when ``alpha`` is
    positive and is fixed to 1 otherwise.

    Returns:
        ``(mixed_x, y_a, y_b, lam)`` where ``y_a`` is ``y``, ``y_b`` is ``y``
        reordered by the same permutation used for ``x``, and ``lam`` is the
        mixing weight.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    # Random pairing of samples, created on the same device as the inputs.
    permutation = torch.randperm(x.size()[0]).to(x.device)
    shuffled_x = x[permutation, :]

    mixed_x = lam * x + (1 - lam) * shuffled_x
    return mixed_x, y, y[permutation], lam
-
-
def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Combine the losses against both mixup targets, weighted by ``lam``."""
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b
-
-
def extend(src, offsets=5):
    """Jitter an image by sampling each pixel from a nearby down/right pixel.

    Every output pixel copies the source pixel displaced by independent
    random row and column offsets drawn from ``[0, offsets)``; displaced
    coordinates are clamped to the last row/column.

    Args:
        src: array of shape ``(rows, cols, channels)``.
        offsets: exclusive upper bound of the random displacement.

    Returns:
        A new array of the same shape and dtype as ``src``.
    """
    dst = src.copy()
    rows, cols, _ = src.shape
    col_grid, row_grid = np.meshgrid(np.array(range(cols)), np.array(range(rows)))

    # Column offsets are drawn first, then row offsets.
    col_shift = np.random.randint(0, offsets, size=col_grid.shape)
    row_shift = np.random.randint(0, offsets, size=row_grid.shape)

    # Clamp displaced coordinates so they stay inside the image.
    src_rows = np.minimum(row_grid + row_shift, rows - 1)
    src_cols = np.minimum(col_grid + col_shift, cols - 1)

    dst[row_grid, col_grid] = src[src_rows, src_cols]
    return dst
-
-
def rand_bbox(size, lam):
    """Sample a random box whose area is roughly ``(1 - lam)`` of the frame.

    Args:
        size: sequence whose entries at index 2 and 3 are used as the two
            spatial extents (named ``W`` and ``H`` here).
        lam: mixing ratio; the box side lengths are the extents scaled by
            ``sqrt(1 - lam)``.

    Returns:
        ``(bbx1, bby1, bbx2, bby2)`` box corners, clipped to the frame.
    """
    W, H = size[2], size[3]
    cut_rat = np.sqrt(1. - lam)
    # Built-in int() instead of the np.int alias, which NumPy 1.24 removed.
    cut_w = int(W * cut_rat)
    cut_h = int(H * cut_rat)

    # Box centre drawn uniformly over the frame.
    cx = np.random.randint(W)
    cy = np.random.randint(H)

    # Clipping means boxes near the border end up smaller than requested.
    bbx1 = np.clip(cx - cut_w // 2, 0, W)
    bby1 = np.clip(cy - cut_h // 2, 0, H)
    bbx2 = np.clip(cx + cut_w // 2, 0, W)
    bby2 = np.clip(cy + cut_h // 2, 0, H)
    return bbx1, bby1, bbx2, bby2
-
-
def cutmix(x, y):
    """Paste a random box from shuffled samples into each sample of ``x``.

    ``x`` is modified in place. The returned ``lam`` is recomputed from the
    actual (clipped) box area relative to the last two dimensions of ``x``.

    Returns:
        ``(x, target_a, target_b, lam)`` where ``target_a`` is ``y`` and
        ``target_b`` is ``y`` reordered by the permutation used for pasting.
    """
    lam = np.random.beta(1.0, 1.0)
    permutation = torch.randperm(x.size()[0])
    target_b = y[permutation]

    x1, y1, x2, y2 = rand_bbox(x.size(), lam)
    x[:, :, x1:x2, y1:y2] = x[permutation, :, x1:x2, y1:y2]

    # Adjust lam to the fraction of each sample that was left untouched.
    box_area = (x2 - x1) * (y2 - y1)
    lam = 1 - (box_area / (x.size()[-1] * x.size()[-2]))
    return x, y, target_b, lam
-
-
def rain_noise(img, value, angle, beta):
    """Overlay synthetic rain on ``img``.

    Sparse noise from ``get_noise`` is turned into streaks by ``rain_blur``
    (fixed length 10 and blur width 3) and blended in by ``alpha_rain``.
    """
    streaks = rain_blur(get_noise(img, value=value), length=10, angle=angle, w=3)
    return alpha_rain(streaks, img, beta=beta)
-
-
def get_noise(img, value=10):
    """Create a sparse random noise map matching the height/width of ``img``.

    Uniform samples in [0, 256) are zeroed unless they reach
    ``256 - value * 0.01``; the survivors are then passed through a small
    cross-shaped filter.
    """
    noise = np.random.uniform(0, 256, img.shape[0:2])

    # Keep only the rare samples at the very top of the range.
    threshold = 256 - value * 0.01
    noise[np.where(noise < threshold)] = 0

    kernel = np.array([
        [0, 0.1, 0],
        [0.1, 8, 0.1],
        [0, 0.1, 0],
    ])
    return cv2.filter2D(noise, -1, kernel)
-
-
def rain_blur(noise, length=10, angle=0, w=1):
    """Smear a noise map into streaks with a rotated line kernel.

    A ``length`` x ``length`` diagonal kernel is rotated by ``angle - 45``
    degrees about its centre (and scaled by ``1 - length / 100``), blurred
    with a ``w`` x ``w`` Gaussian, and used to filter ``noise``.

    Returns:
        The filtered map, min-max normalised to 0..255 and cast to uint8.
    """
    center = (length / 2, length / 2)
    rotation = cv2.getRotationMatrix2D(center, angle - 45, 1 - length / 100.0)

    line_kernel = np.diag(np.ones(length))
    line_kernel = cv2.warpAffine(line_kernel, rotation, (length, length))
    line_kernel = cv2.GaussianBlur(line_kernel, (w, w), 0)

    streaks = cv2.filter2D(noise, -1, line_kernel)
    # Normalise in place before the cast to 8-bit.
    cv2.normalize(streaks, streaks, 0, 255, cv2.NORM_MINMAX)
    return np.array(streaks, dtype=np.uint8)
-
-
def alpha_rain(rain, img, beta=0.8):
    """Blend a rain intensity map into the first three channels of ``img``.

    Each affected channel becomes
    ``channel * (255 - rain) / 255 + beta * rain``, so strong rain both
    darkens the underlying pixel and adds brightness scaled by ``beta``.

    Args:
        rain: 2-D rain intensity map with the same height/width as ``img``.
        img: image array of shape ``(height, width, channels)``; it is not
            modified.
        beta: weight of the additive rain term.

    Returns:
        A copy of ``img`` (same dtype) with the rain blended in.
    """
    rain_result = img.copy()
    # Trailing axis lets the single-channel map broadcast over the channels.
    rain = np.array(np.expand_dims(rain, 2), dtype=np.float32)
    rain_result[:, :, :3] = (
        rain_result[:, :, :3] * (255 - rain) / 255.0 + beta * rain
    )
    return rain_result
-
-
def motion_blur(image, degree=12, angle=45):
    """Apply a linear motion blur to a numpy image.

    A ``degree`` x ``degree`` diagonal kernel is rotated by ``angle`` degrees
    about its centre, divided by ``degree`` and used to filter ``image``.

    Returns:
        The filtered image, min-max normalised to 0..255 and cast to uint8.
    """
    center = (degree / 2, degree / 2)
    rotation = cv2.getRotationMatrix2D(center, angle, 1)

    kernel = cv2.warpAffine(np.diag(np.ones(degree)), rotation, (degree, degree))
    kernel = kernel / degree

    result = cv2.filter2D(image, -1, kernel)

    # Stretch to the full 8-bit range in place, then cast.
    cv2.normalize(result, result, 0, 255, cv2.NORM_MINMAX)
    return np.array(result, dtype=np.uint8)
-
-
def draw_hist(img_dict):
    """Show a bar chart of image counts for categories '0' through '19'.

    Args:
        img_dict: mapping from category name (the strings '0'..'19') to the
            number of images in that category.
    """
    categories = [str(number) for number in range(20)]
    counts = [img_dict[category] for category in categories]

    plt.bar(categories, counts, align='center', color='b',
            tick_label=categories, alpha=0.6)

    plt.xlabel('image category')
    plt.ylabel('image counts')
    plt.title('Distribution within Training Images')

    # Faint horizontal guide lines only.
    plt.grid(True, axis='y', ls=':', color='r', alpha=0.3)
    plt.show()
-
-
def parse_txt(txt_name):
    """Parse a label file with one ``<image name> <integer label>`` per line.

    Blank lines are skipped. The label is the last whitespace-separated
    token, so repeated spaces, tabs and image names containing spaces are
    accepted.

    Args:
        txt_name: path of the label file.

    Returns:
        ``(training_pairs, img_counts)`` where ``training_pairs`` is a list
        of ``(image_name, int_label)`` tuples and ``img_counts`` maps each
        label string to its number of occurrences.

    Raises:
        ValueError: if a non-blank line has no separate label token or the
            label is not an integer.
    """
    # The with-block closes the file; no explicit close() is needed.
    with open(txt_name, 'r') as f:
        lines = f.readlines()

    training_pairs = []
    img_counts = {}
    for line in tqdm.tqdm(lines):
        line = line.strip()
        if not line:
            continue
        # Split once from the right so only the trailing token is the label.
        img_name, label = line.rsplit(None, 1)
        training_pairs.append((img_name, int(label)))
        # Counts stay keyed by the label string as read from the file.
        img_counts[label] = img_counts.get(label, 0) + 1
    return training_pairs, img_counts
-
-
if __name__ == '__main__':
    # Script entry: read the phase-1 training labels and plot how many
    # images each category has.
    _, label_counts = parse_txt('data/train_phase1/label.txt')
    draw_hist(label_counts)
|