|
- # Copyright 2020-2021 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
-
- import os
- import time
- from tracemalloc import is_tracing
- import numpy as np
- import cv2
- from src.mask_rcnn_r50 import MaskTextSpotter_Resnet50
- from src.config import config
- from mindspore.train.serialization import load_checkpoint, load_param_into_net
- from mindspore import context, Tensor, nn
- from src.dataset import data_to_mindrecord_byte_image, create_maskrcnn_dataset
- import argparse
-
# Command-line interface: the checkpoint to evaluate and the ICDAR dataset
# root (both consumed by the evaluation script at the bottom of the file).
parser = argparse.ArgumentParser(description='Masktextspotter')
parser.add_argument('--checkpoint_path', help='your infer ckpt path', default='')
parser.add_argument('--icdar_root', help='your dataset path', default='')
-
def vis_mask(image, mask, alpha=0.7, color=None):
    """Blend a colored mask overlay onto an image.

    Args:
        image (np.ndarray): HxWx3 uint8 image.
        mask (np.ndarray): HxW mask; pixels with value >= 0.5 are painted.
        alpha (float): blending weight of the original image (overlay gets
            ``1 - alpha``).
        color: BGR color painted on masked pixels. Defaults to red in BGR
            order; the original default of ``None`` raised a TypeError as
            soon as it was assigned into the uint8 array.

    Returns:
        np.ndarray: uint8 blended image.
    """
    if color is None:
        color = (0, 0, 255)
    img_copy = image.copy()
    img_copy[mask >= 0.5] = color
    img_add = cv2.addWeighted(image, alpha, img_copy, 1 - alpha, 0)
    return np.uint8(img_add)
-
def create_mindrecord_dir(model_prefix="", model_mindrecord_dir=None, is_training=True):
    """Create MindRecord files for the dataset selected in ``config``.

    Args:
        model_prefix (str): file-name prefix for the generated records.
        model_mindrecord_dir (str): output directory; created if missing.
        is_training (bool): forwarded to the record writer. The original
            code accepted this flag but hard-coded ``True`` in every
            branch, so test records could never be generated.

    Raises:
        Exception: when ``config.dataset`` is neither "coco" nor "icdar"
            and IMAGE_DIR/ANNO_PATH do not exist.
    """
    if not os.path.isdir(model_mindrecord_dir):
        os.makedirs(model_mindrecord_dir)
    if config.dataset == "coco":
        if os.path.isdir(config.coco_root):
            print("Create Mindrecord.")
            data_to_mindrecord_byte_image("coco", is_training, model_prefix)
            print("Create Mindrecord Done, at {}".format(model_mindrecord_dir))
    elif config.dataset == "icdar":
        if os.path.isdir(config.icdar_root):
            print("Create Mindrecord.")
            data_to_mindrecord_byte_image("icdar", is_training, model_prefix)
            print("Create Mindrecord Done, at {}".format(model_mindrecord_dir))
    else:
        if os.path.isdir(config.IMAGE_DIR) and os.path.exists(config.ANNO_PATH):
            print("Create Mindrecord.")
            data_to_mindrecord_byte_image("other", is_training, model_prefix)
            print("Create Mindrecord Done, at {}".format(model_mindrecord_dir))
        else:
            raise Exception("IMAGE_DIR or ANNO_PATH not exists.")
