#777 master章鱼优化任务监控数据查询接口

Merged
yangxzh1 merged 26 commits from openioctopus/octopus:master into master 3 months ago
  1. +37
    -3
      admin-portal/src/views/resourceManager/components/nodeList.vue
  2. +2
    -1
      build/application/admin-portal/dockerfile
  3. +2
    -1
      build/application/openai-portal/dockerfile
  4. +1
    -1
      deploy/charts/octopus/templates/ascend.yaml
  5. +4
    -4
      deploy/charts/octopus/templates/grafana.yaml
  6. +1
    -1
      deploy/charts/octopus/templates/logger.yaml
  7. +22
    -2
      deploy/charts/octopus/templates/prometheus.yaml
  8. +7
    -0
      deploy/charts/octopus/values.yaml
  9. +1
    -0
      server/admin-server/api/v1/develop.proto
  10. +26
    -0
      server/base-server/api/v1/develop.proto
  11. +1
    -2
      server/base-server/api/v1/ftpproxy.proto
  12. +1
    -0
      server/base-server/api/v1/resource.proto
  13. +1
    -0
      server/base-server/api/v1/resourcespec.proto
  14. +6
    -0
      server/base-server/api/v1/trainJob.proto
  15. +101
    -0
      server/base-server/configs/config_project.yaml
  16. +14
    -2
      server/base-server/internal/data/dao/resource.go
  17. +14
    -1
      server/base-server/internal/data/dao/resourcespec.go
  18. +66
    -0
      server/base-server/internal/data/prometheus/prometheus.go
  19. +178
    -0
      server/base-server/internal/service/develop/develop.go
  20. +10
    -2
      server/base-server/internal/service/ftpproxy/ftp.go
  21. +68
    -0
      server/base-server/internal/service/resources/resource.go
  22. +25
    -0
      server/base-server/internal/service/resources/resourcespec.go
  23. +134
    -24
      server/base-server/internal/service/trainjob/train_job.go
  24. +9
    -5
      server/base-server/internal/service/user/user.go
  25. +47
    -1
      server/openai-server/api/v1/develop.proto
  26. +12
    -0
      server/openai-server/api/v1/trainJob.proto
  27. +20
    -0
      server/openai-server/internal/service/develop.go

+ 37
- 3
admin-portal/src/views/resourceManager/components/nodeList.vue View File

@@ -60,6 +60,17 @@
</el-table-column>
</el-table>

