PaddleDetection+OpenVINO实现铝片表面缺陷检测
1. 项目展示
2. 项目概述
本项目主要用以解决工业检测中的铝片表面缺陷检测,现代工业中铝合金、钢材等材料的表面缺陷直接影响到产品最终品质与定价,个别缺陷甚至会影响下一个阶段产品的安全可靠性,所以现在工业生产亟需利用人工智能技术手段将智能设备部署于生产线上。该项目针对这一问题,采用PaddlePaddle框架,搭建SSD模型,将其部署到OpenVINO上,同时搭建前端交互页面,实现自动检测、自动处理,降低次品率,提高生产效率,保障生产安全。
3. 技术工具
- 模型训练:
- PaddlePaddle
- PaddleDetection
- 模型后端部署(python):
- 前端:
4. 模型训练
5. PaddleLite部署模型转换
转换参数文档
!pip -q install paddlelite
# 前端
!unzip -oq /home/aistudio/nginx-1.20.2.zip
# 后端
!unzip -oq /home/aistudio/server.zip
# 解压模型
!unzip -oq /home/aistudio/model.zip
# 准备PaddleLite部署模型
!paddle_lite_opt \
--model_file=home/aistudio/model/__model__ \
--param_file=home/aistudio/model/__params__ \
--optimize_out=./ssd_mobilenet_v1_opt \
--optimize_out_type=naive_buffer \
--valid_targets=x86
6. 部署
部署所需要的文件(都已提供):
6.1 前端部署
前端已经完成,只需要执行以下命令即可启动前端(本地Windows环境下)
- 在
nginx-1.20.2
目录下powershell
中,输入命令:./nginx.exe -c conf/det.conf
- 显示以下即启动成功
6.2 python 后端部署
采用的paddlelite第三方库运行nb模型
6.2.1 文件夹内容说明
- server/history:存放历史检测信息
- server/label_list.txt:标签
- server/userinfo.json:用户信息
- server/config.json:配置信息
- server/utils.py:通用程序文件
- server/app.py:主程序文件
- server/ssd_mobilenet_v1_opt.nb:nb模型
6.2.2 utils.py
import cv2
import numpy as np
size = (300, 300)
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
threshold = 0.5
color = [
(237, 189, 101),
(0, 0, 255),
(102, 153, 153),
(255, 0, 0),
(9, 255, 0)
]
# 数据预处理
def preprocess(image):
image = cv2.resize(image.astype('float32'), size) / 255. # HWC
for i in range(image.shape[-1]):
image[:, :, i] = (image[:, :, i] - mean[i]) / std[i]
# image = (image - np.array(mean)) / np.array(std) # HWC
image = image.transpose((2, 0, 1)) # HWC -> CHW
image = np.array([image]) # CHW -> NCHW
return image
def postprocess(output_tensor, image, label_list):
pred = {'result': []}
idx = 0
for item in output_tensor.numpy():
if item[1] < threshold:
continue
pred['result'].append({
"class_name": label_list[int(item[0])],
"score": float(item[1]),
"xmin": int(item[2] * image.shape[1]),
"ymin": int(item[3] * image.shape[0]),
"xmax": int(item[4] * image.shape[1]),
"ymax": int(item[5] * image.shape[0])
})
cv2.rectangle(image, (pred['result'][idx]['xmin'], pred['result'][idx]['ymin']),
(pred['result'][idx]['xmax'], pred['result'][idx]['ymax']), color[int(item[0])], 2)
cv2.putText(image,
'{}.{}?:{}'.format(int(item[0]), pred['result'][idx]['class_name'], pred['result'][idx]['score']),
(pred['result'][idx]['xmin'], pred['result'][idx]['ymin']), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
(255, 255, 255), 1)
idx += 1
pred['image'] = image
return pred
6.2.3 app.py
import base64
from paddlelite.lite import *
from PIL import Image
from flask import Flask, request, jsonify
import json
import os
import sys
from datetime import timedelta
import time
import copy
from utils import *
app = Flask(__name__)
app.secret_key = 'secret!'
# 解决缓存刷新问题
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = timedelta(seconds=1)
# 添加header解决跨域
@app.after_request
def after_request(response):
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Credentials'] = 'true'
response.headers['Access-Control-Allow-Methods'] = 'POST'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, X-Requested-With'
return response
@app.route('/register', methods=['POST'])
def register():
"""
注册模块
"""
data = json.loads(request.data)['params']
username = data['username']
password = data['password']
if not os.path.exists(app.config['userinfo']):
userinfo_dict = {}
else:
with open(app.config['userinfo']) as f:
userinfo_dict = json.load(f)
# 判断用户是否存在, 用户存在, 注册失败
if userinfo_dict.get(username, None) is not None:
return json.dumps({"state": 0})
# 注册成功
userinfo_dict[username] = password
with open(app.config['userinfo'], 'w') as f:
json.dump(userinfo_dict, f)
return json.dumps({"state": 1})
@app.route('/login', methods=['POST'])
def login():
"""
登录模块
"""
print("==================")
print(request.url)
print("==================")
if not os.path.exists(app.config['userinfo']):
return json.dumps({'state': -1})
data = json.loads(request.data)['params']
username = data['username']
password = data['password']
with open(app.config['userinfo']) as f:
userinfo_dict = json.load(f)
# 判断用户是否存在, 用户不存在
if userinfo_dict.get(username, None) is None:
return json.dumps({"state": -1})
# 判断用户密码是否正确, 用户存在且密码正确
if userinfo_dict.get(username) == password:
return json.dumps({"state": 1})
# 用户存在, 密码错误
return json.dumps({"state": 0})
@app.route('/plot', methods=['GET'])
def plot():
"""
读取历史信息进行可视化
:return:
"""
results = []
for idx, file_name in enumerate(os.listdir(app.config['history_images'])):
with open(os.path.join(app.config['history_infos'], file_name.replace('.jpg', '.json'))) as f:
info = json.load(f)
results.append({'id': idx, **info})
return json.dumps({'history': results})
@app.route('/history', methods=['POST'])
def history():
"""
读取历史预测数据, 两个文件内的文件名一致, 只后缀不一样
"""
data = json.loads(request.data)['params']
pageNum = data['pageNum']
pageSize = data['pageSize']
end = pageNum * pageSize
start = end - pageSize
history_length = len(os.listdir(app.config['history_images']))
if end > history_length:
end = history_length
if start > history_length:
start = history_length
results = []
for idx, file_name in enumerate(os.listdir(app.config['history_images'])[start: end]):
image = np.array(Image.open(os.path.join(app.config['history_images'], file_name)))
image = base64.b64encode(np.array(cv2.imencode('.jpg', image)[1]).tobytes()).decode('utf8')
with open(os.path.join(app.config['history_infos'], file_name.replace('.jpg', '.json'))) as f:
info = json.load(f)
results.append({'id': idx, 'image': image, **info})
return json.dumps({'history': results})
@app.route('/predict', methods=['POST'])
def infer():
"""
预测接收到的图片
"""
image = cv2.imdecode(np.array(bytearray(request.files['file'].stream.read())), cv2.IMREAD_COLOR)
plt_image = copy.deepcopy(image)
start_time = time.time()
# 图片预处理
image = preprocess(image)
# 预测
input_tensor = app.config['model'].get_input(0)
input_tensor.from_numpy(image)
app.config['model'].run()
output_tensor = app.config['model'].get_output(0)
# 处理预测结果
pred = postprocess(output_tensor, plt_image, app.config['label_list'])
pred['predict_time'] = (time.time() - start_time) * 1000
# 存储
num = len(os.listdir(app.config['history_images']))
cv2.imwrite(app.config['history_images'] + f'/{num}.jpg', pred['image'])
with open(app.config['history_infos'] + f'/{num}.json', 'w') as f:
json.dump({'predict_time': pred['predict_time'], 'result': pred['result']}, f)
image_bytes = base64.b64encode(np.array(cv2.imencode('.jpg', pred['image'])[1]).tobytes()).decode('utf8')
return jsonify({'state': 1, 'image': image_bytes, 'predict_time': pred['predict_time'], 'result': pred['result']})
if __name__ == '__main__':
if not os.path.exists('./config.json'):
sys.exit(1)
# 读取配置
with open('./config.json') as f:
cfg = json.load(f)
for key in cfg:
if isinstance(cfg[key], str):
app.config[key] = cfg[key]
else:
for sub_key in cfg[key]:
app.config[key + '_' + sub_key] = cfg[key][sub_key]
if not os.path.exists(app.config['history_images']):
os.makedirs(app.config['history_images'])
os.mkdir(app.config['history_infos'])
# 加载模型
config = MobileConfig()
config.set_model_from_file(app.config['model_path'])
app.config['model'] = create_paddle_predictor(config)
# 读取label
app.config['label_list'] = []
with open(app.config['label_list_path']) as f:
for line in f.readlines():
app.config['label_list'].append(line.strip())
# 启动服务
app.run(host=app.config['server_ip'], port=app.config['server_port'], debug=True)
6.2.4 启动后端
- 在
server
目录下cmd
中,输入命令:python app.py
- 显示以下即启动成功
个人简介