-
-
def apply_gt_mask(gt_box_list, imgs):
    """Draw ground-truth boxes on de-normalized images and save them to disk.

    Debug helper (its call site in the eval loop is commented out).

    Args:
        gt_box_list: iterable of per-image arrays of [x0, y0, x1, y1] boxes;
            all-zero rows are padding and are skipped.
        imgs (np.ndarray): NCHW float images with the per-channel mean
            subtracted; the mean is added back before drawing.

    Side effects:
        Writes one ``mask_imgs_gt_<i>.jpg`` per image. The original wrote
        every image to the same fixed name, keeping only the last one.
    """
    mean = [102.9801, 115.9465, 122.7717]
    # Copy first: the original `+=` mutated the caller's array in place.
    imgs = imgs.copy()
    for channel in range(3):
        imgs[:, channel, :, :] += mean[channel]
    imgs = np.ascontiguousarray(imgs.transpose(0, 2, 3, 1))

    for i, single_gt in enumerate(gt_box_list):
        img = np.array(imgs[i], dtype=np.uint8)
        for instance in single_gt:
            x0, y0, x1, y1 = instance
            # Padding rows are all-zero. The original test
            # `instance.all() == 0` also skipped any real box touching a
            # zero coordinate (e.g. at the image border).
            if not instance.any():
                continue
            x0, y0, x1, y1 = int(x0), int(y0), int(x1), int(y1)
            corners = [(x0, y0), (x0, y1), (x1, y1), (x1, y0)]
            # Draw the four edges of the box.
            for start, end in zip(corners, corners[1:] + corners[:1]):
                cv2.line(img, start, end, color=(255, 0, 0), thickness=3)
        cv2.imwrite("mask_imgs_gt_{}.jpg".format(i), img)
-
-
def voc_ap(rec, prec, use_07_metric=False):
    """Compute the VOC average precision for a precision/recall curve.

    Args:
        rec (np.ndarray): recall values, sorted ascending.
        prec (np.ndarray): precision values matching ``rec``.
        use_07_metric (bool): if True, use the VOC-2007 11-point
            interpolation; otherwise integrate the precision envelope.

    Returns:
        float: the average precision.
    """
    if use_07_metric:
        # VOC-2007: average the best reachable precision at 11 evenly
        # spaced recall thresholds.
        total = 0.0
        for threshold in np.arange(0.0, 1.1, 0.1):
            reachable = prec[rec >= threshold]
            total = total + (np.max(reachable) if reachable.size else 0) / 11.0
        return total

    # Pad with sentinels so the envelope and the step diffs below cover
    # the full [0, 1] recall range.
    recall = np.concatenate(([0.0], rec, [1.0]))
    envelope = np.concatenate(([0.0], prec, [0.0]))

    # Make precision monotonically non-increasing (right-to-left max).
    for idx in range(envelope.size - 1, 0, -1):
        envelope[idx - 1] = np.maximum(envelope[idx - 1], envelope[idx])

    # Integrate precision over the recall steps where recall changes:
    # sum of (delta recall) * precision.
    steps = np.where(recall[1:] != recall[:-1])[0]
    return np.sum((recall[steps + 1] - recall[steps]) * envelope[steps + 1])