<div class="block">
<el-pagination
@size-change="handleSizeChange"
@current-change="handleCurrentChange"
:current-page="currentPage"
:page-sizes="[20, 50, 100]"
:page-size="pageSize"
layout="total, sizes, prev, pager, next, jumper"
:total="totalSize">
</el-pagination>
</div>
</div>
</template>
<script>
@@ -73,15 +84,31 @@
data() {
return {
input: '',
totalData: [],
tableData: [],


currentPage: 1,
pageSize: 20,
totalSize: 0,
tooldata: []
}
},
created() {
this.getNodeList()
},
methods: {
handleSizeChange(val) {
this.pageSize = val
this.showCurrentdata()
},
handleCurrentChange(val) {
this.currentPage = val
this.showCurrentdata()
},
showCurrentdata() {
let start = (this.currentPage-1)*this.pageSize*2
let end = this.currentPage*this.pageSize*2-1
this.tableData = this.totalData.slice(start,end)
},
getDetail(val) {
let data = []
for (const key1 in val.allocated) {
@@ -124,7 +151,9 @@
else { item.children = [] }
}
)
this.tableData = this.handleTableData(response.data.nodes)
this.totalSize = response.data.nodes.length
this.totalData = this.handleTableData(response.data.nodes)
this.showCurrentdata()
}
} else {
this.$message({
@@ -207,4 +236,9 @@
.el-progress-circle__track {
stroke: #409EFF;
}

.block {
float: right;
margin: 20px;
}
</style>

+ 2
- 1
build/application/admin-portal/dockerfile View File

@@ -5,7 +5,8 @@ ENV NODE_ENV production

WORKDIR /admin-portal
COPY ./admin-portal/package.json ./
RUN npm config set registry https://registry.npm.taobao.org && \
RUN npm config set strict-ssl false && \
npm config set registry https://registry.npm.taobao.org && \
npm install --dev
COPY ./admin-portal ./
RUN npm run build:prod


+ 2
- 1
build/application/openai-portal/dockerfile View File

@@ -5,7 +5,8 @@ ENV NODE_ENV production

WORKDIR /openai-portal
COPY ./openai-portal/package.json ./
RUN npm config set registry https://registry.npm.taobao.org && \
RUN npm config set strict-ssl false && \
npm config set registry https://registry.npm.taobao.org && \
npm install --dev
COPY ./openai-portal ./
RUN npm run build:prod


+ 1
- 1
deploy/charts/octopus/templates/ascend.yaml View File

@@ -67,7 +67,7 @@ spec:
- name: log-path
hostPath:
path: /var/log/mindx-dl/devicePlugin
type: Directory
type: DirectoryOrCreate
- name: tmp
hostPath:
path: /tmp

+ 4
- 4
deploy/charts/octopus/templates/grafana.yaml View File

@@ -443,7 +443,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "npu_chip_info_utilization{id=~\"$npu_id\", name=\"$npu_hostname\"}",
"expr": "container_npu_utilization{pod_name=\"$pod_name\"}",
"format": "time_series",
"hide": false,
"intervalFactor": 2,
@@ -532,7 +532,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "npu_chip_info_hbm_used_memory{id=~\"$npu_id\", name=\"$npu_hostname\"} / npu_chip_info_hbm_total_memory{id=~\"$npu_id\", name=\"$npu_hostname\"} * 100",
"expr": "100 * container_npu_used_memory{pod_name=\"$pod_name\"} / container_npu_total_memory{pod_name=\"$pod_name\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "NPU hbm memory utilization",
@@ -2088,7 +2088,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "sum((sum(dcgm_mem_copy_utilization{pod_name=~\".+\"}) / count(dcgm_gpu_utilization{pod_name=~\".+\"})) or vector(0))",
"expr": "sum((sum(dcgm_mem_copy_utilization{pod_name=~\".*\"}) / count(dcgm_gpu_utilization{pod_name=~\".*\"})) or vector(0))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "Cluster GPU memory utilization(avg)",
@@ -5767,7 +5767,7 @@ data:
"steppedLine": false,
"targets": [
{
"expr": "sum((sum(dcgm_mem_copy_utilization{pod_name=~\".+\",name=~\"^$Node$\"}) / count(dcgm_gpu_utilization{pod_name=~\".+\",name=~\"^$Node$\"})) or vector(0))",
"expr": "sum((sum(dcgm_mem_copy_utilization{pod_name=~\".*\",name=~\"^$Node$\"}) / count(dcgm_gpu_utilization{pod_name=~\".*\",name=~\"^$Node$\"})) or vector(0))",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "Node GPU memory utilization(avg)",


+ 1
- 1
deploy/charts/octopus/templates/logger.yaml View File

@@ -289,7 +289,7 @@ spec:
{{- end }}
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash:7.17.13
image: docker.elastic.co/logstash/logstash:7.13.0
args: [
"bin/logstash","-f", "/usr/share/logstash/pipeline/logstash.conf",
]


+ 22
- 2
deploy/charts/octopus/templates/prometheus.yaml View File

@@ -299,7 +299,7 @@ spec:
- name: device-metrics
readOnly: true
mountPath: /run/prometheus
- image: nvidia/dcgm-exporter:1.4.6
- image: ybh1998/dcgm-exporter:0.1
name: nvidia-dcgm-exporter
securityContext:
runAsNonRoot: false
@@ -361,6 +361,14 @@ spec:
mountPath: /run/prometheus
- name: syspath
mountPath: /sys
- name: localtime
mountPath: /etc/localtime
- name: varlock
mountPath: /var/lock
- name: libefml
mountPath: /usr/lib/libefml.so
- name: libefml-real
mountPath: /usr/local/efsmi
volumes:
- name: pod-gcu-resources
hostPath:
@@ -371,6 +379,18 @@ spec:
- name: syspath
hostPath:
path: /sys
- name: localtime
hostPath:
path: /etc/localtime
- name: varlock
hostPath:
path: /var/lock
- name: libefml
hostPath:
path: /usr/lib/libefml.so
- name: libefml-real
hostPath:
path: /usr/local/efsmi
---
apiVersion: apps/v1
kind: DaemonSet
@@ -505,7 +525,7 @@ spec:
- name: log-npu-exporter
hostPath:
path: /var/log/mindx-dl/npu-exporter
type: Directory
type: DirectoryOrCreate
- name: localtime
hostPath:
path: /etc/localtime


+ 7
- 0
deploy/charts/octopus/values.yaml View File

@@ -475,6 +475,9 @@ sftpgo:
- name: "data"
persistentVolumeClaim:
claimName: "octopus-sftpgo-pvc"
- name: "minio"
persistentVolumeClaim:
claimName: "octopus-minio-pvc"
volumeMounts:
- name: "data"
mountPath: "/var/lib/sftpgo"
@@ -482,6 +485,8 @@ sftpgo:
- name: "data"
mountPath: "/srv/sftpgo"
subPath: "data"
- name: "minio"
mountPath: "/minio"
envVars:
- name: "SFTPGO_DEFAULT_ADMIN_USERNAME"
value: "admin"
@@ -490,6 +495,8 @@ sftpgo:
config:
data_provider:
create_default_admin: true
securityContext:
runAsUser: 0
nodeSelector:
<<: *nodeSelector



+ 1
- 0
server/admin-server/api/v1/develop.proto View File

@@ -92,6 +92,7 @@ message Notebook {
string userEmail = 26;
string resourcePool=27;
string exitMsg = 28;
string command = 29;
}

message ListNotebookReply {


+ 26
- 0
server/base-server/api/v1/develop.proto View File

@@ -29,6 +29,8 @@ service Develop {
rpc SaveNotebook (SaveNotebookRequest) returns (SaveNotebookReply);
// 查询事件记录列表
rpc ListNotebookEventRecord (ListNotebookEventRecordRequest) returns (ListNotebookEventRecordReply);
// 获取调试任务监控数据
rpc GetNotebookMetric (GetNotebookMetricRequest) returns (GetNotebookMetricReply);
}

message CreateNotebookRequest {
@@ -124,6 +126,7 @@ message Notebook {
string resourcePool=24;
string imageUrl = 25;
string exitMsg = 26;
string command = 27;
}

message ListNotebookReply {
@@ -201,4 +204,27 @@ message ListNotebookEventRecordRequest {
message ListNotebookEventRecordReply {
int64 totalSize = 1;
repeated NotebookEventRecord records = 2;
}

// GetNotebookMetricRequest selects the notebook task and the sampling window
// for a monitoring-data query.
message GetNotebookMetricRequest {
// Notebook ID; validated to be non-empty.
string id = 1[(validate.rules).string = {min_len: 1}];
// Index of the task inside the notebook job.
int32 taskIndex = 2;
// Start of the query window.
// NOTE(review): presumably a Unix timestamp in seconds — confirm.
int64 start = 3;
// Number of samples requested; validated to lie in [0, 1000].
int32 size = 4[(validate.rules).int32 = {gte:0,lte:1000}];
// Spacing between consecutive samples.
// NOTE(review): presumably in seconds — confirm.
int32 step = 5;
}

// GetNotebookMetricReply carries one series per metric for the requested window.
// A value of -1 means there is no data for that time point.
message GetNotebookMetricReply {
// CPU usage divided by the CPU quantity of the notebook's resource spec.
repeated double cpuUsage = 1;
// Raw memory usage samples.
repeated double memUsage = 2;
// GPU utilization samples.
repeated double gpuUtil = 3;
// GPU memory utilization samples.
repeated double gpuMemUsage = 4;
// Memory usage as a percentage of the memory quantity of the resource spec.
repeated double memUsagePercent = 5;
// Accelerator-card utilization for the vendor named in `company`.
repeated double accCardUtil = 6;
// Accelerator-card memory utilization for the vendor named in `company`.
repeated double accCardMemUsage = 7;
// Network receive rate samples.
repeated double networkReceiveBytes = 8;
// Network transmit rate samples.
repeated double networkTransmitBytes = 9;
// Filesystem usage samples.
repeated double fsUsageBytes = 10;
// Accelerator vendor detected from the resource spec; empty when none matched.
string company = 11;
}

+ 1
- 2
server/base-server/api/v1/ftpproxy.proto View File

@@ -16,8 +16,7 @@ message CreateOrUpdateFtpAccountRequest {
string username = 1;
string email = 2;
string password = 3;
string homeS3Bucket = 4;
string homeS3Object = 5;
string homeDir = 4;
}

message CreateOrUpdateFtpAccountReply {


+ 1
- 0
server/base-server/api/v1/resource.proto View File

@@ -9,6 +9,7 @@ option go_package = "server/base-server/api/v1;v1";

service ResourceService {
rpc ListResource(google.protobuf.Empty) returns (ResourceList);
rpc ListResourceAll(google.protobuf.Empty) returns (ResourceList);
rpc UpdateResource(UpdateResourceRequest) returns (UpdateResourceReply);
rpc CreateCustomizedResource(CreateCustomizedResourceRequest) returns (CreateCustomizedResourceReply);
rpc DeleteCustomizedResource(DeleteCustomizedResourceRequest) returns (DeleteCustomizedResourceReply);


+ 1
- 0
server/base-server/api/v1/resourcespec.proto View File

@@ -11,6 +11,7 @@ service ResourceSpecService {
rpc CreateResourceSpec(CreateResourceSpecRequest) returns (CreateResourceSpecReply);
rpc DeleteResourceSpec(DeleteResourceSpecRequest) returns (DeleteResourceSpecReply);
rpc GetResourceSpec(GetResourceSpecRequest) returns (GetResourceSpecReply);
rpc GetResourceSpecIgnore(GetResourceSpecRequest) returns (GetResourceSpecReply);
}

message ListResourceSpecRequest {


+ 6
- 0
server/base-server/api/v1/trainJob.proto View File

@@ -304,4 +304,10 @@ message GetJobMetricReply {
repeated double gpuUtil = 3;
repeated double gpuMemUsage = 4;
repeated double memUsagePercent = 5;
repeated double accCardUtil = 6;
repeated double accCardMemUsage = 7;
repeated double networkReceiveBytes = 8;
repeated double networkTransmitBytes = 9;
repeated double fsUsageBytes = 10;
string company = 11;
}

+ 101
- 0
server/base-server/configs/config_project.yaml View File

@@ -0,0 +1,101 @@
app:
name: baseserver
version: v1.0
isDev: true #是否本地调试
logLevel: info
server:
http:
addr: 0.0.0.0:8001
timeout: 60s
grpc:
addr: 0.0.0.0:9001
timeout: 60s
data:
database:
driver: mysql
source: root:root@tcp(192.168.202.73:30336)/octopus?charset=utf8&parseTime=True&loc=Local
kubernetes:
masterUrl: https://192.168.202.73:6443/
configPath: ./kubeconfig
qps: 20
minio:
base:
endPoint: 192.168.202.73:31311
accessKeyID: minioadmin
secretAccessKey: minioadmin
useSSL: false
mountPath: /nfsdata/octopus-242-41/minio
pvcName: octopus-minio-pvc
proxyPath: /oss
business:
downloadExpiry: 86400
uploadExpiry: 86400
harbor:
host: 192.168.202.74:5000
username: openi
password: OpenI2018
apiVersion: v1.0
useSSL: false
redis:
addr: 192.168.202.73:30667
username:
password: abcde
influxdb:
addr: 192.168.202.73:30086
username: octopus
password: octopus
database: octopus
jointCloud:
baseUrl: http://192.168.207.141:8709
username: test
password: 7ee15bc8fee766cad1bd70ccf5f4dc14
sessionExpirySec: 540 #实际有效期为600
ambassador:
baseUrl: 192.168.202.73
pytorchServer:
imageAddr: swr.cn-south-1.myhuaweicloud.com/openioctopus/pytorchserver
version: 2.0.2
sftpgo:
baseUrl: 192.168.202.73:30022
username: admin
password: abcde
prometheus:
baseUrl: http://192.168.202.73:30003
service:
baseServerAddr: http://127.0.0.1:8001
dockerDatasetPath: /dataset
dockerCodePath: /code
dockerModelPath: /model
dockerUserHomePath: /userhome
resourceLabelKey: octopus.pcl.ac.cn/type
billingPeriodSec: 60
isUseMultusCNI: false
networksConf: default/macvlan-cx5-bond-conf
routineNum: 10
develop:
autoStopIntervalSec: 7200
isSetUploadFileSize: true #值为false时,上传文件大小不能超过1M;为true时,不限制
resource:
customizedResourceBindingNodeLabelKeyFormat: openi.octopus.resource.%s
customizedResourceBindingNodeLabelValue: bound
defaultPoolName: common-pool
poolInfoStoreKey: ResourcePoolInfo
poolBindingNodeLabelKeyFormat: openi.octopus.resourcepool.%s
poolBindingNodeLabelValue: bound
poolSelectLabelKey: platform
poolSelectLabelValue: openi.octopus
discoveryLeaderLeaseLockName: resourcediscovery
discoveryDuration: 15s
ignoreSystemResources: hugepages-1Gi,pods,hugepages-2Mi,ephemeral-storage
administrator:
username: "admin"
password: "123456"
email: ""
phone: ""
module:
storage:
source:
capacity: "100Gi"
nfs:
server: 192.168.203.72
path: "/data/datasets/data/octopus-dev-minio"

+ 14
- 2
server/base-server/internal/data/dao/resource.go View File

@@ -12,6 +12,7 @@ import (

type ResourceDao interface {
ListResource() ([]*resources.Resource, error)
ListResourceAll() ([]*resources.Resource, error)
CreateResource(request *resources.CreateResourceRequest) (string, error)
GetResource(id string) (*resources.Resource, error)
UpdateResource(resource *resources.Resource) (string, error)
@@ -42,6 +43,17 @@ func (d *resourceDao) ListResource() ([]*resources.Resource, error) {
return resources, nil
}

// ListResourceAll loads every resource row through an Unscoped query.
// NOTE(review): Unscoped presumably serves to include soft-deleted rows — confirm
// against the Resource model.
func (d *resourceDao) ListResourceAll() ([]*resources.Resource, error) {
	// Named "all" so the local does not shadow the imported resources package.
	all := make([]*resources.Resource, 0)

	err := d.db.Unscoped().Find(&all).Error
	if err != nil {
		return nil, err
	}

	return all, nil
}

func (d *resourceDao) CreateResource(request *resources.CreateResourceRequest) (string, error) {
db := d.db
id := utils.GetUUIDWithoutSeparator()
@@ -84,7 +96,7 @@ func (d *resourceDao) DeleteResource(id string) (string, error) {
db := d.db
resource := &resources.Resource{Id: id}

if err := db.Unscoped().Delete(resource).Error; err != nil {
if err := db.Delete(resource).Error; err != nil {
return "", err
}

@@ -99,7 +111,7 @@ func (d *resourceDao) DeleteResourceByName(name string) (string, error) {
return "", err
}

if err := db.Unscoped().Delete(resource).Error; err != nil {
if err := db.Delete(resource).Error; err != nil {
return "", err
}



+ 14
- 1
server/base-server/internal/data/dao/resourcespec.go View File

@@ -13,6 +13,7 @@ type ResourceSpecDao interface {
CreateResourceSpec(request *resources.CreateResourceSpecRequest) (string, error)
DeleteResourceSpec(id string) (string, error)
GetResourceSpec(id string) (*resources.ResourceSpec, error)
GetResourceSpecIgnore(id string) (*resources.ResourceSpec, error)
}

type resourceSepcDao struct {
@@ -72,7 +73,7 @@ func (d *resourceSepcDao) CreateResourceSpec(request *resources.CreateResourceSp
func (d *resourceSepcDao) DeleteResourceSpec(id string) (string, error) {
db := d.db

if err := db.Unscoped().Delete(&resources.ResourceSpec{Id: id}).Error; err != nil {
if err := db.Delete(&resources.ResourceSpec{Id: id}).Error; err != nil {
return "", errors.Errorf(err, errors.ErrorDBDeleteFailed)
}

@@ -90,3 +91,15 @@ func (d *resourceSepcDao) GetResourceSpec(id string) (*resources.ResourceSpec, e

return resourceSpec, nil
}

// GetResourceSpecIgnore looks up the resource spec with the given id through an
// Unscoped query.
// NOTE(review): Unscoped presumably serves to include soft-deleted specs — confirm
// against the ResourceSpec model.
func (d *resourceSepcDao) GetResourceSpecIgnore(id string) (*resources.ResourceSpec, error) {
	// Seed the primary key so the lookup targets this id.
	spec := &resources.ResourceSpec{Id: id}

	err := d.db.Unscoped().Find(&spec).Error
	if err != nil {
		return nil, err
	}

	return spec, nil
}

+ 66
- 0
server/base-server/internal/data/prometheus/prometheus.go View File

@@ -26,8 +26,16 @@ type Reply struct {
type Prometheus interface {
QueryCpuUsage(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)
QueryMemUsage(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)

QueryGpuUtil(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)
QueryGpuMemUtil(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)

QueryAccCardUtil(ctx context.Context, podName, company string, start int64, size int, step int) ([]float64, error)
QueryAccCardMemUtil(ctx context.Context, podName, company string, start int64, size int, step int) ([]float64, error)

QueryNetworkReceiveBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)
QueryNetworkTransmitBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)
QueryFSUsageBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error)
}

func NewPrometheus(baseUrl string) Prometheus {
@@ -63,6 +71,64 @@ func (p *prometheus) QueryGpuMemUtil(ctx context.Context, podName string, start
return p.query(query, start, size, step)
}

// QueryAccCardUtil queries the accelerator-card utilization series of a pod,
// choosing the PromQL expression from the accelerator vendor in company.
//
// For "metax-tech" and for any unrecognised vendor (including the empty string)
// no metric name is known, so the expression is the bare selector
// {pod_name="<podName>"}.
func (p *prometheus) QueryAccCardUtil(
	ctx context.Context, podName, company string, start int64, size int, step int) ([]float64, error) {

	var metric string
	switch company {
	case "cambricon":
		// MLU utilization, joined onto the container mapping by card uuid.
		expr := fmt.Sprintf(`mlu_utilization * on(uuid) group_right mlu_container{pod="%s"}`, podName)
		return p.query(expr, start, size, step)
	case "iluvatar":
		// iluvatar GPU utilization.
		expr := fmt.Sprintf(`ix_gpu_utilization{pod="%s"}`, podName)
		return p.query(expr, start, size, step)
	case "nvidia":
		metric = "dcgm_gpu_utilization" // GPU utilization
	case "huawei":
		metric = "container_npu_utilization" // NPU utilization
	case "enflame":
		metric = "enflame_gcu_usage" // GCU utilization
	}

	expr := fmt.Sprintf(`%s{pod_name="%s"}`, metric, podName)
	return p.query(expr, start, size, step)
}

// QueryAccCardMemUtil queries the accelerator-card memory utilization series of
// a pod, choosing the PromQL expression from the accelerator vendor in company.
//
// For "metax-tech" and for any unrecognised vendor (including the empty string)
// no metric name is known, so the expression is the bare selector
// {pod_name="<podName>"}.
func (p *prometheus) QueryAccCardMemUtil(
	ctx context.Context, podName, company string, start int64, size int, step int) ([]float64, error) {

	var metric string
	switch company {
	case "huawei":
		// NPU hbm memory utilization, as a percentage of total memory.
		expr := fmt.Sprintf(
			`100 * container_npu_used_memory{pod_name="%s"} / container_npu_total_memory{pod_name="%s"}`,
			podName, podName)
		return p.query(expr, start, size, step)
	case "cambricon":
		// MLU memory utilization, joined onto the container mapping by card uuid.
		expr := fmt.Sprintf(
			`mlu_memory_utilization * on(uuid) group_right mlu_container{pod="%s"}`, podName)
		return p.query(expr, start, size, step)
	case "iluvatar":
		// iluvatar GPU memory utilization.
		expr := fmt.Sprintf(`ix_mem_utilization{pod="%s"}`, podName)
		return p.query(expr, start, size, step)
	case "nvidia":
		metric = "dcgm_mem_copy_utilization" // GPU memory utilization
	case "enflame":
		metric = "100 * enflame_gcu_memory_usage" // GCU memory utilization
	}

	expr := fmt.Sprintf(`%s{pod_name="%s"}`, metric, podName)
	return p.query(expr, start, size, step)
}

// QueryNetworkReceiveBytes queries the summed one-minute receive rate of a pod.
// The expression matches series labelled either pod_name or pod.
func (p *prometheus) QueryNetworkReceiveBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error) {
	const exprFormat = `sum(rate(container_network_receive_bytes_total{pod_name="%s"}[1m]) or rate(container_network_receive_bytes_total{pod="%s"}[1m]))`

	expr := fmt.Sprintf(exprFormat, podName, podName)
	return p.query(expr, start, size, step)
}

// QueryNetworkTransmitBytes queries the summed one-minute transmit rate of a pod.
// The expression matches series labelled either pod_name or pod.
func (p *prometheus) QueryNetworkTransmitBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error) {
	// Both alternatives must read the transmit counter. The pod= fallback used
	// to read container_network_receive_bytes_total, so pods whose series carry
	// the pod label reported receive traffic as transmit traffic.
	query := fmt.Sprintf(`sum(rate(container_network_transmit_bytes_total{pod_name="%s"}[1m]) or rate(container_network_transmit_bytes_total{pod="%s"}[1m]))`, podName, podName)
	return p.query(query, start, size, step)
}

// QueryFSUsageBytes queries the summed filesystem usage of a pod over devices
// whose name matches ^/dev/.*$. The expression matches series labelled either
// pod_name or pod.
func (p *prometheus) QueryFSUsageBytes(ctx context.Context, podName string, start int64, size int, step int) ([]float64, error) {
	const exprFormat = `sum(container_fs_usage_bytes{device=~"^/dev/.*$",pod_name="%s"} or container_fs_usage_bytes{device=~"^/dev/.*$",pod="%s"})`

	expr := fmt.Sprintf(exprFormat, podName, podName)
	return p.query(expr, start, size, step)
}

func (p *prometheus) query(query string, start int64, size int, step int) ([]float64, error) {
r := &Reply{}
params := map[string]string{


+ 178
- 0
server/base-server/internal/service/develop/develop.go View File

@@ -67,6 +67,8 @@ const (
k8sTaskNamePrefix = "task"
servicePort = 8888
shmResource = "shm"
cpuResource = "cpu"
memResource = "memory"
nodeActionLabelNotebookId = "nodebook.octopus.dev/id"
nodeActionLabelImageId = "image.octopus.dev/id"
kubeAnnotationsProxyBodySize = "nginx.ingress.kubernetes.io/proxy-body-size"
@@ -1107,6 +1109,10 @@ func (s *developService) GetPodNameFromNoteBookTask(notebook *model.Notebook, ta
return fmt.Sprintf("%s-%s-0", notebook.NotebookJobId, taskName)
}

// GetPodNameFromNoteBookTaskByIndex builds the pod name of a notebook task from
// the notebook's job ID and the task index, in the form "<jobId>-task<index>-0".
func (s *developService) GetPodNameFromNoteBookTaskByIndex(notebook *model.Notebook, taskIndex int32) string {
	jobID := notebook.NotebookJobId
	return fmt.Sprintf("%s-task%d-0", jobID, taskIndex)
}

func (s *developService) getNotebookTaskContainer(ctx context.Context, notebook *model.Notebook, taskName string) (string, string, error) {
pod, err := s.data.Cluster.GetPod(ctx, notebook.UserId, s.GetPodNameFromNoteBookTask(notebook, taskName))
if err != nil {
@@ -1204,3 +1210,175 @@ func defaultDetail(userID string, nbJob *model.NotebookJob) *typeJob.JobStatusDe
},
}
}

// GetNotebookMetric returns the monitoring series of one notebook task.
//
// It loads the notebook, all resources and the notebook's resource spec (via the
// ListResourceAll / GetResourceSpecIgnore variants), derives the accelerator
// vendor, and then runs the Prometheus queries for the task's pod. Any failure
// up to that point is returned to the caller. CPU usage and memory percentage
// are derived from the resource spec afterwards; if that derivation fails the
// error is ignored and the series is filled with -1, one entry per raw sample.
func (s *developService) GetNotebookMetric(ctx context.Context, req *api.GetNotebookMetricRequest) (*api.GetNotebookMetricReply, error) {
	notebook, err := s.data.DevelopDao.GetNotebook(ctx, req.Id)
	if err != nil {
		return nil, err
	}
	allResources, err := s.resourceService.ListResourceAll(ctx, &emptypb.Empty{})
	if err != nil {
		return nil, err
	}
	specReply, err := s.resourceSpecService.GetResourceSpecIgnore(ctx, &api.GetResourceSpecRequest{Id: notebook.ResourceSpecId})
	if err != nil {
		return nil, err
	}
	spec := specReply.ResourceSpec
	company, err := s.getCompany(ctx, allResources, spec)
	if err != nil {
		return nil, err
	}

	// Every query below shares the same pod and sampling window.
	pod := s.GetPodNameFromNoteBookTaskByIndex(notebook, req.TaskIndex)
	start, size, step := req.Start, int(req.Size), int(req.Step)
	prom := s.data.Prometheus

	cpuRaw, err := prom.QueryCpuUsage(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	memRaw, err := prom.QueryMemUsage(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	gpuUtil, err := prom.QueryGpuUtil(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	gpuMemUtil, err := prom.QueryGpuMemUtil(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	cardUtil, err := prom.QueryAccCardUtil(ctx, pod, company, start, size, step)
	if err != nil {
		return nil, err
	}
	cardMemUtil, err := prom.QueryAccCardMemUtil(ctx, pod, company, start, size, step)
	if err != nil {
		return nil, err
	}
	netRx, err := prom.QueryNetworkReceiveBytes(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	netTx, err := prom.QueryNetworkTransmitBytes(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}
	fsUsage, err := prom.QueryFSUsageBytes(ctx, pod, start, size, step)
	if err != nil {
		return nil, err
	}

	reply := &api.GetNotebookMetricReply{
		MemUsage:             memRaw,
		GpuUtil:              gpuUtil,
		GpuMemUsage:          gpuMemUtil,
		AccCardUtil:          cardUtil,
		AccCardMemUsage:      cardMemUtil,
		NetworkReceiveBytes:  netRx,
		NetworkTransmitBytes: netTx,
		FsUsageBytes:         fsUsage,
		Company:              company,
	}

	// noData builds a series of n "-1" markers; it stays nil when n is 0.
	noData := func(n int) []float64 {
		var filler []float64
		for i := 0; i < n; i++ {
			filler = append(filler, -1)
		}
		return filler
	}

	// Derivation errors are deliberately ignored: report "no data" instead.
	if averaged, err := s.getCpuAverageUsage(ctx, allResources, spec, cpuRaw); err == nil {
		reply.CpuUsage = averaged
	} else {
		reply.CpuUsage = noData(len(cpuRaw))
	}

	if percent, err := s.getMemUsagePercent(ctx, allResources, spec, memRaw); err == nil {
		reply.MemUsagePercent = percent
	} else {
		reply.MemUsagePercent = noData(len(memRaw))
	}

	return reply, nil
}

// getCpuAverageUsage divides each CPU usage sample by the CPU quantity of the
// resource spec.
//
// The CPU quantity is taken from the first resource in resources that is
// requested by resourceSpec and whose ResourceRef or Name equals cpuResource.
// Samples equal to -1 are passed through unchanged, and every sample becomes -1
// when the quantity is not positive. It returns ErrorResourceNotExist when the
// spec requests no CPU resource, or the parse error when the quantity string is
// invalid.
func (s *developService) getCpuAverageUsage(
	ctx context.Context,
	resources *api.ResourceList,
	resourceSpec *api.ResourceSpec,
	cpuUsage []float64) ([]float64, error) {

	for _, r := range resources.Resources {
		raw, requested := resourceSpec.ResourceQuantity[r.Name]
		if !requested {
			continue
		}
		if r.ResourceRef != cpuResource && r.Name != cpuResource {
			continue
		}

		quantity, err := resource.ParseQuantity(raw)
		if err != nil {
			return nil, err
		}
		// NOTE(review): Quantity.Value() yields an integer, so a fractional CPU
		// quantity such as "500m" would presumably be rounded up to 1 — confirm
		// whether fractional specs can occur.
		cores := quantity.Value()

		averaged := make([]float64, 0)
		for _, sample := range cpuUsage {
			switch {
			case sample == -1:
				averaged = append(averaged, sample)
			case cores <= 0:
				averaged = append(averaged, -1)
			default:
				averaged = append(averaged, float64(sample)/float64(cores))
			}
		}
		return averaged, nil
	}
	return nil, errors.Errorf(nil, errors.ErrorResourceNotExist)
}

// getMemUsagePercent converts each memory usage sample into a percentage of the
// memory quantity of the resource spec.
//
// The memory quantity is taken from the first resource in resources that is
// requested by resourceSpec and whose ResourceRef or Name equals memResource.
// Samples equal to -1 are passed through unchanged, and every sample becomes -1
// when the quantity is not positive. It returns ErrorResourceNotExist when the
// spec requests no memory resource, or the parse error when the quantity string
// is invalid.
func (s *developService) getMemUsagePercent(
	ctx context.Context,
	resources *api.ResourceList,
	resourceSpec *api.ResourceSpec,
	memUsage []float64) ([]float64, error) {

	for _, r := range resources.Resources {
		raw, requested := resourceSpec.ResourceQuantity[r.Name]
		if !requested {
			continue
		}
		if r.ResourceRef != memResource && r.Name != memResource {
			continue
		}

		quantity, err := resource.ParseQuantity(raw)
		if err != nil {
			return nil, err
		}
		limit := quantity.Value()

		percent := make([]float64, 0)
		for _, sample := range memUsage {
			switch {
			case sample == -1:
				percent = append(percent, sample)
			case limit <= 0:
				percent = append(percent, -1)
			default:
				percent = append(percent, float64(sample)*100/float64(limit))
			}
		}
		return percent, nil
	}
	return nil, errors.Errorf(nil, errors.ErrorResourceNotExist)
}

// getCompany returns the accelerator vendor used by a resource spec.
//
// Vendors are tried in a fixed order; the first one whose name occurs in the
// ResourceRef or Name of a resource requested by resourceSpec is returned. When
// no vendor matches, the result is the empty string. The error is always nil.
func (s *developService) getCompany(
	ctx context.Context,
	resources *api.ResourceList,
	resourceSpec *api.ResourceSpec) (string, error) {

	vendors := []string{"nvidia", "huawei", "cambricon", "enflame", "iluvatar", "metax-tech"}
	for _, vendor := range vendors {
		for _, r := range resources.Resources {
			// Only resources that the spec actually requests are considered.
			if _, requested := resourceSpec.ResourceQuantity[r.Name]; !requested {
				continue
			}
			if strings.Contains(r.ResourceRef, vendor) || strings.Contains(r.Name, vendor) {
				return vendor, nil
			}
		}
	}
	return "", nil
}

+ 10
- 2
server/base-server/internal/service/ftpproxy/ftp.go View File

@@ -114,7 +114,8 @@ func (s *FtpProxyService) CreateOrUpdateFtpAccount(ctx context.Context, req *pb.

if fuser == nil {
fuser = sftpgov2.NewUser()
fileSystemConfig := s.newFileSystemConfig(req.HomeS3Bucket, req.HomeS3Object)
//去掉minio中转
//fileSystemConfig := s.newFileSystemConfig(req.HomeS3Bucket, req.HomeS3Object)
permissions := map[string][]sftpgov2.Permission{
"/": {sftpgov2.PERMISSION_STAR},
}
@@ -127,7 +128,8 @@ func (s *FtpProxyService) CreateOrUpdateFtpAccount(ctx context.Context, req *pb.
fuser.SetQuotaFiles(UNLIMITED)
fuser.SetExpirationDate(UNLIMITED)
fuser.SetPermissions(permissions)
fuser.SetFilesystem(*fileSystemConfig)
//fuser.SetFilesystem(*fileSystemConfig)
fuser.SetHomeDir(req.HomeDir)
fuser.SetUploadBandwidth(UNLIMITED)
fuser.SetDownloadBandwidth(UNLIMITED)

@@ -149,6 +151,12 @@ func (s *FtpProxyService) CreateOrUpdateFtpAccount(ctx context.Context, req *pb.
if req.Password != "" {
fuser.SetPassword(password)
}
if req.HomeDir != "" {
fileSystemConfig := sftpgov2.NewFilesystemConfig()
fileSystemConfig.SetProvider(sftpgov2.FSPROVIDERS__0)
fuser.SetFilesystem(*fileSystemConfig)
fuser.SetHomeDir(req.HomeDir)
}
err := s.updateFtpUser(ctx, *fuser, 1)
if err != nil {
return nil, err


+ 68
- 0
server/base-server/internal/service/resources/resource.go View File

@@ -256,6 +256,74 @@ func (rsvc *ResourceService) ListResource(ctx context.Context, req *empty.Empty)
return resourceList, err
}

// ListResourceAll lists every resource returned by ResourceDao.ListResourceAll
// together with the cluster nodes each one is bound to.
//
// System resources (empty ResourceRef) take their nodes from the system
// resource map; "shm" additionally takes the nodes of "memory" when that entry
// exists. Customized resources take their nodes from the customized resource
// map and also carry the node label key/value used for binding. On a database
// or cluster error an empty list is returned along with the error.
func (rsvc *ResourceService) ListResourceAll(ctx context.Context, req *empty.Empty) (*api.ResourceList, error) {
	dbResources, err := rsvc.data.ResourceDao.ListResourceAll()
	if err != nil {
		return &api.ResourceList{}, err
	}

	clusterNodes, err := rsvc.data.Cluster.GetAllNodes(ctx)
	if err != nil {
		return &api.ResourceList{}, err
	}

	systemNodes := collectSystemResourceNodesInfo(rsvc.conf.Service.Resource.IgnoreSystemResources, clusterNodes)
	customizedNodes := rsvc.collectCustomizedResourceNodesInfo(dbResources, clusterNodes)
	labelKeyFormat := rsvc.conf.Service.Resource.CustomizedResourceBindingNodeLabelKeyFormat

	reply := &api.ResourceList{
		Resources: make([]*api.Resource, 0),
	}

	for _, dbr := range dbResources {
		item := &api.Resource{
			Id:          dbr.Id,
			Name:        dbr.Name,
			Desc:        dbr.Desc,
			ResourceRef: dbr.ResourceRef,
		}

		if dbr.ResourceRef != "" {
			// Customized resource: nodes come from the label-based map.
			if bound, ok := customizedNodes[dbr.Name]; ok {
				sort.Strings(bound)
				item.BindingNodes = bound
			}
			item.BindingNodeLabelValue = rsvc.conf.Service.Resource.CustomizedResourceBindingNodeLabelValue
			item.BindingNodeLabelKey = fmt.Sprintf(labelKeyFormat, dbr.Name)
		} else {
			// System resource: nodes come from the system resource map.
			if bound, ok := systemNodes[dbr.Name]; ok {
				sort.Strings(bound)
				item.BindingNodes = bound
			}
			// The "shm" resource uses the binding nodes of "memory".
			if dbr.Name == "shm" {
				if bound, ok := systemNodes["memory"]; ok {
					sort.Strings(bound)
					item.BindingNodes = bound
				}
			}
		}

		reply.Resources = append(reply.Resources, item)
	}

	return reply, nil
}

//Do not save resource node info to the db!!!
//Because if a resource label is deleted from the cluster's node labels, listing resources would then run into a data-consistency problem between the db and the k8s cluster
func (rsvc *ResourceService) CreateCustomizedResource(ctx context.Context, req *api.CreateCustomizedResourceRequest) (*api.CreateCustomizedResourceReply, error) {


+ 25
- 0
server/base-server/internal/service/resources/resourcespec.go View File

@@ -167,3 +167,28 @@ func (rsvc *ResourceSpecService) GetResourceSpec(ctx context.Context, req *api.G

return &api.GetResourceSpecReply{ResourceSpec: rspec}, nil
}

// GetResourceSpecIgnore fetches a resource spec by id through
// ResourceSpecDao.GetResourceSpecIgnore and decodes its JSON-encoded resource
// quantities into a map. Both a lookup failure and a decode failure are
// reported as ErrorGetResourceSpec together with an empty reply.
func (rsvc *ResourceSpecService) GetResourceSpecIgnore(ctx context.Context, req *api.GetResourceSpecRequest) (*api.GetResourceSpecReply, error) {
	dbr, err := rsvc.data.ResourceSpecDao.GetResourceSpecIgnore(req.Id)
	if err != nil {
		return &api.GetResourceSpecReply{}, errors.Errorf(err, errors.ErrorGetResourceSpec)
	}

	// ResourceQuantity is stored as a JSON object of resource name -> quantity.
	var quantities map[string]string
	if err = json.Unmarshal([]byte(dbr.ResourceQuantity), &quantities); err != nil {
		return &api.GetResourceSpecReply{}, errors.Errorf(err, errors.ErrorGetResourceSpec)
	}

	return &api.GetResourceSpecReply{
		ResourceSpec: &api.ResourceSpec{
			Id:               dbr.Id,
			Name:             dbr.Name,
			Price:            dbr.Price,
			ResourceQuantity: quantities,
		},
	}, nil
}

+ 134
- 24
server/base-server/internal/service/trainjob/train_job.go View File

@@ -39,6 +39,8 @@ const (
k8sTaskNamePrefix = "task"
NoDistributedJobNum = 1
shmResource = "shm"
cpuResource = "cpu"
memResource = "memory"
readonlyCodeDir = "/readonlycode"
)

@@ -1363,6 +1365,22 @@ func (s *trainJobService) onJobUpdate(old, obj interface{}) {

func (s *trainJobService) GetJobMetric(ctx context.Context, req *api.GetJobMetricRequest) (*api.GetJobMetricReply, error) {
podName := fmt.Sprintf("%s-task%d-%d", req.Id, req.TaskIndex, req.ReplicaIndex)
trainJob, err := s.data.TrainJobDao.GetTrainJob(ctx, req.Id)
if err != nil {
return nil, err
}
resources, err := s.resourceService.ListResourceAll(ctx, &emptypb.Empty{})
if err != nil {
return nil, err
}
resourceSpec, err := s.resourceSpecService.GetResourceSpecIgnore(ctx, &api.GetResourceSpecRequest{Id: trainJob.Config[req.TaskIndex].ResourceSpecId})
if err != nil {
return nil, err
}
company, err := s.getCompany(ctx, resources, resourceSpec.ResourceSpec)
if err != nil {
return nil, err
}
cpuUsage, err := s.data.Prometheus.QueryCpuUsage(ctx, podName, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
@@ -1379,14 +1397,49 @@ func (s *trainJobService) GetJobMetric(ctx context.Context, req *api.GetJobMetri
if err != nil {
return nil, err
}
accCardUtil, err := s.data.Prometheus.QueryAccCardUtil(ctx, podName, company, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
}
accCardMemUtil, err := s.data.Prometheus.QueryAccCardMemUtil(ctx, podName, company, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
}
networkReceiveBytes, err := s.data.Prometheus.QueryNetworkReceiveBytes(ctx, podName, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
}
networkTransmitBytes, err := s.data.Prometheus.QueryNetworkTransmitBytes(ctx, podName, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
}
fsUsageBytes, err := s.data.Prometheus.QueryFSUsageBytes(ctx, podName, req.Start, int(req.Size), int(req.Step))
if err != nil {
return nil, err
}

res := &api.GetJobMetricReply{
CpuUsage: cpuUsage,
MemUsage: memUsage,
GpuUtil: gpuUtil,
GpuMemUsage: gpuMemUtil,
MemUsage: memUsage,
GpuUtil: gpuUtil,
GpuMemUsage: gpuMemUtil,
AccCardUtil: accCardUtil,
AccCardMemUsage: accCardMemUtil,
NetworkReceiveBytes: networkReceiveBytes,
NetworkTransmitBytes: networkTransmitBytes,
FsUsageBytes: fsUsageBytes,
Company: company,
}

cpuAverageUsage, err := s.getCpuAverageUsage(ctx, resources, resourceSpec.ResourceSpec, cpuUsage)
if err == nil { //忽略err
res.CpuUsage = cpuAverageUsage
} else {
for range cpuUsage {
res.CpuUsage = append(res.CpuUsage, -1)
}
}
memUsagePercent, err := s.getMemUsagePercent(ctx, req, memUsage)

memUsagePercent, err := s.getMemUsagePercent(ctx, resources, resourceSpec.ResourceSpec, memUsage)
if err == nil { //忽略err
res.MemUsagePercent = memUsagePercent
} else {
@@ -1398,29 +1451,86 @@ func (s *trainJobService) GetJobMetric(ctx context.Context, req *api.GetJobMetri
return res, nil
}

func (s *trainJobService) getMemUsagePercent(ctx context.Context, req *api.GetJobMetricRequest, memUsage []float64) ([]float64, error) {
trainJob, err := s.data.TrainJobDao.GetTrainJob(ctx, req.Id)
if err != nil {
return nil, err
}
resourceSpec, err := s.resourceSpecService.GetResourceSpec(ctx, &api.GetResourceSpecRequest{Id: trainJob.Config[req.TaskIndex].ResourceSpecId})
if err != nil {
return nil, err
}
// getCpuAverageUsage divides every CPU usage sample by the CPU quantity
// declared in resourceSpec.
//
// The CPU entry is the first pairing of an element of resources.Resources and a
// key of resourceSpec.ResourceQuantity where the names are equal and the
// resource's ResourceRef or Name equals cpuResource. Samples equal to -1 are
// passed through unchanged; when the parsed quantity is not positive every
// other sample becomes -1.
//
// It returns the resource.ParseQuantity error when the quantity string cannot
// be parsed, and errors.ErrorResourceNotExist when no CPU entry is found.
// ctx is not used.
func (s *trainJobService) getCpuAverageUsage(
ctx context.Context,
resources *api.ResourceList,
resourceSpec *api.ResourceSpec,
cpuUsage []float64) ([]float64, error) {

// NOTE(review): the next three lines look like leftovers of the removed
// getMemUsagePercent body shown by the diff view, not part of this function;
// confirm against the real file.
quantity, err := resource.ParseQuantity(resourceSpec.ResourceSpec.ResourceQuantity["memory"])
if err != nil {
return nil, err
for _, r := range resources.Resources {
for k, v := range resourceSpec.ResourceQuantity {
if r.Name == k {
// Accept either a resource that refers to "cpu" or one named "cpu".
if r.ResourceRef == cpuResource || r.Name == cpuResource {
quantity, err := resource.ParseQuantity(v)
if err != nil {
return nil, err
}
res := make([]float64, 0)
for _, v := range cpuUsage {
if v == -1 {
// -1 samples are copied through as-is.
res = append(res, v)
} else if quantity.Value() <= 0 {
// Avoid dividing by a zero or negative quantity.
res = append(res, -1)
} else {
res = append(res, float64(v)/float64(quantity.Value()))
}
}
return res, nil
}
}
}
}
return nil, errors.Errorf(nil, errors.ErrorResourceNotExist)
}

res := make([]float64, 0)
for _, v := range memUsage {
if v == -1 {
res = append(res, v)
} else {
res = append(res, float64(int64(v)*100/quantity.Value()))
// getMemUsagePercent converts raw memory usage samples into percentages of the
// memory quantity declared in resourceSpec.
//
// The memory entry is the first element of resources.Resources whose Name is a
// key of resourceSpec.ResourceQuantity and whose ResourceRef or Name equals
// memResource. Samples equal to -1 are passed through unchanged; when the
// parsed quantity is not positive every other sample becomes -1.
//
// It returns the resource.ParseQuantity error when the quantity string cannot
// be parsed, and errors.ErrorResourceNotExist when either argument is nil or
// the spec has no memory entry. ctx is not used.
func (s *trainJobService) getMemUsagePercent(
	ctx context.Context,
	resources *api.ResourceList,
	resourceSpec *api.ResourceSpec,
	memUsage []float64) ([]float64, error) {

	// Without a resource list or a spec there is nothing to match against.
	if resources == nil || resourceSpec == nil {
		return nil, errors.Errorf(nil, errors.ErrorResourceNotExist)
	}

	for _, r := range resources.Resources {
		// ResourceQuantity is keyed by resource name, so look the entry up
		// directly instead of scanning every key.
		raw, ok := resourceSpec.ResourceQuantity[r.Name]
		if !ok || (r.ResourceRef != memResource && r.Name != memResource) {
			continue
		}

		quantity, err := resource.ParseQuantity(raw)
		if err != nil {
			return nil, err
		}

		// The total is the same for every sample; compute it once.
		total := quantity.Value()
		percents := make([]float64, 0, len(memUsage))
		for _, used := range memUsage {
			switch {
			case used == -1:
				// -1 samples are copied through as-is.
				percents = append(percents, used)
			case total <= 0:
				// Avoid dividing by a zero or negative quantity.
				percents = append(percents, -1)
			default:
				percents = append(percents, used*100/float64(total))
			}
		}
		return percents, nil
	}
	return nil, errors.Errorf(nil, errors.ErrorResourceNotExist)
}

return res, nil
// getCompany reports which vendor ("company") the resource spec refers to.
//
// Vendors are tried in the fixed order listed below. For each vendor the
// function returns its name as soon as it finds an element of
// resources.Resources whose Name is a key of resourceSpec.ResourceQuantity and
// whose ResourceRef or Name contains the vendor name. It returns "" when
// nothing matches. The returned error is always nil. ctx is not used.
func (s *trainJobService) getCompany(
	ctx context.Context,
	resources *api.ResourceList,
	resourceSpec *api.ResourceSpec) (string, error) {

	vendors := []string{"nvidia", "huawei", "cambricon", "enflame", "iluvatar", "metax-tech"}
	for _, vendor := range vendors {
		for _, r := range resources.Resources {
			// Only resources requested by the spec are considered; the map is
			// keyed by resource name, so a lookup replaces a scan of its keys.
			if _, requested := resourceSpec.ResourceQuantity[r.Name]; !requested {
				continue
			}
			if strings.Contains(r.ResourceRef, vendor) || strings.Contains(r.Name, vendor) {
				return vendor, nil
			}
		}
	}
	return "", nil
}

+ 9
- 5
server/base-server/internal/service/user/user.go View File

@@ -2,6 +2,7 @@ package user

import (
"context"
"fmt"
api "server/base-server/api/v1"
"server/base-server/internal/common"
"server/base-server/internal/conf"
@@ -428,17 +429,20 @@ func (s *UserService) UpdateUserConfig(ctx context.Context, req *api.UpdateUserC
return &api.UpdateUserConfigReply{}, nil
}

// buildFtpHomeDir returns the FTP home directory for userId, formatted as
// "/minio/<userId>/<common.USERHOME>". ctx is not used.
func (s *UserService) buildFtpHomeDir(ctx context.Context, userId string) string {
	const homeDirFormat = "/minio/%s/%s"

	homeDir := fmt.Sprintf(homeDirFormat, userId, common.USERHOME)
	return homeDir
}

func (s *UserService) UpdateUserFtpAccount(ctx context.Context, req *api.UpdateUserFtpAccountRequest) (*api.UpdateUserFtpAccountReply, error) {
user, err := s.data.UserDao.Find(ctx, &model.UserQuery{Id: req.UserId})
if err != nil {
return nil, err
}
_, err = s.ftpProxyService.CreateOrUpdateFtpAccount(ctx, &api.CreateOrUpdateFtpAccountRequest{
Username: req.FtpUserName,
Email: user.Email,
Password: req.FtpPassword,
HomeS3Bucket: common.GetUserBucket(req.UserId),
HomeS3Object: common.GetUserHomeObject(),
Username: req.FtpUserName,
Email: user.Email,
Password: req.FtpPassword,
HomeDir: s.buildFtpHomeDir(ctx, req.UserId),
})
if err != nil {
return nil, err


+ 47
- 1
server/openai-server/api/v1/develop.proto View File

@@ -60,13 +60,19 @@ service Develop {
option (google.api.http) = {
post: "/v1/developmanage/notebook/{notebookId}/save"
};
}
};
// 查询事件记录列表
rpc ListNotebookEventRecord (ListNotebookEventRecordRequest) returns (ListNotebookEventRecordReply) {
option (google.api.http) = {
get: "/v1/developmanage/notebook/{notebookId}/eventrecord"
};
};
// 获取调试任务监控数据
rpc GetNotebookMetric (GetNotebookMetricRequest) returns (GetNotebookMetricReply) {
option (google.api.http) = {
get: "/v1/developmanage/notebookmetric"
};
};
}

message CreateNotebookRequest {
@@ -164,6 +170,7 @@ message Notebook {
repeated Task tasks = 23;
string imageUrl = 24;
string exitMsg = 25;
string command = 26;
}

message ListNotebookReply {
@@ -234,3 +241,42 @@ message ListNotebookEventRecordReply {
int64 totalSize = 1;
repeated NotebookEventRecord records = 2;
}

// Request for the monitoring metrics of a notebook.
message GetNotebookMetricRequest {
// Notebook ID
string id = 1[(validate.rules).string = {min_len: 1}];
// Sub-task index, starting from 0
int32 taskIndex = 2;
// Start time
int64 start = 3;
// Count, at most 1000
int32 size = 4[(validate.rules).int32 = {gte:0,lte:1000}];
// Interval (seconds)
int32 step = 5;
}

// A value of -1 means there is no data for that point in time.
message GetNotebookMetricReply {
// Percentage
repeated double cpuUsage = 1;
// Bytes
repeated double memUsage = 2;
// Percentage
repeated double gpuUtil = 3;
// Percentage
repeated double gpuMemUsage = 4;
// Percentage
repeated double memUsagePercent = 5;
// Percentage
repeated double accCardUtil = 6;
// Percentage
repeated double accCardMemUsage = 7;
// Bytes
repeated double networkReceiveBytes = 8;
// Bytes
repeated double networkTransmitBytes = 9;
// Bytes
repeated double fsUsageBytes = 10;
// Vendor
string company = 11;
}

+ 12
- 0
server/openai-server/api/v1/trainJob.proto View File

@@ -457,4 +457,16 @@ message GetJobMetricReply {
repeated double gpuMemUsage = 4;
//百分比
repeated double memUsagePercent = 5;
//百分比
repeated double accCardUtil = 6;
//百分比
repeated double accCardMemUsage = 7;
//字节
repeated double networkReceiveBytes = 8;
//字节
repeated double networkTransmitBytes = 9;
//字节
repeated double fsUsageBytes = 10;
//厂商
string company = 11;
}

+ 20
- 0
server/openai-server/internal/service/develop.go View File

@@ -217,3 +217,23 @@ func (s *DevelopService) ListNotebookEventRecord(ctx context.Context, req *api.L

return reply, nil
}

// GetNotebookMetric forwards a notebook metric query to the inner develop
// service and returns its reply converted to the public API type.
//
// The request is copied field-by-field into the inner request type, sent via
// DevelopClient.GetNotebookMetric, and the inner reply is copied back. A failure
// of either copy is reported as errors.ErrorStructCopy; an error from the inner
// service is returned unchanged.
func (s *DevelopService) GetNotebookMetric(ctx context.Context, req *api.GetNotebookMetricRequest) (*api.GetNotebookMetricReply, error) {
	innerReq := &innerapi.GetNotebookMetricRequest{}
	if err := copier.Copy(innerReq, req); err != nil {
		return nil, errors.Errorf(err, errors.ErrorStructCopy)
	}

	innerReply, err := s.data.DevelopClient.GetNotebookMetric(ctx, innerReq)
	if err != nil {
		return nil, err
	}

	reply := &api.GetNotebookMetricReply{}
	if err := copier.Copy(reply, innerReply); err != nil {
		// Wrap the same way as the request copy above so both copy failures
		// surface with the same error code.
		return nil, errors.Errorf(err, errors.ErrorStructCopy)
	}
	return reply, nil
}

Loading…
Cancel
Save