
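Merge ServiceBoot and iBoot into the new ServiceBoot. Some general-purpose components used by the CubePy microservice framework are moved into ServiceBoot, which now provides them.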

pull/17/head
huolongshe 1 month ago
parent
commit
72de76f0a3
11 changed files with 32 additions and 294 deletions
  1. README.md (+4, -4)
  2. app/global_data/config.py (+4, -4)
  3. app/global_data/consul_client.py (+0, -86)
  4. app/global_data/global_data.py (+2, -4)
  5. app/global_data/oauth_client.py (+0, -110)
  6. app/service/deploy_service.py (+4, -4)
  7. app/service/lcm_service.py (+14, -13)
  8. app/service/token_service.py (+0, -53)
  9. application.yml (+3, -11)
  10. pip-install-reqs.sh (+0, -1)
  11. requirements.txt (+1, -4)

README.md (+4, -4)

@@ -2,11 +2,11 @@

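## About CubeAI智立方

[CubeAI智立方](https://git.openi.org.cn/OpenI/cubeai)is an open-source AI computing service platform that combines automated service packaging, publishing, sharing, deployment, and capability exposure for AI models. Its core purpose is to remove the barriers between AI model development and production use, speed up AI innovation and adoption, and enable automated, rapid iteration across the whole lifecycle of an AI application, from design and development through deployment and operation.
[CubeAI智立方](https://git.openi.org.cn/OpenI/cubeai) is an open-source AI computing service platform that combines automated service packaging, publishing, sharing, deployment, and capability exposure for AI models. Its core purpose is to remove the barriers between AI model development and production use, speed up AI innovation and adoption, and enable automated, rapid iteration across the whole lifecycle of an AI application, from design and development through deployment and operation.

AI model deployment (umd) is the CubeAI智立方 microservice responsible for deploying AI models and for managing the lifecycle of deployed models on the k8s cloud platform.

umd is built on the[CubePy microservice framework](https://git.openi.org.cn/OpenI/cubepy)and the[ServiceBoot microservice engine](https://pypi.org/project/serviceboot).
umd is built on the [CubePy microservice framework](https://git.openi.org.cn/OpenI/cubepy) and the [ServiceBoot microservice engine](https://git.openi.org.cn/OpenI/cubepy_serviceboot).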

## Basic configuration

@@ -17,7 +17,7 @@ umd基于[CubePy微服务框架](https://git.openi.org.cn/OpenI/cubepy)和[Servi

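- Service registration and discovery, centralized configuration: Consul(8500)

- User authentication and authorization: uaa
- User authentication and authorization: [uaa](https://git.openi.org.cn/OpenI/cubepy_uaa)

- Database: none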

@@ -32,7 +32,7 @@ umd基于[CubePy微服务框架](https://git.openi.org.cn/OpenI/cubepy)和[Servi

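2. Open this project's directory in PyCharm.

3. It is recommended to create a dedicated Python virtual environment for this project in PyCharm, using Python 3.5 or later (Python 3.5.9 recommended)
3. It is recommended to create a dedicated Python virtual environment for this project in PyCharm, using Python 3.5 or later.

4. Run the following command in PyCharm's terminal window to install the dependencies: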



app/global_data/config.py (+4, -4)

@@ -8,10 +8,10 @@ class Config:
with open('./application.yml', 'r') as f:
yml = yaml.load(f, Loader=yaml.SafeLoader)

self.app_name = yml['service']['ename']
self.app_name = yml['serviceboot']['ename']

try:
self.app_version = yml['service']['version']
self.app_version = yml['serviceboot']['version']
except:
self.app_version = '0.0.1'

@@ -21,14 +21,14 @@ class Config:

if self.app_profile == 'dev':
try:
self.server_port = yml['service']['port']['dev']
self.server_port = yml['serviceboot']['port']['dev']
except:
self.server_port = 80
self.consul_address = '127.0.0.1'
self.consul_tags = ['profile-dev']
else:
try:
self.server_port = yml['service']['port']['prod']
self.server_port = yml['serviceboot']['port']['prod']
except:
self.server_port = 80
self.consul_address = 'consul'
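
The hunks above move every lookup from the `service` section to the `serviceboot` section while keeping the same try/except fallbacks. A minimal sketch of that lookup-with-default pattern follows. It works on a plain dict standing in for the parsed `application.yml`; the helper name `read_setting` and the sample dict are illustrative assumptions, not part of this commit.

```python
def read_setting(yml, *keys, default=None):
    """Walk nested dict keys, returning `default` if any key is missing."""
    node = yml
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


# Assumed shape of the parsed application.yml after this commit
# (key names taken from the lookups in config.py; values are illustrative).
yml = {'serviceboot': {'ename': 'umd', 'port': {'dev': 8206, 'prod': 80}}}

app_name = read_setting(yml, 'serviceboot', 'ename')
# 'version' is absent in the sample, so the fallback used in config.py applies.
app_version = read_setting(yml, 'serviceboot', 'version', default='0.0.1')
dev_port = read_setting(yml, 'serviceboot', 'port', 'dev', default=80)
```

Compared with a bare `except:`, an explicit default only covers the missing-key case and does not hide unrelated errors.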


app/global_data/consul_client.py (+0, -86)

@@ -1,86 +0,0 @@
import json
import yaml
import random
import consul
import requests
import logging
import os


class ConsulClient:
    def __init__(self, host=None, port=None, token=None):
        self.host = host
        self.port = port
        self.token = token
        self.consul = consul.Consul(host=host, port=port)
        self.registered = False

    def register(self, name, service_id, address, port, tags, http_check_url):
        # Before registering a new microservice node, deregister any stale entry for this node that may remain in Consul
        # (e.g. the microservice was stopped and restarted before Consul had cleared its previous registration)
        url = 'http://{}:{}/v1/health/service/{}'.format(self.host,self.port, name)
        res = requests.get(url)
        if res.status_code == 200:
            instances = json.loads(res.text)
            for instance in instances:
                if instance['Service']['Address'] == address and instance['Service']['Port'] == port:
                    self.consul.agent.service.deregister(instance['Service']['ID'])

        # Register the microservice
        try:
            self.consul.agent.service.register(
                name,
                service_id=service_id,
                address=address,
                port=port,
                tags=tags,
                check=consul.Check().http(http_check_url, '10s', '20s', '20s')
            )
            self.registered = True
            logging.critical('Successful registered to Consul: {}:{}'.format(address, port))
        except:
            self.registered = False

    def get_services(self):
        return self.consul.agent.services()

    def resolve_service(self, name):  # Resolve a service name to "IP address:port"; load-balance by picking one instance at random
        # When running in K8S, resolve the service name directly through kube-dns; the service port defaults to 80
        if os.environ.get('APP_PROFILE', 'dev').lower() == 'k8s':
            return name

        # Outside k8s, resolve the address from Consul
        url = 'http://{}:{}/v1/health/service/{}'.format(self.host,self.port, name)
        res = requests.get(url)
        if res.status_code != 200:
            return None

        instances = json.loads(res.text)
        results = []
        for instance in instances:
            if instance['Checks'][0]['Status'] == 'passing':
                results.append(instance['Service']['Address'] + ':' + str(instance['Service']['Port']))

        if len(results) > 0:
            return results[random.randint(0, len(results) - 1)]  # Pick one available service instance at random

        return None

    def get_kv(self):
        try:
            kv_yaml = self.consul.kv.get('config/application/data')[1].get('Value')
            return yaml.load(kv_yaml, Loader=yaml.SafeLoader)
        except:
            return None


if __name__ == "__main__":
    consul_client = ConsulClient('127.0.0.1', 8500)
    consul_client.register('demo', 'demo-test', '127.0.0.1', 8888, ['profile-dev'], 'http://127.0.0.1:8888/management/health')
    if consul_client.registered:
        print('Registered in Consul!')
        print(consul_client.resolve_service('uaa'))
        print(consul_client.resolve_service('demo'))
        print(consul_client.get_kv())
    else:
        print('Not registered in Consul!!!')
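
The instance-selection step of `resolve_service` can be looked at without any network call. The sketch below is not the removed code and not ServiceBoot's replacement. It assumes the input is the already-decoded JSON list from Consul's `/v1/health/service/<name>` endpoint, and unlike the removed code (which inspects only `Checks[0]`) it requires every check to pass. The function name `pick_passing_instance` and the sample data are hypothetical.

```python
import random


def pick_passing_instance(instances):
    """Return 'address:port' of one randomly chosen healthy instance, or None."""
    candidates = []
    for instance in instances:
        checks = instance.get('Checks', [])
        # Assumption: an instance is healthy only if all of its checks pass.
        if checks and all(check['Status'] == 'passing' for check in checks):
            service = instance['Service']
            candidates.append('{}:{}'.format(service['Address'], service['Port']))
    return random.choice(candidates) if candidates else None


# Hypothetical sample shaped like a health-endpoint response.
sample = [
    {'Service': {'Address': '10.0.0.1', 'Port': 8206}, 'Checks': [{'Status': 'passing'}]},
    {'Service': {'Address': '10.0.0.2', 'Port': 8206}, 'Checks': [{'Status': 'critical'}]},
]
chosen = pick_passing_instance(sample)
```

`random.choice` replaces the `random.randint` index arithmetic and behaves the same for a non-empty list.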


app/global_data/global_data.py (+2, -4)

@@ -1,10 +1,8 @@
import uuid
import json
import time
from pykafka import KafkaClient
from serviceboot.consul_client import ConsulClient
from serviceboot.oauth_client import OauthClient
from app.global_data.config import Config
from app.global_data.consul_client import ConsulClient
from app.global_data.oauth_client import OauthClient
from app.global_data.k8s_client import K8sClient
import logging



app/global_data/oauth_client.py (+0, -110)

@@ -1,110 +0,0 @@
import json
import base64
import requests
from datetime import datetime


class OauthClient:
    def __init__(self, consul_client):
        self.consul_client = consul_client
        self.uaa_name = 'uaa'
        self.uaa_public_key = None
        self.jwt = None
        self.jwt_expire = None
        self.update_public_key()
        self.update_jwt()

    def get_uaa_url(self):
        return 'http://{}/api/data'.format(self.consul_client.resolve_service(self.uaa_name))

    def update_public_key(self):
        body = {
            'action': 'get_public_key',
            'args': {},
        }
        res = requests.post(url=self.get_uaa_url(), json=body)

        if res.status_code == 200:
            res = json.loads(res.text)
            if res['status'] == 'ok':
                self.uaa_public_key = res['value']['value']
            else:
                self.uaa_public_key = None
        else:
            self.uaa_public_key = None

        return self.uaa_public_key

    def update_jwt(self):
        headers = {
            'Authorization': 'Basic {}'.format(str(base64.b64encode(b'internal:internal'), encoding='utf-8'))
        }
        body = {
            'action': 'get_token',
            'args': {
                'grant_type': 'client_credentials',
            }
        }
        res = requests.post(url=self.get_uaa_url(), json=body, headers=headers)

        if res.status_code == 200:
            res = json.loads(res.text)
            if res['status'] == 'ok':
                self.jwt = res['value'].get('access_token')
                self.jwt_expire = get_jwt_expire(self.jwt)  # Cache exp so the JWT's validity can be checked without decoding it again, which saves verification time
            else:
                self.jwt = None
                self.jwt_expire = None
        else:
            self.jwt = None
            self.jwt_expire = None

        return self.jwt

    def get_public_key(self):
        return self.uaa_public_key if self.uaa_public_key is not None else self.update_public_key()

    def get_jwt(self):
        return self.jwt if not verify_expire(self.jwt_expire) else self.update_jwt()


def get_jwt_expire(jwt):
    _, payload, _ = jwt.split('.')
    missing_padding = (4 - len(payload) % 4) % 4
    payload += '=' * missing_padding
    try:
        return json.loads(str(base64.b64decode(payload.encode()), encoding='utf-8')).get('exp')
    except:
        return None


def verify_expire(exp):
    if exp is None:
        return True
    now = int(datetime.now().timestamp())
    return now + 5 > exp


# Not used for now; verify_expire is used instead to save verification time
def verify_jwt_expire(jwt):
    _, payload, _ = jwt.split('.')
    missing_padding = (4 - len(payload) % 4) % 4
    payload += '=' * missing_padding
    try:
        exp = json.loads(str(base64.b64decode(payload.encode()), encoding='utf-8')).get('exp')
        now = int(datetime.now().timestamp())
        return now + 5 > exp
    except:
        return True


if __name__ == "__main__":
    from app.global_data.config import Config
    from app.global_data.consul_client import ConsulClient

    config = Config()
    consul_client = ConsulClient(config.consul_address, config.consul_port)
    oauth_client = OauthClient(consul_client)

    print(oauth_client.get_public_key())
    print(oauth_client.get_jwt())
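
The removed client caches the token's `exp` claim and treats the token as expired five seconds early. Below is a small standalone sketch of that idea, not ServiceBoot's implementation. It swaps in `base64.urlsafe_b64decode`, since JWT segments are base64url-encoded, where the removed code used `b64decode`. The names `jwt_expiry`, `is_expired` and `_fake_jwt` are hypothetical, and the signature is not verified here.

```python
import base64
import json
import time


def jwt_expiry(jwt):
    """Return the `exp` claim of a JWT without verifying it, or None on any parse error."""
    try:
        _, payload, _ = jwt.split('.')
        payload += '=' * (-len(payload) % 4)  # restore stripped base64 padding
        return json.loads(base64.urlsafe_b64decode(payload)).get('exp')
    except (ValueError, AttributeError):
        return None


def is_expired(exp, leeway=5, now=None):
    """Treat a missing exp as expired; otherwise expire `leeway` seconds early."""
    if exp is None:
        return True
    now = int(time.time()) if now is None else now
    return now + leeway > exp


def _fake_jwt(claims):
    # Builds an unsigned token for demonstration only.
    body = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b'=').decode()
    return 'header.{}.signature'.format(body)


exp = jwt_expiry(_fake_jwt({'exp': 1700000000}))
```

The `now` parameter is there only to make the expiry check deterministic in tests.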

app/service/deploy_service.py (+4, -4)

@@ -1,9 +1,10 @@
import time
import uuid
import threading
from serviceboot import token_service
from app.utils import mytime
from app.utils.file_tools import replace_special_char
from app.service import token_service, umm_client, message_service, kafka_service
from app.service import umm_client, message_service, kafka_service
from app.domain.task import Task
from app.domain.task_step import TaskStep
from app.domain.solution import Solution
@@ -58,11 +59,10 @@ def save_task_step_progress(taskUuid, stepName, stepStatus, stepProgress, descri


def deploy_model(**args):
http_request = args.get('http_request')
token = token_service.get_token(http_request)
token = token_service.get_token()
user_login = token.username
if user_login is None:
raise Exception('403 Forbidden')
raise Exception('用户未授权')

gpu_num = args.get('gpuNum')
try:


app/service/lcm_service.py (+14, -13)

@@ -1,5 +1,6 @@
from serviceboot import token_service
from app.utils.file_tools import replace_special_char
from app.service import token_service, umm_client
from app.service import umm_client
from app.service.deploy_service import get_image_name
from app.domain.solution import Solution
from app.domain.deployment_status import DeploymentStatus
@@ -9,11 +10,11 @@ from app.global_data.global_data import g
def get_deployment_status(**args):
username = args.get('username')
solution_uuid = args.get('solutionUuid')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != username and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
namespace = 'cubeai-' + replace_special_char(username)
@@ -39,11 +40,11 @@ def get_deployment_status(**args):
def get_deployment_logs(**args):
username = args.get('username')
solution_uuid = args.get('solutionUuid')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != username and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
namespace = 'cubeai-' + replace_special_char(username)
@@ -66,11 +67,11 @@ def scale_deployment(**args):
solution.__dict__ = args.get('solution')
target_status = DeploymentStatus()
target_status.__dict__ = args.get('targetStatus')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != solution.deployer and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
name = get_image_name(solution.uuid)
@@ -112,11 +113,11 @@ def scale_deployment(**args):
def pause_deployment(**args):
solution = Solution()
solution.__dict__ = args.get('solution')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != solution.deployer and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
name = get_image_name(solution.uuid)
@@ -134,11 +135,11 @@ def pause_deployment(**args):
def restart_deployment(**args):
solution = Solution()
solution.__dict__ = args.get('solution')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != solution.deployer and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
name = get_image_name(solution.uuid)
@@ -156,11 +157,11 @@ def restart_deployment(**args):
def stop_deployment(**args):
solution = Solution()
solution.__dict__ = args.get('solution')
token = token_service.get_token(args.get('http_request'))
token = token_service.get_token()
has_role = token.has_role('ROLE_OPERATOR')
user_login = token.username
if user_login is None or (user_login != solution.deployer and not has_role):
raise Exception('403 Forbidden')
raise Exception('用户未授权')

try:
name = get_image_name(solution.uuid)
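
Every handler in this file repeats the same rule: the caller must be authenticated and must be either the resource owner or hold `ROLE_OPERATOR`. The sketch below factors that rule into one helper, purely as an illustration. The `Token` class is a stand-in modelled on the removed `app/service/token_service.py`, not the object returned by `serviceboot.token_service.get_token()`. The helper name, the use of `PermissionError` and the English message are assumptions.

```python
class Token:
    """Stand-in for the token object; only the fields used by the check."""

    def __init__(self, username=None, roles=None):
        self.username = username
        self.roles = roles or []

    def has_role(self, role):
        return role in self.roles


def require_owner_or_operator(token, owner):
    """Return the caller's login, or raise if the caller may not act on `owner`'s resource."""
    user_login = token.username
    if user_login is None or (user_login != owner and not token.has_role('ROLE_OPERATOR')):
        raise PermissionError('user not authorized')
    return user_login


owner_login = require_owner_or_operator(Token('alice'), 'alice')
operator_login = require_owner_or_operator(Token('ops', ['ROLE_OPERATOR']), 'alice')
```

A handler such as `pause_deployment` could then call the helper once with `solution.deployer` instead of repeating the condition.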


app/service/token_service.py (+0, -53)

@@ -1,53 +0,0 @@
import python_jwt
from jwcrypto import jwk
from app.global_data.global_data import g
import logging


class Token:
    def __init__(self):
        self.is_valid = False
        self.jwt = None
        self.username = None
        self.roles = []

    def has_role(self, role):
        return role in self.roles


def get_token(request):
    token = Token()

    public_key = g.oauth_client.get_public_key()
    if public_key is None:
        logging.error('Cannot get public key from UAA')
        return token

    authorization = request.headers.get('Authorization')
    if authorization is None:
        return token

    if authorization[0:6].lower() != 'bearer':
        return token

    token.jwt = authorization[7:]  # Strip the "Bearer " prefix

    try:
        jwt_decoded = python_jwt.verify_jwt(
            token.jwt,
            jwk.JWK.from_pem(public_key.encode()),
            ['RS256'],
            checks_optional=True,
            ignore_not_implemented=True
        )
        claims = jwt_decoded[1]
        token.username = claims.get('user_name') or claims.get('client_id')
        token.roles = claims.get('authorities') or []
        token.is_valid = True
    except Exception as e:
        token.is_valid = False
        token.jwt = None
        token.username = None
        token.roles = []

    return token
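
The removed `get_token` extracts the JWT by slicing the `Authorization` header at fixed offsets. As a hedged alternative for illustration only, the header can be split on the first space. This handles a missing or empty credential explicitly. The function name `extract_bearer` is hypothetical, and signature verification is deliberately left out.

```python
def extract_bearer(authorization):
    """Return the credential from a 'Bearer <token>' header value, or None."""
    if not authorization:
        return None
    scheme, _, credentials = authorization.partition(' ')
    credentials = credentials.strip()
    # The scheme name is compared case-insensitively, as in the removed code.
    if scheme.lower() != 'bearer' or not credentials:
        return None
    return credentials


jwt_text = extract_bearer('Bearer abc.def.ghi')
```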

application.yml (+3, -11)

@@ -1,18 +1,10 @@
serviceboot:
version: v2

service:
python_version: 3.5.9
cname: CubeAI模型部署
ename: umd
version: 1.0.0
port:
dev: 8206
prod: 80

gateway:
is_gateway: no

build:
image_name: umd
build_web: no
compile_python_to_so: no
has_web: no
process_num: 1 # default: 1. When set to 0, the number of processes equals the number of CPU cores on the current host

pip-install-reqs.sh (+0, -1)

@@ -1,2 +1 @@
# pip3 install -i https://pypi.douban.com/simple/ -r requirements.txt
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt

requirements.txt (+1, -4)

@@ -1,8 +1,5 @@
# python==3.5.9
serviceboot==1.0.6
requests==2.24.0
python-consul==1.1.0
python_jwt==3.2.6
serviceboot==2.0.5
pyyaml==5.3.1
kubernetes==11.0.0
pykafka==2.8.0

