Browse Source

将ability改造为使用异步I/O模式,代替原先基于serviceboot的多线程模式。

pull/9/head
huolongshe 6 months ago
parent
commit
983b0f929d
15 changed files with 359 additions and 62 deletions
  1. +0
    -8
      README.md
  2. +0
    -14
      app/app_core.py
  3. +5
    -0
      app/global_data/config.py
  4. +30
    -6
      app/global_data/consul_client.py
  5. +5
    -2
      app/global_data/global_data.py
  6. +3
    -0
      app/global_data/oauth_client.py
  7. +14
    -7
      app/service/ability_service.py
  8. +15
    -14
      app/service/http_client.py
  9. +2
    -2
      app/service/umm_client.py
  10. +193
    -0
      app/serviceboot.py
  11. +3
    -5
      application.yml
  12. +1
    -1
      build-docker.sh
  13. +86
    -0
      build_docker.py
  14. +1
    -1
      requirements.txt
  15. +1
    -2
      start.py

+ 0
- 8
README.md View File

@@ -6,8 +6,6 @@

AI能力开放网关(ability)是CubeAI智立方中用于代理访问部署于k8s之上的AI能力开放平台的网关微服务。

ability基于[CubePy微服务框架](https://git.openi.org.cn/OpenI/cubepy)和[ServiceBoot微服务引擎](https://pypi.org/project/serviceboot)开发。

## 基本配置

- 监听端口
@@ -37,15 +35,9 @@ ability基于[CubePy微服务框架](https://git.openi.org.cn/OpenI/cubepy)和[S
4. 在PyCharm的terminal窗口中执行如下命令安装依赖包:

# sh pip-install-reqs.sh
依赖包安装完成后,可在terminal窗口中执行如下命令来查看serviceboot所有命令行格式:
# serviceboot

5. 在PyCharm窗口中右键单击“start.py”文件,选择“run 'start'”或者“debug 'start'”来运行或调试程序。

6. 开发完成后,可在terminal窗口中执行如下命令来生成微服务docker镜像:

# serviceboot build_docker
或者
# sh build-docker.sh

+ 0
- 14
app/app_core.py View File

@@ -1,14 +0,0 @@
# -*- coding: utf-8 -*-
from app.global_data.global_data import g
from app.service import ability_service


class AppCore(object):
def __init__(self):
g.load_global_data()
if not g.init_success:
raise Exception('初始化加载 global_data 失败!')

def forward_request(self, prev_request):
return ability_service.forward_request(prev_request)

+ 5
- 0
app/global_data/config.py View File

@@ -15,6 +15,11 @@ class Config:
except:
self.app_version = '0.0.1'

try:
self.max_connections = yml['http']['max_connections']
except:
self.max_connections = 3000

self.server_ip = get_local_ip()

self.app_profile = os.environ.get('APP_PROFILE', 'dev').lower()


+ 30
- 6
app/global_data/consul_client.py View File

@@ -1,17 +1,18 @@
import os
import json
import yaml
import random
import consul
import requests
import logging
import os
from tornado.httpclient import HTTPRequest


class ConsulClient:
def __init__(self, host=None, port=None, token=None):
def __init__(self, host=None, port=None, async_http_client=None):
self.host = host
self.port = port
self.token = token
self.async_http_client = async_http_client
self.consul = consul.Consul(host=host, port=port)
self.registered = False

@@ -40,9 +41,6 @@ class ConsulClient:
except:
self.registered = False

def get_services(self):
return self.consul.agent.services()

def resolve_service(self, name): # 根据服务名解析其“IP地址:端口号”,负载均衡随机取其中之一
# 如果程序运行在K8S中,直接通过kube-dns解析服务名,服务端口号默认为80
if os.environ.get('APP_PROFILE', 'dev').lower() == 'k8s':
@@ -65,6 +63,29 @@ class ConsulClient:

return None

async def async_resolve_service(self, name): # 根据服务名解析其“IP地址:端口号”,负载均衡随机取其中之一
# 如果程序运行在K8S中,直接通过kube-dns解析服务名,服务端口号默认为80
if os.environ.get('APP_PROFILE', 'dev').lower() == 'k8s':
return name

# 非k8s环境,从Consul解析地址
url = 'http://{}:{}/v1/health/service/{}'.format(self.host,self.port, name)
try:
res = await self.async_http_client.fetch(HTTPRequest(url=url, method='GET'))
except:
return None

instances = json.loads(str(res.body, encoding='utf-8'))
results = []
for instance in instances:
if instance['Checks'][0]['Status'] == 'passing':
results.append(instance['Service']['Address'] + ':' + str(instance['Service']['Port']))

if len(results) > 0:
return results[random.randint(0, len(results) - 1)] # 随机获取一个可用的服务实例

return None

def get_kv(self):
try:
kv_yaml = self.consul.kv.get('config/application/data')[1].get('Value')
@@ -72,6 +93,9 @@ class ConsulClient:
except:
return None

def get_services(self):
return self.consul.agent.services()

def get_service_names(self):
services = self.consul.agent.services()
names = []


+ 5
- 2
app/global_data/global_data.py View File

@@ -6,6 +6,7 @@ import threading
import websocket
import requests
from app.global_data.config import Config
from tornado.httpclient import AsyncHTTPClient
from app.global_data.consul_client import ConsulClient
from app.global_data.oauth_client import OauthClient
import logging
@@ -17,6 +18,8 @@ class GlobalData:
self.init_success = False

self.config = None
self.current_connections = 0
self.async_http_client = None
self.consul_client = None
self.redis_pool = None
self.oauth_client = None
@@ -26,8 +29,8 @@ class GlobalData:

def load_global_data(self):
self.config = Config()
self.consul_client = ConsulClient(self.config.consul_address, self.config.consul_port)
self.async_http_client = AsyncHTTPClient()
self.consul_client = ConsulClient(self.config.consul_address, self.config.consul_port, self.async_http_client)
service_id = '{}-{}'.format(self.config.app_name, str(uuid.uuid4()).replace('-', ''))
self.consul_client.register(self.config.app_name, service_id, self.config.server_ip, self.config.server_port,
self.config.consul_tags, self.config.consul_http_check_url)


+ 3
- 0
app/global_data/oauth_client.py View File

@@ -17,6 +17,9 @@ class OauthClient:
def get_uaa_url(self):
return 'http://{}/api/data'.format(self.consul_client.resolve_service(self.uaa_name))

async def async_get_uaa_url(self):
return 'http://{}/api/data'.format(await self.consul_client.async_resolve_service(self.uaa_name))

def update_public_key(self):
body = {
'action': 'get_public_key',


+ 14
- 7
app/service/ability_service.py View File

@@ -1,10 +1,10 @@
import redis
import requests
from tornado.httpclient import HTTPRequest
from app.service import umm_client
from app.global_data.global_data import g


def forward_request(prev_request):
async def forward_request(prev_request):
solution_uuid, api_name = prev_request.path[1:].split('/', maxsplit=1)

redis_conn = redis.Redis(connection_pool=g.redis_pool)
@@ -14,7 +14,7 @@ def forward_request(prev_request):
k8s_port = 0

if k8s_port < 1:
res = umm_client.find_solution(solution_uuid)
res = await umm_client.find_solution(solution_uuid)
if res['status'] != 'ok' or res['value']['total'] < 1:
raise Exception('未找到模型部署实例')

@@ -27,11 +27,18 @@ def forward_request(prev_request):
internal_ip = g.get_central_config()['kubernetes']['ability']['internalIP']
url = 'http://{}:{}/{}'.format(internal_ip, k8s_port, api_name)

res = requests.request(method=prev_request.method, url=url, data=prev_request.body, headers=prev_request.headers)
body = prev_request.body if prev_request.method == 'POST' else None
res = await g.async_http_client.fetch(
raise_error=False, # 对于“non-200 response code”,也返回response,而不是抛出异常,以保持与使用requests函数的一致性。
request=HTTPRequest(
method=prev_request.method,
url=url,
body=body,
headers=prev_request.headers
)
)

if not api_name.startswith('web/'):
redis_conn.incr('abcc_' + solution_uuid) # ABility Call Count

return {
'response': res,
}
return res

+ 15
- 14
app/service/http_client.py View File

@@ -1,9 +1,9 @@
import json
import requests
from tornado.httpclient import HTTPRequest
from app.global_data.global_data import g


def http_client(service_name, body=None, jwt=None):
async def http_client(service_name, body=None, jwt=None):
headers = {
'Content-Type': 'application/json;charset=UTF-8',
'Accept': '*/*',
@@ -12,28 +12,29 @@ def http_client(service_name, body=None, jwt=None):
if jwt is not None:
headers['Authorization'] = 'Bearer {}'.format(jwt)

host = g.consul_client.resolve_service(service_name)
host = await g.consul_client.async_resolve_service(service_name)
url = 'http://{}/api/data'.format(host)

try:
res = requests.post(url=url, json=body, headers=headers)
except:
return {
'status': 'err',
'value': 'HTTP访问失败!'
}

if res.status_code != 200:
res = await g.async_http_client.fetch(
request=HTTPRequest(
method='POST',
url=url,
body=json.dumps(body),
headers=headers
)
)
except Exception as e:
return {
'status': 'err',
'value': res.text
'value': str(e)
}

try:
# JSON数据,转化成JSON对象
result = json.loads(res.text, encoding='utf-8')
result = json.loads(str(res.body, encoding='utf-8'), encoding='utf-8')
except:
# 非JSON数据(二进制字节流),直接返回
result = res.content
result = res.body

return result

+ 2
- 2
app/service/umm_client.py View File

@@ -4,11 +4,11 @@ from app.service.http_client import http_client
service_name = 'umm'


def find_solution(uuid, jwt=None):
async def find_solution(uuid, jwt=None):
body = {
'action': 'get_solutions',
'args': {
'uuid': uuid,
},
}
return http_client(service_name, body=body, jwt=jwt)
return await http_client(service_name, body=body, jwt=jwt)

+ 193
- 0
app/serviceboot.py View File

@@ -0,0 +1,193 @@
# -*- coding: utf-8 -*-
import os
import yaml
import socket
import logging
import tornado.web
import tornado.ioloop
import tornado.websocket
import tornado.httpserver
from tornado.httputil import HTTPHeaders
from app.global_data.global_data import g
from app.service import ability_service


class GatewayApi(tornado.web.RequestHandler):

async def get(self, *args, **kwargs):
g.current_connections += 1
if g.current_connections > g.config.max_connections:
g.current_connections -= 1
self.set_status(500)
self.write('API网关限流,服务暂不可用!')
return

try:
response = await ability_service.forward_request(self.request)

self.set_status(response.code)
self._headers = HTTPHeaders(response.headers) # 注意:需要强制类型转换

if self._headers.get('Content-Type') == 'gzip':
try:
self._headers.pop('Content-Type')
self._headers.pop('Content-Length')
except:
pass

if self._headers.get('Transfer-Encoding'):
self._headers.pop('Transfer-Encoding')

if self._headers.get('Content-Length') is not None:
self.set_header('Content-Length', len(response.body))

if self.request.headers.get('Origin'):
self.set_header('Access-Control-Allow-Credentials', 'true')
self.set_header('Access-Control-Allow-Origin', self.request.headers.get('Origin'))

if self._status_code in (204, 304) or 100 <= self._status_code < 200:
# 这些状态下response中不能有body,所以不应该write
return

self.write(response.body)
except Exception as e:
logging.error(str(e))
self.set_status(500)
self.write('模型微服务无法访问!')

g.current_connections -= 1

async def post(self, *args, **kwargs):
g.current_connections += 1
if g.current_connections > g.config.max_connections:
g.current_connections -= 1
self.set_status(500)
self.write('API网关限流,服务暂不可用!')
return

try:
response = await ability_service.forward_request(self.request)

self.set_status(response.code)
self._headers = HTTPHeaders(response.headers) # 注意:需要强制类型转换

if self._headers.get('Content-Type') == 'gzip':
try:
self._headers.pop('Content-Type')
self._headers.pop('Content-Length')
except:
pass

if self._headers.get('Transfer-Encoding'):
self._headers.pop('Transfer-Encoding')

if self._headers.get('Content-Length') is not None:
self.set_header('Content-Length', len(response.body))

if self.request.headers.get('Origin'):
self.set_header('Access-Control-Allow-Credentials', 'true')
self.set_header('Access-Control-Allow-Origin', self.request.headers.get('Origin'))

if self._status_code in (204, 304) or 100 <= self._status_code < 200:
# 这些状态下response中不能有body,所以不应该write
return

self.write(response.body)
except Exception as e:
logging.error(str(e))
self.set_status(500)
self.write('模型微服务无法访问!')

g.current_connections -= 1

async def options(self, *args, **kwargs):
# 允许跨域
self.set_status(204)
self.set_header('Access-Control-Allow-Credentials', 'true')
self.set_header('Access-Control-Allow-Origin', self.request.headers.get('Origin'))
self.set_header("Access-Control-Allow-Headers", "content-type")
self.set_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')


class HealthChecker(tornado.web.RequestHandler):
async def get(self, *args, **kwargs):
self.write('{"description": "Micro-service Discovery Client", "status": "UP"}')


def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()

return ip


def start():
app_profile = os.environ.get('APP_PROFILE', 'dev').lower()
log_level = logging.DEBUG if app_profile == 'dev' else logging.ERROR
logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s')

try:
with open('./application.yml', 'rb') as f:
yml = yaml.load(f, Loader=yaml.SafeLoader)
except:
logging.error('服务配置文件application.yml不存在!')
return

try:
ename = yml['service']['ename']
except:
logging.error('未指定服务英文名!请在application.yml文件中编辑修改...')
return

try:
cname = yml['service']['cname']
except:
cname = ename.upper()

if app_profile == 'dev':
try:
port = yml['service']['port']['dev']
except:
logging.error('未指定服务端口号!缺省使用80端口。')
port = 80
else:
try:
port = yml['service']['port']['prod']
except:
logging.error('未指定服务端口号!缺省使用80端口。')
port = 80

g.load_global_data()
if not g.init_success:
logging.error('微服务: {}/{} 初始化加载global_data失败!'.format(cname, ename))
return

handlers = [
(r'/management/health', HealthChecker),
(r'/(.*)', GatewayApi),
]

debug = 'dev' == app_profile # 开发模式缺省启动debug

app = tornado.web.Application(
handlers=handlers,
debug=debug
)
http_server = tornado.httpserver.HTTPServer(app, max_buffer_size=1000 * 1024 * 1024)
http_server.bind(port)
http_server.start(num_processes=1)

logging.critical('##################################################')
logging.critical(' 微服务: {}/{} started ...'.format(cname, ename))
logging.critical(' Listening at: {}:{}'.format(get_local_ip(), port))
logging.critical(' App profile: {}'.format(app_profile))
logging.critical('##################################################')
tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
start()

+ 3
- 5
application.yml View File

@@ -9,10 +9,8 @@ service:
dev: 8207
prod: 80

gateway:
is_gateway: yes

build:
image_name: ability
build_web: no
compile_python_to_so: no

http:
max_connections: 3000

+ 1
- 1
build-docker.sh View File

@@ -1 +1 @@
serviceboot build_docker
python3 build_docker.py

+ 86
- 0
build_docker.py View File

@@ -0,0 +1,86 @@
# -*- coding: utf-8 -*-
import os
import platform
import yaml


def build_docker():

is_windows = platform.system() == 'Windows'

if not os.path.exists('requirements.txt'):
print('错误: requirements.txt文件不存在!')
return

try:
with open('./application.yml', 'rb') as f:
yml = yaml.load(f, Loader=yaml.SafeLoader)
except:
print('错误: 模型配置文件application.yml不存在!')
return

try:
image_name = yml['build']['image_name']
except:
print('错误: 未指定docker镜像名称!')
print('请在application.yml文件中编辑修改...')
return

try:
image_tag = str(yml['build']['tag'])
except:
print('未指定docker镜像tag,使用:latest')
image_tag = 'latest'

try:
build_web = yml['build']['build_web']
except:
build_web = False

if is_windows:
os.system('rd /s /q temp')
os.system('mkdir temp')
os.system('xcopy /q application.yml temp')
os.system('xcopy /q requirements.txt temp')
os.system('xcopy /q Dockerfile temp')
else:
os.system('rm -rf temp')
os.system('mkdir temp')
os.system('cp ./application.yml ./temp')
os.system('cp ./requirements.txt ./temp')
os.system('cp ./Dockerfile ./temp')

if is_windows:
os.system('mkdir temp\\app')
os.system('xcopy /y /q /s /e app temp\\app')
else:
os.system('cp -rf ./app ./temp/')

if build_web:
if os.path.exists('./webapp/src'):
cwd = os.getcwd()
os.chdir(os.path.join(cwd, 'webapp'))
if not os.path.exists('./node_modules'):
os.system('npm install')
os.system('ng build --prod')
os.chdir(cwd)
if os.path.exists('./webapp/www'):
if is_windows:
os.system('mkdir temp\\webapp\\www')
os.system('xcopy /y /q /s /e webapp\\www temp\\webapp\\www')
else:
os.system('mkdir temp/webapp')
os.system('cp -rf ./webapp/www ./temp/webapp/')

os.system('docker image rm {}:{}'.format(image_name, image_tag))
os.system('docker build -t {}:{} ./temp'.format(image_name, image_tag))
if is_windows:
os.system('rd /s /q temp')
else:
os.system('rm -rf temp')

print('模型docker镜像 {}:{} 构建完成! '.format(image_name, image_tag))


if __name__ == '__main__':
build_docker()

+ 1
- 1
requirements.txt View File

@@ -1,5 +1,5 @@
# python==3.5.9
serviceboot==1.0.6
tornado==6.0.4
requests==2.24.0
python-consul==1.1.0
python_jwt==3.2.6


+ 1
- 2
start.py View File

@@ -1,4 +1,3 @@
from serviceboot import serviceboot

from app import serviceboot

serviceboot.start()

Loading…
Cancel
Save