-
-
def get_metric(results_1):
    """Compute mean detection precision, recall and F-measure.

    Each element of ``results_1`` is a dict with at least:
        "bbox_pred": [ndarray (n, 5)] -- x0, y0, x1, y1, score
        "bbox_gt":   [ndarray (m, 4)] -- all-zero rows are padding

    Detections scoring >= 0.8 are matched greedily (in input order)
    against the ground truth at IoU > 0.5; each gt box matches at most
    one detection.

    Returns:
        tuple: (mean precision, mean recall, mean F-measure) averaged
        over samples with defined (non-NaN) values.
    """
    det_thresh = 0.8
    ovthresh = 0.5
    eps = np.finfo(np.float64).eps
    v_precision = []
    v_recall = []
    v_f_measure = []
    for single_sample in results_1:
        box_list = single_sample["bbox_pred"][0]
        gt_box_list = single_sample["bbox_gt"][0]
        box_list = box_list[box_list[:, 4] >= det_thresh]
        # Drop all-zero padding rows from the ground truth.
        BBGT = gt_box_list[~(gt_box_list == 0).all(1)].astype(np.float64)
        if len(BBGT) == 0:
            # No ground truth: recall is undefined for this sample; skip
            # it instead of crashing on an empty np.max below.
            continue
        nd = len(box_list)
        tp = np.zeros(nd)
        fp = np.zeros(nd)
        det = [False] * len(BBGT)

        for d in range(nd):
            bb = box_list[d, :4].astype(np.float64)
            # Intersection of detection `bb` with every gt box.
            ixmin = np.maximum(BBGT[:, 0], bb[0])
            iymin = np.maximum(BBGT[:, 1], bb[1])
            ixmax = np.minimum(BBGT[:, 2], bb[2])
            iymax = np.minimum(BBGT[:, 3], bb[3])
            iw = np.maximum(ixmax - ixmin + 1.0, 0.0)
            ih = np.maximum(iymax - iymin + 1.0, 0.0)
            inters = iw * ih

            # Union area (pixel-inclusive convention, hence the +1).
            uni = (
                (bb[2] - bb[0] + 1.0) * (bb[3] - bb[1] + 1.0)
                + (BBGT[:, 2] - BBGT[:, 0] + 1.0) * (BBGT[:, 3] - BBGT[:, 1] + 1.0)
                - inters
            )

            overlaps = inters / uni
            ovmax = np.max(overlaps)
            jmax = np.argmax(overlaps)

            if ovmax > ovthresh:
                if not det[jmax]:
                    tp[d] = 1.0
                    det[jmax] = True
                else:
                    # Duplicate detection of an already-matched gt box.
                    fp[d] = 1.0
            else:
                fp[d] = 1.0

        fp = np.sum(fp)
        tp = np.sum(tp)
        rec = tp / len(BBGT)
        prec = tp / np.maximum(tp + fp, eps)
        fscore = 2 * (prec * rec) / np.maximum(prec + rec, eps)

        v_precision.append(prec)
        v_recall.append(rec)
        v_f_measure.append(fscore)

    v_precision = np.array(v_precision)[~np.isnan(v_precision)]
    v_recall = np.array(v_recall)[~np.isnan(v_recall)]
    v_f_measure = np.array(v_f_measure)[~np.isnan(v_f_measure)]

    print("precision:", v_precision, "recall_all", v_recall, "f_measure", v_f_measure)

    precision_all = np.average(v_precision)
    recall_all = np.average(v_recall)
    # BUG FIX: the original averaged the undefined name `f_measure`
    # (NameError at runtime); average the collected per-sample scores.
    f_measure_all = np.average(v_f_measure)

    return precision_all, recall_all, f_measure_all
