|
- import logging
- from flask import Flask
- from flask import request,jsonify
- import json
- from cluster import *
-
-
# logging.basicConfig() only has an effect the first time it configures the
# root logger; later calls are ignored once handlers exist, so set it up here.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Flask application object that the route decorators in this file register on.
app = Flask(__name__)
-
@app.route("/")
def hello_world():
    """Answer requests to the site root with a fixed greeting string."""
    greeting = "Hello, CouldBrain2!"
    return greeting
-
-
@app.route("/platformserver/v1/oauth/token", methods=['GET', 'POST'])
def token():
    """Hand out the fixed bearer token when the expected credentials are supplied.

    ``grant_type``, ``client_id`` and ``client_secret`` are read from the URL
    query string (for GET and POST alike). When all three match the expected
    values the JSON reply carries the access token, its lifetime and its type;
    otherwise the reply is ``{"code": -1, "msg": "invalid credential"}``.
    """
    query = request.args
    expected = {
        'grant_type': 'client_credentials',
        'client_id': '3af33be470f1481ca3445e9d37484b31',
        'client_secret': 'f3449bd481f646859e3142c8b3f019f1',
    }

    # Every expected field must be present and equal; a missing field reads as
    # None and therefore never matches.
    credentials_ok = all(
        query.get(field) == value for field, value in expected.items()
    )
    if not credentials_ok:
        return jsonify({
            "code": -1,
            "msg": "invalid credential"
        })

    return jsonify({
        "access_token": "ODK4NWIWOGUTNWYYNI0ZN2E3LWEXOWYTOGM4N2RMMDNMMTK1",
        "expires_in": 86400,
        "token_type": "Bearer"
    })
-
# Bearer token that trainjob requests must present in the Authorization header.
_TRAINJOB_ACCESS_TOKEN = 'ODK4NWIWOGUTNWYYNI0ZN2E3LWEXOWYTOGM4N2RMMDNMMTK1'


def _trainjob_failure(subcode, message):
    """Build the success/payload/error failure envelope returned by trainjob."""
    return {
        "success": False,
        "payload": None,
        "error": {
            "code": 1,
            "subcode": subcode,
            "message": message
        }
    }


def _size_to_mb(size):
    """Convert a size string ending in Ki, Mi or Gi to a whole number of MB.

    Ki and Gi values may be fractional; Mi values must be integer literals.
    Any other suffix yields 0. Raises ValueError when the numeric part cannot
    be parsed.
    """
    unit = size[-2:]
    number = size[:-2]
    if unit == "Ki":
        return int(float(number) / 1024)
    if unit == "Mi":
        return int(number)
    if unit == "Gi":
        return int(float(number) * 1024)
    return 0


def _summarize_task(task):
    """Read the fields of one task entry and total up its resource request.

    Raises KeyError, TypeError, ValueError or AttributeError when the entry is
    missing a required key or holds a value of the wrong shape.
    """
    summary = {
        "name": task["name"],
        "command": task["command"].strip(';'),
        "taskNumber": task["taskNumber"],
        "parameters": task["parameters"],
        "minFailedTaskCount": task["minFailedTaskCount"],
        "minSucceededTaskCount": task["minSucceededTaskCount"],
        "cpuNumber": 0,
        "npuNumber": 0,
        "memoryMB": 0,
        "shmMB": 0,
        "wantsGpu": False,
    }
    for resource in task["resources"]:
        kind = resource["name"]
        if kind == "cpu":
            summary["cpuNumber"] = int(resource["size"])
        elif kind == "npu.huawei.com/NPU":
            summary["npuNumber"] = int(resource["size"])
        elif kind == "memory":
            summary["memoryMB"] = _size_to_mb(resource["size"])
        elif kind == "shm":
            summary["shmMB"] = _size_to_mb(resource["size"])
        elif kind == "nvidia.com/gpu":
            # GPU requests are rejected outright, so stop at the first one.
            summary["wantsGpu"] = True
            break
    return summary


def _parse_trainjob(job_params):
    """Validate a trainjob request body.

    Returns ``(job, rejection)``: ``job`` is a dict with ``name``, ``desc``,
    ``image`` and ``npuNumber`` when the request is acceptable, otherwise it is
    None and ``rejection`` is the failure envelope to send back. Raises
    KeyError, TypeError, ValueError or AttributeError on a malformed body.
    """
    job = {
        "name": job_params["name"],
        "desc": job_params.get("desc", ""),
        "image": job_params["image"]["name"] + ":" + job_params["image"]["version"],
        "npuNumber": 0,
    }
    tasks = job_params["tasks"]
    if not tasks:
        raise ValueError("at least one task is required")

    for index, task in enumerate(tasks):
        summary = _summarize_task(task)
        logger.debug("parsed task: %s", summary)
        if summary["wantsGpu"]:
            return None, _trainjob_failure(12345, "this center just has npu resource")
        if index > 0:
            return None, _trainjob_failure(10086, "this center just supply one task")
        job["npuNumber"] = summary["npuNumber"]
    return job, None


