|
- from celery import Celery, current_task
- import requests
- from file_upload import MinioHelper
- from net.api import DehazingNet
- import json
- import warnings
- import os
- import cv2
-
- app = Celery(broker="{{ cookiecutter.celery_broker }}", backend="{{ cookiecutter.celery_backend }}")
- app.conf.result_backend_transport_options = {'visibility_timeout': 18000}
- app.conf.broker_transport_options = {'visibility_timeout': 18000}
-
- IMAGE_FILE = 0
- VIDEO_FILE = 1
- RTSP_STREAM = 2
- RTMP_STREAM = 3
-
- STATUS_FINISHED = 0
- STATUS_IN_PROGRESS = 1
- STATUS_FAILED = 2
- STATUS_NOT_START = 3
- STATUS_STOPPING = 4
- STATUS_STOPPED = 5
-
- STATUS_ITEMS = (
- (STATUS_FINISHED, '处理完毕'),
- (STATUS_IN_PROGRESS, '正在处理'),
- (STATUS_FAILED, '异常'),
- (STATUS_NOT_START, '未处理'),
- (STATUS_STOPPING, '正在停止'),
- (STATUS_STOPPED, '已停止')
- )
-
-
- def download_image(url):
- file_name = url.split(os.sep)[-1]
- local_url = os.path.join(os.path.dirname(os.path.abspath("__file__")), file_name)
- try:
- r = requests.get(url)
- with open(local_url, "wb") as f:
- f.write(r.content)
- except Exception as err:
- print('Error: {}'.format(err))
- return False
- else:
- return local_url
-
- def upload_file(file_path, file_name):
- # upload file to minio
- minio_helper = MinioHelper()
- ret_dict = minio_helper.upload_file(file_path=file_path, object_name=file_name)
- if ret_dict['code'] == 0:
- return_url = ret_dict['public_url']
- try:
- os.remove(result_path)
- except Exception as ex:
- warnings.warn('Error in remove {} : {}'.format(result_path, ex))
- return str(ex)
- if ret_dict['code'] == 1:
- warnings.warn('Error in file uploading !\n{}'.format(ret_dict['code_msg']))
- return None
-
- def dehazing(haze_image):
- dehazing_net = DehazingNet(haze_image)
- dehaze_image = dehazing_net.dehaze()
- return dehaze_image
-
-
- @app.task
- def start_task(input_params, result_callback, progress_callback=None, extra_params=None):
- print('task_id: {current_task_id}'.format(current_task_id=current_task.request.id))
-
- # parse input params
- params_dict = json.loads(input_params)
- input_type = params_dict.get("input_type", None)
- input_url = params_dict.get("input_url", None)
-
- if input_type == IMAGE_FILE:
- try:
- # download image with haze
- haze_image = download_image(input_url)
- # run dehazing algorithm
- dehaze_image = dehazing(input_params=input_params, extra_params=extra_params)
- # save dehazed image in local
- image_name = input_url.split(os.sep)[-1]
- dehaze_image_name = image_name.split('.')[0]+"_dehaze."+image_name.split('.')[-1]
- if os.path.exists("celery_result"):
- os.mkdir("celery_result")
- dehaze_image_path = os.path.join("celery_result", dehaze_image_name)
- cv2.imwrite(dehaze_image_name, dehaze_image)
- # upload dehaze image
- dehaze_image_url = upload_file(dehaze_image_path, dehaze_image_name)
- except Exception as ex:
- # 如果处理失败
- result_dict['status'] = STATUS_FAILED
- result_dict['result'] = ex
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
- else:
- result_dict = {
- "status": STATUS_FINISHED,
- "result": dehaze_image_url
- }
- # 如果处理成功,将处理结果PATCH到result_callback中
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
-
- else:
- result_dict['status'] = STATUS_FAILED
- result_dict['result'] = 'wrong input type'
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
|