-
-
def vis_results(vis_results_v):
    """Render predicted vs ground-truth boxes/masks side by side.

    For each sample dict (keys "bbox_pred", "seg_pred", "bbox_gt",
    "seg_gt", "image" (NCHW, mean-subtracted), "shape" (ori_h, ori_w,
    ...)), the prediction image (left) and ground-truth image (right)
    are stacked horizontally and written to
    ``outputs/mask_imgs_results_<index>.jpg``.
    """
    # cv2.imwrite silently fails when the target directory is missing.
    os.makedirs("outputs", exist_ok=True)
    for index_i, single_sample in enumerate(vis_results_v):
        box_list = single_sample["bbox_pred"]
        seg_list = single_sample["seg_pred"]
        gt_box_list = single_sample["bbox_gt"]
        masks_gt = single_sample["seg_gt"]
        imgs = single_sample["image"]
        img_meta = single_sample["shape"]
        ori_shape = img_meta[:2].astype(np.int32)
        mean = [102.9801, 115.9465, 122.7717]
        # Copy before de-normalizing: the same array object can be shared
        # by several samples, and in-place `+=` would add the mean more
        # than once and corrupt the caller's data.
        imgs = imgs.copy()
        imgs[:, 0, :, :] += mean[0]
        imgs[:, 1, :, :] += mean[1]
        imgs[:, 2, :, :] += mean[2]

        imgs = np.ascontiguousarray(imgs.transpose(0, 2, 3, 1))

        # Left panel: predictions (blue), thresholded at score >= 0.8.
        for i, single_pred in enumerate(box_list):
            img = np.array(imgs[i], dtype=np.uint8)
            img = cv2.resize(img, (ori_shape[1], ori_shape[0]))

            for p_j, instance in enumerate(single_pred):
                x0, y0, x1, y1, score = instance
                if score >= 0.8:
                    cv2.line(img, (int(x0), int(y0)), (int(x0), int(y1)), color=(255, 0, 0), thickness=2)
                    cv2.line(img, (int(x0), int(y1)), (int(x1), int(y1)), color=(255, 0, 0), thickness=2)
                    cv2.line(img, (int(x1), int(y1)), (int(x1), int(y0)), color=(255, 0, 0), thickness=2)
                    cv2.line(img, (int(x1), int(y0)), (int(x0), int(y0)), color=(255, 0, 0), thickness=2)
                    img = vis_mask(img, seg_list[i][p_j], color=[255, 0, 0])

        # Right panel: ground truth (red).
        for i, single_gt in enumerate(gt_box_list):
            img2 = np.array(imgs[i], dtype=np.uint8)
            img2 = cv2.resize(img2, (ori_shape[1], ori_shape[0]))
            for g_j, instance in enumerate(single_gt):
                x0, y0, x1, y1 = instance
                # Skip only all-zero padding rows. The original
                # `instance.all() == 0` also dropped any real box with a
                # zero coordinate (e.g. touching the image border).
                if not instance.any():
                    continue
                img2 = vis_mask(img2, masks_gt[i][g_j], color=[0, 0, 255])
                x0, y0, x1, y1 = int(x0), int(y0), int(x1), int(y1)
                corners = [(x0, y0), (x0, y1), (x1, y1), (x1, y0)]
                for start, end in zip(corners, corners[1:] + corners[:1]):
                    cv2.line(img2, start, end, color=(0, 0, 255), thickness=2)
            outputs = np.hstack((img, img2))
            cv2.imwrite("outputs/mask_imgs_results_" + str(index_i) + ".jpg", outputs)
-
-
def bbox2result_1image(bboxes, labels, num_classes):
    """Split one image's detections into one array per foreground class.

    Args:
        bboxes (ndarray): shape (n, 5) boxes with scores.
        labels (ndarray): shape (n,) zero-based foreground class ids.
        num_classes (int): class count, including the background class.

    Returns:
        list[ndarray]: ``num_classes - 1`` arrays of shape (k, 5), one
        per foreground class (empty (0, 5) float32 arrays when there are
        no detections at all).
    """
    num_fg = num_classes - 1
    if bboxes.shape[0] == 0:
        return [np.zeros((0, 5), dtype=np.float32) for _ in range(num_fg)]
    return [bboxes[labels == cls_id, :] for cls_id in range(num_fg)]