@app.route("/platformserver/v1/platform/trainjob", methods=['POST'])
def trainjob():
    """Submit a single-task NPU training job to the backing cluster.

    The request must carry an ``Authorization`` header ending in the fixed
    access token and a JSON body with ``name``, ``image`` (``name`` and
    ``version``), ``tasks`` and optionally ``desc``.

    Replies (all JSON):
      * ``{"code": -1, "msg": "invalid token"}`` for a missing or wrong token;
      * ``{"code": -2, "msg": "invalid parameter"}`` for a missing, non-JSON
        or malformed body;
      * a failure envelope when a GPU is requested (subcode 12345), when more
        than one task is supplied (subcode 10086), or when the cluster does
        not report a job id (subcode 12345);
      * ``{"success": True, "payload": {"jobId": ...}, "error": None}`` on
        success.
    """
    # A missing header reads as '' so it is rejected instead of raising.
    auth = request.headers.get('Authorization', '')
    if not auth.endswith(_TRAINJOB_ACCESS_TOKEN):
        logger.info('invalid token received')
        return jsonify({
            "code": -1,
            "msg": "invalid token"
        })

    # silent=True turns a missing or unparsable JSON body into None, which is
    # then reported as an invalid parameter below.
    job_params = request.get_json(silent=True)
    logger.debug(job_params)

    try:
        job, rejection = _parse_trainjob(job_params)
    except (KeyError, TypeError, ValueError, AttributeError):
        logger.info('invalid trainjob parameters received', exc_info=True)
        return jsonify({
            "code": -2,
            "msg": "invalid parameter"
        })
    if rejection is not None:
        return jsonify(rejection)

    # Multi-NPU requests use spec 14, everything else spec 13.
    # NOTE(review): the meaning of these ids is defined by the cluster, not here.
    spec_id = 14 if job["npuNumber"] > 1 else 13

    cluster_token = get_token()

    # Only the name, description and NPU-derived spec_id come from the request;
    # every other field of the submitted job is fixed.
    job_body = {
        "job_name": job["name"],
        "job_desc": job["desc"],
        "workspace_id": "0",
        "config": {
            "worker_server_num": 1,
            "app_url": "/intercloud/mindspore-code/resnet/",
            "boot_file_url": "/intercloud/mindspore-code/resnet/cifar_resnet50.py",
            "parameter": [
            ],
            "data_url": "/intercloud/dataset/cifar-10-batches-bin/",
            "spec_id": spec_id,
            "engine_id": 122,
            "train_url": "/intercloud/train/",
            "log_url": "/intercloud/job-log/"
        }
    }

    post_job_res = post_job(job_body, cluster_token)
    # Fall back to the -1 sentinel when the reply carries no job id, so the
    # failure branch below is reachable.
    try:
        job_id = post_job_res["job_id"]
    except (KeyError, TypeError):
        job_id = -1

    if job_id == -1:
        # The message text means "unknown error".
        return jsonify(_trainjob_failure(12345, "未知错误"))

    return jsonify({
        "success": True,
        "payload": {
            "jobId": str(job_id)
        },
        "error": None
    })
-
@app.route("/platformserver/v1/platform/resource", methods=['GET'])
def resource():
    """Report cluster resources inside a success/payload/error JSON envelope.

    The payload is whatever ``get_cluster_resource()`` returns; that helper is
    not defined in this file (presumably it comes from the ``cluster`` star
    import).
    """
    payload = get_cluster_resource()
    return jsonify({
        "success": True,
        "payload": payload,
        "error": None
    })
@app.route("/log/user/trainjob/<job_id>/task0/0/index.log", methods=['GET'])
def log(job_id):
    """Return the log content of the training job named in the URL.

    Obtains a token with ``get_token()``, looks up the job's version, then the
    log file name for that version, and finally returns what ``get_job_log()``
    yields. These helpers are not defined in this file (presumably they come
    from the ``cluster`` star import).
    """
    access = get_token()
    version = get_job_version(job_id, access)
    filename = get_job_log_filename(job_id, version, access)
    return get_job_log(job_id, version, filename, access)
-
if __name__ == '__main__':
    import os

    # The server listens on every interface, and Flask's debug mode enables an
    # interactive debugger that can execute arbitrary code. Debug mode is
    # therefore opt-in: set FLASK_DEBUG=1 (or true/yes/on) for local work.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", debug=debug_enabled)
|