|
- # -*- coding: utf-8 -*-
'''
Video traffic-accident detection algorithm.
'''
- import argparse
- import time
- import numpy as np
- import os
- import sys
-
-
- # sys.path.append(os.path.join(os.getcwd(), r"alg_manager/RoadAccidentDetector"))
- sys.path.append(os.getcwd())
-
-
- from I3D.extract_flow_features import *
- from I3D.extract_rgb_features import *
- from AnoDetector.RN_Vanilla import RNVanilla
- import yaml
-
-
class RADetector():
    '''
    Video road-accident detection algorithm.

    Scores a video for traffic-accident anomalies using two pretrained
    ranking networks (an RGB stream and an optical-flow stream) operating
    on I3D features. Each video is reduced to 32 temporal segments, each
    receiving an anomaly score in [0, 1].
    '''
    # Categories the detector can report.
    __id2cat__ = {0: "normal", 1: "accident"}

    def min_max_normalize(self, scores):
        '''
        Min-max scale `scores` into [0, 1] when normalization is enabled.

        Arg
        ---
        scores: np.ndarray of raw anomaly scores.

        Return
        ---
        np.ndarray: normalized scores, or the input unchanged when
        self.use_norm is False. A constant input maps to all zeros
        (the naive formula would divide by zero and yield NaNs).
        '''
        if not self.use_norm:
            return scores
        lo, hi = np.min(scores), np.max(scores)
        if hi == lo:
            # All scores identical: avoid 0/0 -> NaN.
            return np.zeros_like(scores)
        return (scores - lo) / (hi - lo)

    @staticmethod
    def predict_anomaly_scores(vfeats, net):
        '''
        Score one feature stream with its ranking network.

        Features are L2-normalized per row, grouped into exactly 32
        temporal segments — index sampling (with repeats) when fewer than
        33 feature rows exist, contiguous mean-pooling otherwise — and
        scored by `net` on the GPU.

        Arg
        ---
        vfeats: torch.Tensor of shape (num_feats, feat_dim).
        net: network mapping a (32, feat_dim) batch to 32 scores.

        Return
        ---
        np.ndarray of 32 per-segment anomaly scores.
        '''
        features = torch.nn.functional.normalize(vfeats, p=2, dim=1)
        feat_num = features.size()[0]
        segments = []
        if feat_num < 33:
            # Too few rows to pool: sample 32 indices (repeats allowed).
            # NOTE: `np.int` was removed in NumPy 1.24 — use builtin int.
            indices = torch.from_numpy(np.linspace(0, feat_num, 32, endpoint=False, dtype=int))
            for index in indices:
                segments.append(features[index].unsqueeze(0))
        else:
            # Mean-pool 32 contiguous chunks delimited by 33 boundaries.
            indices = torch.from_numpy(np.linspace(0, feat_num, 33, endpoint=True, dtype=int))
            for k in range(indices.shape[0] - 1):
                cur_seg_feats = features[indices[k]:indices[k + 1]].mean(dim=0).unsqueeze(0)
                segments.append(cur_seg_feats)
        segments = torch.stack(segments)
        with torch.no_grad():
            return net(segments.cuda()).squeeze().cpu().numpy()

    @staticmethod
    def merge_score(score1, score2):
        '''
        Fuse the RGB and optical-flow score vectors.

        Arg
        ---
        score1, score2: array-likes of per-segment scores.

        Return
        ---
        Element-wise product of the two (squeezed) score vectors.
        '''
        cs1 = score1.squeeze()
        cs2 = score2.squeeze()
        return cs1 * cs2

    def __init__(self, config_path: str, *arg, **kwarg):
        '''
        Initialize the detector from a YAML configuration file.

        Arg
        ---
        config_path: path to the .yaml configuration file. Expected keys:
            gpu, read_mode, use_merge, use_norm, vpath, rgb_mdir, flow_mdir.
        '''
        with open(config_path, 'r', encoding='gb18030', errors='ignore') as yaml_file:
            config = yaml_file.read()
        config_dict = yaml.safe_load(config)
        self.gpu = config_dict['gpu']
        self.read_mode = config_dict['read_mode']
        self.use_merge = config_dict['use_merge']
        self.use_norm = config_dict['use_norm']
        self.vpath = config_dict['vpath']
        self.rgb_mdir = config_dict['rgb_mdir']
        self.flow_mdir = config_dict['flow_mdir']
        # Pin the visible GPU before any CUDA context is created.
        os.environ['CUDA_VISIBLE_DEVICES'] = str(self.gpu)
        self.rgb_net = RNVanilla(pretrained=True, model_dir=self.rgb_mdir, input_dim=1024).eval().cuda()
        self.flow_net = RNVanilla(pretrained=True, model_dir=self.flow_mdir, input_dim=1024).eval().cuda()

    def __call__(self, frames: list = None) -> np.ndarray:
        '''
        Detect accidents in a video and return its per-segment scores.

        Arg
        ---
        frames: list of frames read with cv2 (BGR ndarrays); required when
            self.read_mode == 'frames', otherwise self.vpath is decoded.

        Return
        ---
        np.ndarray
            shape (32,): anomaly score (0.0 ~ 1.0) for each of the 32
            temporal segments of the video.
        '''
        print(f'Read Mode:{self.read_mode}')
        if self.read_mode == 'frames':
            assert frames is not None
            print('Apply Transformation on Input Frames')
            # Convert BGR -> RGB and apply the I3D video transform.
            frames = [video_tf(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))) for frame in frames]
        video_path = self.vpath
        print(f'Detecting start ...')
        start_t = time.time()
        print(f'Extracting RGB Features ...')
        rgb_feat, frame_num = extract_rgb_features(videopath=video_path, read_mode=self.read_mode, frames=frames)
        print(f'Extracting FLOW Features...')
        flow_feat, _ = extract_flow_features(videopath=video_path, read_mode=self.read_mode, frames=frames)
        print(f'Detecting with RGB features...')
        rgb_scores = self.predict_anomaly_scores(rgb_feat, net=self.rgb_net)
        rgb_scores = self.min_max_normalize(rgb_scores)
        if self.use_merge:
            print(f'Detecting with FLOW features...')
            flow_scores = self.predict_anomaly_scores(flow_feat, net=self.flow_net)
            flow_scores = self.min_max_normalize(flow_scores)
            print(f'Merging Scores...')
            joint_scores = self.merge_score(rgb_scores, flow_scores)
        else:
            # Flow stream disabled: RGB scores stand alone.
            joint_scores = rgb_scores
        end_t = time.time()
        print(f'Detection FPS :{frame_num // (end_t - start_t)}')
        print(f'Done. \nScores for 32 segments: {joint_scores}')
        return joint_scores
-
-
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Road Accident Detection')
    parser.add_argument('--config_path', default='config.yaml', type=str, help='Path to the configuration file')
    args = parser.parse_args()
    detect_instance = RADetector(config_path=args.config_path)
    # Read the sample video fully into memory as a list of BGR frames.
    videopath = 'AnoVideos\\RoadAccidents002_x264.mp4'
    capture = cv2.VideoCapture(videopath)
    frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    count = 0
    retaining = True
    frames = []
    while count < frame_count and retaining:
        retaining, frame = capture.read()
        # capture.read() can return None near the end of the stream.
        if frame is not None:
            frames.append(frame)
        count += 1
    capture.release()
    # Invoke the detector through the call protocol rather than calling
    # the __call__ dunder explicitly.
    detect_instance(frames=frames)
|