-
-
def get_seg_masks(mask_pred, det_bboxes, det_labels, img_meta, rescale, num_classes):
    """Paste per-detection mask predictions into full-image binary masks.

    Args:
        mask_pred: (n, mh, mw) per-detection mask scores; thresholded at
            ``config.mask_thr_binary`` after resizing to the box size.
        det_bboxes: (n, >=4) detection boxes; only columns 0-3 are used.
        det_labels: (n,) zero-based foreground labels; shifted by +1 so
            that ``label - 1`` indexes the per-class result lists.
        img_meta: per-image metadata; [0:2] is treated as the original
            (h, w) and [2:4] as scale factors. NOTE(review): the scale
            factors are cast to int32 below, which truncates fractional
            scales -- confirm upstream stores them as integers.
        rescale: when True, masks are produced at the original image
            size; otherwise at the scaled size.
        num_classes: class count, including the background class.

    Returns:
        tuple(list, list): (cls_segms_decode, cls_segms), one list per
        foreground class. ``cls_segms_decode`` always stays empty here
        because the RLE encoding below is commented out.
    """
    mask_pred = mask_pred.astype(np.float32)

    cls_segms_decode = [[] for _ in range(num_classes - 1)]
    cls_segms = [[] for _ in range(num_classes - 1)]
    bboxes = det_bboxes[:, :4]
    labels = det_labels + 1

    ori_shape = img_meta[:2].astype(np.int32)
    scale_factor = img_meta[2:].astype(np.int32)

    if rescale:
        img_h, img_w = ori_shape[:2]
    else:
        img_h = np.round(ori_shape[0] * scale_factor[0]).astype(np.int32)
        img_w = np.round(ori_shape[1] * scale_factor[1]).astype(np.int32)

    for i in range(bboxes.shape[0]):
        # Truncate box coordinates to integer pixel positions.
        bbox = (bboxes[i, :] / 1.0).astype(np.int32)
        label = labels[i]
        # Box size, clamped to at least 1 pixel and to the image extent.
        w = max(bbox[2] - bbox[0] + 1, 1)
        h = max(bbox[3] - bbox[1] + 1, 1)
        w = min(w, img_w - bbox[0])
        h = min(h, img_h - bbox[1])
        if w <= 0 or h <= 0:
            # Degenerate box (e.g. origin outside the image): warn and
            # force a 1x1 region so the resize below does not fail.
            print(
                "there is invalid proposal bbox, index={} bbox={} w={} h={}".format(
                    i, bbox, w, h
                )
            )
            w = max(w, 1)
            h = max(h, 1)
        mask_pred_ = mask_pred[i, :, :]
        im_mask = np.zeros((img_h, img_w), dtype=np.uint8)
        # Resize the fixed-size mask prediction to the box, binarize, and
        # paste it into the full-image canvas.
        bbox_mask = cv2.resize(mask_pred_, (w, h), interpolation=cv2.INTER_LINEAR)
        bbox_mask = (bbox_mask > config.mask_thr_binary).astype(np.uint8)
        im_mask[bbox[1] : bbox[1] + h, bbox[0] : bbox[0] + w] = bbox_mask

        #rle = maskUtils.encode(np.array(im_mask[:, :, np.newaxis], order="F"))[0]
        #cls_segms_decode[label - 1].append(rle)
        cls_segms[label - 1].append(im_mask)

    return cls_segms_decode, cls_segms
-
# ---------------------------------------------------------------------------
# Evaluation entry: parse CLI args, restore the checkpoint, build/load the
# MindRecord test split, run inference over it, then report mean precision /
# recall / F-measure and save side-by-side visualisations.
# ---------------------------------------------------------------------------
args = parser.parse_args()
#ckpt_path = "ckpt/mask_text_spotter__1-1_7908.ckpt"
# NOTE(review): device_id is hard-coded to 1 -- confirm it matches the host.
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", device_id=1)

# Build the network and load the weights to evaluate.
net = MaskTextSpotter_Resnet50(config=config)
param_dict = load_checkpoint(args.checkpoint_path)
load_param_into_net(net, param_dict)
net.set_train(False)


# Locate the test MindRecord; generate it on first use.
prefix = "MaskRcnn.mindrecord"
mindrecord_dir = os.path.join(config.icdar_root, config.mindrecord_test_dir)
mindrecord_file = os.path.join( mindrecord_dir, prefix)
if not os.path.exists(mindrecord_file):
    create_mindrecord_dir(model_prefix=prefix, model_mindrecord_dir=mindrecord_dir, is_training=True)
ds = create_maskrcnn_dataset(mindrecord_file,
                             batch_size=config.test_batch_size, is_training=False)
dataset_size = ds.get_dataset_size()

