|
- from celery import Celery, current_task
- import requests
- from file_upload import MinioHelper
- from net.api import DehazingNet
- import json
- import warnings
- import os
- import cv2
- import torchvision.utils as vutils
- import traceback
- from PIL import Image
-
-
- app = Celery(broker="redis://:pcl1305@192.168.202.90:6379/0", backend="redis://:pcl1305@192.168.202.90:6379/1")
- app.conf.result_backend_transport_options = {'visibility_timeout': 18000}
- app.conf.broker_transport_options = {'visibility_timeout': 18000}
-
- IMAGE_FILE = 0
- VIDEO_FILE = 1
- RTSP_STREAM = 2
- RTMP_STREAM = 3
-
- STATUS_FINISHED = 0
- STATUS_IN_PROGRESS = 1
- STATUS_FAILED = 2
- STATUS_NOT_START = 3
- STATUS_STOPPING = 4
- STATUS_STOPPED = 5
-
- STATUS_ITEMS = (
- (STATUS_FINISHED, '处理完毕'),
- (STATUS_IN_PROGRESS, '正在处理'),
- (STATUS_FAILED, '异常'),
- (STATUS_NOT_START, '未处理'),
- (STATUS_STOPPING, '正在停止'),
- (STATUS_STOPPED, '已停止')
- )
-
-
- def download_image(url):
- file_name = url.split(os.sep)[-1]
- local_url = os.path.join(os.path.dirname(os.path.abspath("__file__")), file_name)
- r = requests.get(url)
- with open(local_url, "wb") as f:
- f.write(r.content)
- return local_url
-
- def upload_file(file_path, file_name):
- # upload file to minio
- minio_helper = MinioHelper()
- ret_dict = minio_helper.upload_file(file_path=file_path, object_name=file_name)
- if ret_dict['code'] == 0:
- return_url = ret_dict['public_url']
- os.remove(file_path)
- return return_url
- if ret_dict['code'] == 1:
- raise Exception('Error in file uploading !\n{}'.format(ret_dict['code_msg']))
-
-
- def dehazing(haze_image_url):
- haze_image = Image.open(haze_image_url)
- dehazing_net = DehazingNet(haze_image)
- dehaze_image = dehazing_net.dehaze()
- return dehaze_image
-
-
- @app.task
- def start_task(input_params, result_callback, progress_callback=None, extra_params=None):
- print('task_id: {current_task_id}'.format(current_task_id=current_task.request.id))
- result_dict = {}
-
- # parse input params
- params_dict = json.loads(input_params)
- input_type = params_dict.get("input_type", None)
- input_url = params_dict.get("input_url", None)
-
- if input_type == IMAGE_FILE:
- try:
- # download image with haze
- print('downloading image {}...'.format(input_url))
- haze_image_url = download_image(input_url)
- haze_image_name = input_url.split(os.sep)[-1][:-4]
- haze_image_type = input_url.split(os.sep)[-1][-4:]
- haze_image_type = '.png'
- # run dehazing algorithm
- print('running algorithm ...')
- dehaze_image = dehazing(haze_image_url)
- # save dehazed image in local
- print('saving algorigthm result ...')
- dehaze_image_name = haze_image_name+"_dehaze"+haze_image_type
- if not os.path.exists(os.path.join(os.getcwd(), "celery_result")):
- os.mkdir(os.path.join(os.getcwd(), "celery_result"))
- dehaze_image_path = os.path.join(os.path.join(os.getcwd(), "celery_result"), dehaze_image_name)
- print(dehaze_image_path)
- vutils.save_image(dehaze_image, dehaze_image_path)
- # upload dehaze image
- print('uploading result ...')
- dehaze_image_url = upload_file(dehaze_image_path, dehaze_image_name)
- print(dehaze_image_url)
- except Exception as ex:
- # 如果处理失败
- print(ex)
- traceback.print_exc()
- result_dict['status'] = STATUS_FAILED
- result_dict['result'] = str(ex)
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
- else:
- result_dict = {
- "status": STATUS_FINISHED,
- "result": dehaze_image_url
- }
- # 如果处理成功,将处理结果PATCH到result_callback中
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
- print('---- task finished ----')
-
- else:
- result_dict['status'] = STATUS_FAILED
- result_dict['result'] = 'wrong input type'
- ret = requests.patch(url=result_callback,
- data=json.dumps(result_dict, ensure_ascii=False),
- headers={'content-type': 'application/json'})
|