print("\n========================================\n")
print("total images num: ", dataset_size)
print("Processing, please wait a moment.")
max_num = 128  # keep at most this many top-scoring detections per image
eval_iter = 0
results = []
for index, data in enumerate(ds.create_dict_iterator(output_numpy=True, num_epochs=1)):
    # if index>5:
    #     continue
    eval_iter += 1
    # Unpack one batch of inputs and ground truth.
    img_data = data["image"]
    img_metas = data["image_shape"]
    gt_bboxes = data["box"]
    gt_labels = data["label"]
    gt_num = data["valid_num"]
    mask_gt = data["mask_gt"]
    mask_char = data["mask_char"]
    # apply_gt_mask(gt_bboxes, img_data)

    start = time.time()
    # run net
    output = net(
        Tensor(img_data),
        Tensor(img_metas),
        Tensor(gt_bboxes),
        Tensor(gt_labels),
        Tensor(gt_num),
        Tensor(mask_gt),
        Tensor(mask_char),
    )

    end = time.time()
    print("Iter {} cost time {}".format(eval_iter, end - start))

    # Network outputs: boxes, labels, validity mask, foreground masks.
    all_bbox = output[0]
    all_label = output[1]
    all_mask = output[2]
    all_mask_fb = output[3]
    # all_bboxes, all_labels, all_masks, all_masks_fb = multiclass_nms(all_bbox, all_label, all_mask, all_mask_fb)
    for j in range(config.test_batch_size):
        # Select the j-th image of the batch and drop padded entries via
        # the boolean validity mask.
        all_bbox_squee = np.squeeze(all_bbox.asnumpy()[j, :, :])
        all_label_squee = np.squeeze(all_label.asnumpy()[j, :, :])
        all_mask_squee = np.squeeze(all_mask.asnumpy()[j, :, :])
        all_mask_fb_squee = np.squeeze(all_mask_fb.asnumpy()[j, :, :, :])

        # all_bbox_squee = all_bbox[j][1].asnumpy()
        # all_label_squee = all_label[j].asnumpy()
        # all_mask_squee = all_mask[j].asnumpy()
        # all_mask_fb_squee = all_mask_fb[j].asnumpy()

        all_bboxes_tmp_mask = all_bbox_squee[all_mask_squee, :]
        all_labels_tmp_mask = all_label_squee[all_mask_squee]
        all_mask_fb_tmp_mask = all_mask_fb_squee[all_mask_squee, :, :]

        # Keep only the max_num highest-scoring detections (score is the
        # last bbox column).
        if all_bboxes_tmp_mask.shape[0] > max_num:
            inds = np.argsort(-all_bboxes_tmp_mask[:, -1])
            inds = inds[:max_num]
            all_bboxes_tmp_mask = all_bboxes_tmp_mask[inds]
            all_labels_tmp_mask = all_labels_tmp_mask[inds]
            all_mask_fb_tmp_mask = all_mask_fb_tmp_mask[inds]

        bbox_results = bbox2result_1image(
            all_bboxes_tmp_mask, all_labels_tmp_mask, config.num_classes
        )
        segm_results = get_seg_masks(
            all_mask_fb_tmp_mask,
            all_bboxes_tmp_mask,
            all_labels_tmp_mask,
            img_metas[j],
            True,
            config.num_classes,
        )

        # NOTE(review): the whole batch's gt/image tensors are stored for
        # every j, and get_metric only reads index [0] -- this is correct
        # only for test_batch_size == 1; confirm.
        results.append(
            {
                "bbox_pred": bbox_results,
                "seg_pred": segm_results,
                "bbox_gt": gt_bboxes,
                "seg_gt": mask_gt,
                "image": img_data,
                "char_mask": mask_char,
                "shape": img_metas[j],
            }
        )
precision, recall, f_measure = get_metric(results)

print(
    "mean_precision: ",
    precision,
    "mean_recall: ",
    recall,
    "mean_f_measure:",
    f_measure,
)
vis_results(results)
|