#3964 V20230322.patch (optimize the model output get-back feature)

Merged
ychao_1983 merged 42 commits from V20230322.patch into develop 1 year ago
1. models/cloudbrain.go (+20 -6)
2. models/model_migrate_record.go (+215 -0)
3. models/models.go (+1 -0)
4. modules/cron/tasks_basic.go (+14 -0)
5. modules/grampus/grampus.go (+7 -3)
6. modules/grampus/resty.go (+66 -0)
7. modules/notification/model_schedule/schedule.go (+41 -0)
8. modules/notification/notification.go (+2 -0)
9. modules/redis/redis_key/model_schedule.go (+9 -0)
10. modules/setting/setting.go (+6 -0)
11. modules/storage/obs.go (+47 -0)
12. modules/urfs_client/urchin/schedule.go (+3 -253)
13. modules/util/string.go (+11 -0)
14. routers/api/v1/api.go (+1 -0)
15. routers/api/v1/repo/modelarts.go (+41 -21)
16. routers/repo/cloudbrain.go (+0 -5)
17. routers/repo/grampus.go (+1 -9)
18. services/ai_task_service/schedule/model_schedule.go (+374 -0)
19. services/cloudbrain/cloudbrainTask/schedule.go (+0 -93)
20. services/cloudbrain/cloudbrainTask/train.go (+0 -5)
21. templates/repo/cloudbrain/show.tmpl (+1 -1)
22. templates/repo/grampus/trainjob/show.tmpl (+2 -1)
23. web_src/js/features/cloudbrainShow.js (+38 -16)
24. web_src/js/features/i18nVue.js (+4 -2)

models/cloudbrain.go (+20 -6)

@@ -1672,6 +1672,19 @@ type GrampusStopJobResponse struct {
Status string `json:"status"`
}

type GrampusModelMigrateInfoResponse struct {
GrampusResult
DestBucket string `json:"destBucket"`
DestEndpoint string `json:"destEndpoint"`
DestObjectKey string `json:"destObjectKey"`
DestProxy string `json:"destProxy"`
FailedReason string `json:"failedReason"`
SrcBucket string `json:"srcBucket"`
SrcEndpoint string `json:"srcEndpoint"`
SrcObjectKey string `json:"srcObjectKey"`
Status int `json:"status"` //0: initialized 1: success 2: failed 3: scheduling
}

type GetGrampusJobEventsResponse struct {
GrampusResult
JobEvents []GrampusJobEvents `json:"jobEvents"`
@@ -1718,12 +1731,13 @@ type GrampusNotebookTask struct {
}

type GrampusDataset struct {
Name string `json:"name"`
Bucket string `json:"bucket"`
EndPoint string `json:"endPoint"`
ObjectKey string `json:"objectKey"`
ContainerPath string `json:"containerPath"`
ReadOnly bool `json:"readOnly"`
Name string `json:"name"`
Bucket string `json:"bucket"`
EndPoint string `json:"endPoint"`
ObjectKey string `json:"objectKey"`
ContainerPath string `json:"containerPath"`
ReadOnly bool `json:"readOnly"`
GetBackEndpoint string `json:"getBackEndpoint"`
}

type CreateGrampusJobRequest struct {

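For orientation, the Status field of GrampusModelMigrateInfoResponse maps onto the migrate steps introduced in models/model_migrate_record.go below; a minimal sketch of the mapping (the response value is hypothetical):

res := models.GrampusModelMigrateInfoResponse{Status: 1} // 1: success on the Grampus side
step := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep()
// step == models.GrampusMigrateSuccess; its coarse GetStatus() is still models.ModelMigrating,
// because a successful Grampus migration is followed by a local bucket move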

models/model_migrate_record.go (+215 -0)

@@ -0,0 +1,215 @@
package models

import (
"code.gitea.io/gitea/modules/log"
"errors"
"time"
"xorm.io/builder"

"code.gitea.io/gitea/modules/timeutil"
)

type GrampusMigrateResponse int

const (
GrampusMigrateResponseMigrateInit GrampusMigrateResponse = 0
GrampusMigrateResponseSuccess GrampusMigrateResponse = 1
GrampusMigrateResponseFailed GrampusMigrateResponse = 2
GrampusMigrateResponseMigrating GrampusMigrateResponse = 3
GrampusMigrateResponseNoNeedMigrate GrampusMigrateResponse = 4
)

func (r GrampusMigrateResponse) ConvertToModelMigrateStep() ModelMigrateStep {
switch r {
case GrampusMigrateResponseMigrateInit:
return GrampusMigrateInit
case GrampusMigrateResponseSuccess:
return GrampusMigrateSuccess
case GrampusMigrateResponseFailed:
return GrampusMigrateFailed
case GrampusMigrateResponseMigrating:
return GrampusMigrating
case GrampusMigrateResponseNoNeedMigrate:
return GrampusMigrateNoNeed
}
return -1
}

type ModelMigrateStep int

const (
GrampusMigrateInit ModelMigrateStep = 0
GrampusMigrating ModelMigrateStep = 1
GrampusMigrateSuccess ModelMigrateStep = 2
GrampusMigrateFailed ModelMigrateStep = 3
GrampusMigrateNoNeed ModelMigrateStep = 4
BucketMoving ModelMigrateStep = 10
BucketMoveSuccess ModelMigrateStep = 11
BucketMoveFailed ModelMigrateStep = 12
)

func (m ModelMigrateStep) GetStatus() ModelMigrateStatus {
switch m {
case BucketMoveSuccess, GrampusMigrateNoNeed:
return ModelMigrateSuccess
case GrampusMigrateFailed, BucketMoveFailed:
return ModelMigrateFailed
case GrampusMigrateInit:
return ModelMigrateWaiting
case GrampusMigrateSuccess, GrampusMigrating, BucketMoving:
return ModelMigrating
}
return -1
}

type ModelMigrateStatus int

const (
ModelMigrateSuccess ModelMigrateStatus = 0
ModelMigrating ModelMigrateStatus = 1
ModelMigrateFailed ModelMigrateStatus = 2
ModelMigrateWaiting ModelMigrateStatus = 3
)

var UnFinishedMigrateSteps = []ModelMigrateStep{GrampusMigrateInit, GrampusMigrating, GrampusMigrateSuccess, BucketMoving}

type ModelMigrateRecord struct {
ID int64 `xorm:"pk autoincr"`
CloudbrainID int64 `xorm:"INDEX NOT NULL unique"`
DestBucket string
DestEndpoint string
DestObjectKey string
DestProxy string
SrcBucket string
SrcEndpoint string
SrcObjectKey string
Status ModelMigrateStatus `xorm:"NOT NULL DEFAULT 3"`
CurrentStep ModelMigrateStep `xorm:"NOT NULL DEFAULT 0"`
RetryCount int
CreatedUnix timeutil.TimeStamp `xorm:"created"`
UpdatedUnix timeutil.TimeStamp `xorm:"updated"`
DeletedAt time.Time `xorm:"deleted"`
Remark string
}

func (r *ModelMigrateRecord) IsFinished() bool {
for _, s := range UnFinishedMigrateSteps {
if s == r.CurrentStep {
return false
}
}
return true
}

func updateModelMigrateRecordCols(e Engine, record *ModelMigrateRecord, cols ...string) error {
_, err := e.ID(record.ID).Cols(cols...).Update(record)
return err
}

func UpdateModelMigrateRecordCols(record *ModelMigrateRecord, cols ...string) error {
return updateModelMigrateRecordCols(x, record, cols...)
}
func IncreaseModelMigrateRetryCount(recordId int64) error {
_, err := x.ID(recordId).Incr("retry_count", 1).Update(&ModelMigrateRecord{})
return err
}

func UpdateModelMigrateStatusByStep(record *ModelMigrateRecord, newStep ModelMigrateStep) error {
status := newStep.GetStatus()
if status < 0 {
log.Error("Step format error.id = %d,newStep = %d", record.ID, newStep)
return errors.New("Step format error")
}
record.Status = status
record.CurrentStep = newStep
//under normal circumstances the step can only advance to a larger value
n, err := x.Where(builder.NewCond().And(builder.Eq{"id": record.ID}).
And(builder.Lt{"current_step": newStep})).
Cols("status", "current_step").
Update(record)
if err != nil {
log.Error("UpdateModelMigrateStatusByStep err.%v", err)
return err
}

if n == 0 {
log.Error("UpdateModelMigrateStatusByStep total num is 0.r.ID=%d", record.ID)
return errors.New("current_step not valid")
}

return nil
}

func RollBackMigrateStatus(record *ModelMigrateRecord, newStep ModelMigrateStep) error {
status := newStep.GetStatus()
if status < 0 {
log.Error("Step format error.id = %d,newStep = %d", record.ID, newStep)
return errors.New("Step format error")
}
record.Status = status
record.CurrentStep = newStep
_, err := x.ID(record.ID).
Cols("status", "current_step").
Update(record)
if err != nil {
log.Error("RollBackMigrateStatus err.%v", err)
return err
}

return nil
}

func UpdateModelMigrateRecordByStep(record *ModelMigrateRecord) error {
n, err := x.
Where(builder.NewCond().And(builder.Eq{"id": record.ID})).
Update(record)
if err != nil {
log.Error("UpdateModelMigrateRecordByStep err. ID=%d err=%v", record.ID, err)
return err
}
if n == 0 {
log.Error("UpdateModelMigrateRecordByStep total num is 0.r.ID=%d", record.ID)
return errors.New("current_step not valid")
}
return nil
}

func GetUnfinishedModelMigrateRecords() ([]*ModelMigrateRecord, error) {
records := make([]*ModelMigrateRecord, 0, 10)
return records, x.
Where(builder.NewCond().And(builder.In("current_step", UnFinishedMigrateSteps))).
Limit(100).
Find(&records)
}

func InsertModelMigrateRecord(record *ModelMigrateRecord) (_ *ModelMigrateRecord, err error) {

if _, err := x.Insert(record); err != nil {
return nil, err
}

return record, nil
}

func GetModelMigrateRecordByCloudbrainId(cloudbrainId int64) (*ModelMigrateRecord, error) {
r := &ModelMigrateRecord{}
if has, err := x.Where("cloudbrain_id = ?", cloudbrainId).Get(r); err != nil {
log.Error("GetModelMigrateRecordByCloudbrainId err. %v", err)
return nil, err
} else if !has {
return nil, ErrRecordNotExist{}
}
return r, nil

}
func GetModelMigrateRecordById(id int64) (*ModelMigrateRecord, error) {
r := &ModelMigrateRecord{}
if has, err := x.ID(id).Get(r); err != nil {
log.Error("GetModelMigrateRecordByCloudbrainId err. %v", err)
return nil, err
} else if !has {
return nil, ErrRecordNotExist{}
}
return r, nil

}
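Taken together, these helpers form a forward-only state machine: UpdateModelMigrateStatusByStep refuses to move current_step to a smaller or equal value, and RollBackMigrateStatus is the explicit escape hatch used by retries. A minimal sketch of the intended progression for a GPU job (the cloudbrain id is hypothetical):

record, _ := models.InsertModelMigrateRecord(&models.ModelMigrateRecord{
CloudbrainID: 42, // hypothetical task id
Status: models.ModelMigrating,
CurrentStep: models.GrampusMigrating, // step 1
})
_ = models.UpdateModelMigrateStatusByStep(record, models.GrampusMigrateSuccess) // 1 -> 2
_ = models.UpdateModelMigrateStatusByStep(record, models.BucketMoving) // 2 -> 10
_ = models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess) // 10 -> 11, status becomes ModelMigrateSuccess
// a stale writer moving backwards is rejected by the current_step guard:
_ = models.UpdateModelMigrateStatusByStep(record, models.GrampusMigrating) // n == 0, returns "current_step not valid"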

models/models.go (+1 -0)

@@ -170,6 +170,7 @@ func init() {
new(TechConvergeBaseInfo),
new(RepoConvergeInfo),
new(UserRole),
new(ModelMigrateRecord),
)

tablesStatistic = append(tablesStatistic,


modules/cron/tasks_basic.go (+14 -0)

@@ -5,6 +5,7 @@
package cron

import (
"code.gitea.io/gitea/services/ai_task_service/schedule"
"context"
"time"

@@ -249,6 +250,17 @@ func registerHandleScheduleRecord() {
})
}

func registerHandleModelMigrateRecord() {
RegisterTaskFatal("handle_model_migrate_record", &BaseConfig{
Enabled: true,
RunAtStart: false,
Schedule: "@every 1m",
}, func(ctx context.Context, _ *models.User, _ Config) error {
schedule.HandleUnfinishedMigrateRecords()
return nil
})
}

func registerRewardPeriodTask() {
RegisterTaskFatal("reward_period_task", &BaseConfig{
Enabled: true,
@@ -335,4 +347,6 @@ func initBasicTasks() {

registerHandleScheduleRecord()
registerHandleCloudbrainDurationStatistic()

registerHandleModelMigrateRecord()
}

modules/grampus/grampus.go (+7 -3)

@@ -349,6 +349,9 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str
EndPoint: getEndPoint(),
ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
}
outputGrampus = models.GrampusDataset{
GetBackEndpoint: getEndPoint(),
}
} else if ProcessorTypeGPU == req.ProcessType {
datasetGrampus = getDatasetGPUGrampus(req.DatasetInfos, "/tmp/dataset")
if len(req.ModelName) != 0 {
@@ -372,7 +375,8 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str
ContainerPath: "/tmp/code/" + cloudbrain.DefaultBranchName + ".zip",
}
outputGrampus = models.GrampusDataset{
ContainerPath: "/tmp/output",
ContainerPath: "/tmp/output",
GetBackEndpoint: setting.Attachment.Minio.Endpoint,
}

} else if ProcessorTypeGCU == req.ProcessType {
@@ -398,9 +402,9 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str
ContainerPath: "/tmp/code/" + cloudbrain.DefaultBranchName + ".zip",
}
outputGrampus = models.GrampusDataset{
ContainerPath: "/tmp/output",
ContainerPath: "/tmp/output",
GetBackEndpoint: setting.Attachment.Minio.Endpoint,
}

}

jobResult, err := createJob(models.CreateGrampusJobRequest{

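GetBackEndpoint tells the Grampus side where to push the training output back to: the OBS endpoint for NPU jobs, the local Minio endpoint for GPU and GCU jobs. A minimal sketch of that choice as a helper inside modules/grampus (the helper name is hypothetical; ProcessorTypeNPU is assumed by analogy with ProcessorTypeGPU/ProcessorTypeGCU above):

func getBackEndpoint(processType string) string {
if processType == ProcessorTypeNPU {
return getEndPoint() // OBS endpoint, as in the NPU branch
}
return setting.Attachment.Minio.Endpoint // GPU and GCU branches
}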

modules/grampus/resty.go (+66 -0)

@@ -393,6 +393,72 @@ sendjob:
return &result, nil
}

func PostModelMigrate(jobID string) (*models.GrampusModelMigrateInfoResponse, error) {
checkSetting()
client := getRestyClient()
var result models.GrampusModelMigrateInfoResponse

retry := 0

sendjob:
res, err := client.R().
//SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetResult(&result).
Post(HOST + urlTrainJob + "/" + jobID + "/modelMigrate")

if err != nil {
return &result, fmt.Errorf("resty ModelMigrate: %v", err)
}
log.Info("call modelMigrate res=%+v", res)
if result.ErrorCode == errorIllegalToken && retry < 1 {
retry++
log.Info("retry get token")
_ = getToken()
goto sendjob
}

if result.ErrorCode != 0 {
log.Error("ModelMigrate failed(%d): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("PostModelMigrate failed(%d): %s", result.ErrorCode, result.ErrorMsg)
}

return &result, nil
}

func ModelMigrateInfo(jobID string) (*models.GrampusModelMigrateInfoResponse, error) {
checkSetting()
client := getRestyClient()
var result models.GrampusModelMigrateInfoResponse

retry := 0

sendjob:
res, err := client.R().
//SetHeader("Content-Type", "application/json").
SetAuthToken(TOKEN).
SetResult(&result).
Get(HOST + urlTrainJob + "/" + jobID + "/modelMigrateInfo")

if err != nil {
return &result, fmt.Errorf("resty ModelMigrateInfo: %v", err)
}
log.Info("call modelMigrateInfo res=%+v", res)
if result.ErrorCode == errorIllegalToken && retry < 1 {
retry++
log.Info("retry get token")
_ = getToken()
goto sendjob
}

if result.ErrorCode != 0 {
log.Error("ModelMigrateInfo failed(%d): %s", result.ErrorCode, result.ErrorMsg)
return &result, fmt.Errorf("ModelMigrateInfo failed(%d): %s", result.ErrorCode, result.ErrorMsg)
}

return &result, nil
}

func GetAiCenters(pageIndex, pageSize int) (*models.GetGrampusAiCentersResult, error) {
checkSetting()
client := getRestyClient()

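Both calls reuse this file's retry-once pattern: on an expired token (errorIllegalToken) the request is replayed after refreshing TOKEN. In this PR the status is polled by the cron task rather than by a caller loop, but a minimal caller-side sketch would be (interval and termination are illustrative):

if _, err := grampus.PostModelMigrate(jobID); err != nil {
return err
}
for {
info, err := grampus.ModelMigrateInfo(jobID)
if err != nil {
return err
}
if info.Status == 1 || info.Status == 2 { // 1: success, 2: failed
break
}
time.Sleep(time.Minute)
}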

modules/notification/model_schedule/schedule.go (+41 -0)

@@ -0,0 +1,41 @@
package model_schedule

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification/base"
)

type scheduleNotifier struct {
base.NullNotifier
}

var (
_ base.Notifier = &scheduleNotifier{}
)

// NewNotifier creates a new schedule notifier
func NewNotifier() base.Notifier {
return &scheduleNotifier{}
}

func (*scheduleNotifier) NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) {
if !cloudbrain.IsTerminal() {
return
}
log.Info("try to InsertModelMigrateRecord.cloudbrainId=%d oldStatus=%s newStatus=%s", cloudbrain.ID, oldStatus, cloudbrain.Status)
switch cloudbrain.Type {
case models.TypeC2Net:
if cloudbrain.JobType == string(models.JobTypeDebug) {
return
}
_, err := models.InsertModelMigrateRecord(&models.ModelMigrateRecord{
CloudbrainID: cloudbrain.ID,
Status: models.ModelMigrating,
CurrentStep: models.GrampusMigrating,
})
if err != nil {
log.Error("InsertModelMigrateRecord err.cloudbrain.id=%d err=%v", cloudbrain.ID, err)
}
}
}
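The notifier plugs into the existing status fan-out: any code path that calls notification.NotifyChangeCloudbrainStatus (see the routers below) now also creates a migrate record for terminal C2Net training jobs, which the cron task then picks up. A minimal sketch of the trigger (field values and status literals are hypothetical; JobTypeTrain is assumed by analogy with JobTypeDebug above):

task := &models.Cloudbrain{ID: 42, Type: models.TypeC2Net, JobType: string(models.JobTypeTrain), Status: "SUCCEEDED"}
notification.NotifyChangeCloudbrainStatus(task, "RUNNING")
// scheduleNotifier inserts a ModelMigrateRecord for task.ID at step GrampusMigrating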

modules/notification/notification.go (+2 -0)

@@ -11,6 +11,7 @@ import (
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/notification/indexer"
"code.gitea.io/gitea/modules/notification/mail"
"code.gitea.io/gitea/modules/notification/model_schedule"
"code.gitea.io/gitea/modules/notification/reward"
"code.gitea.io/gitea/modules/notification/ui"
"code.gitea.io/gitea/modules/notification/webhook"
@@ -41,6 +42,7 @@ func NewContext() {
RegisterNotifier(action.NewNotifier())
RegisterNotifier(wechatNotifier.NewNotifier())
RegisterNotifier(reward.NewNotifier())
RegisterNotifier(model_schedule.NewNotifier())
}

// NotifyUploadAttachment notifies attachment upload message to notifiers


modules/redis/redis_key/model_schedule.go (+9 -0)

@@ -0,0 +1,9 @@
package redis_key

import "fmt"

const MODEL_SCHEDULE_PREFIX = "model_schedule"

func RecordHandleLock(jobId string) string {
return KeyJoin(MODEL_SCHEDULE_PREFIX, fmt.Sprint(jobId), "handle")
}
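The key backs a per-job distributed lock that serializes the retry endpoint against the cron handler; a minimal usage sketch (the 10-second wait and expiry are the values used in services/ai_task_service/schedule/model_schedule.go below):

lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(jobId))
ok, err := lock.LockWithWait(10*time.Second, 10*time.Second)
if err != nil || !ok {
return // another worker is already handling this job's record
}
defer lock.UnLock()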

modules/setting/setting.go (+6 -0)

@@ -676,6 +676,9 @@ var (
DeductTaskRangeForFirst time.Duration
DeductTaskMinTimestamp int64

//model-migrate config
UseLocalMinioMigrate bool

//badge config
BadgeIconMaxFileSize int64
BadgeIconMaxWidth int
@@ -1677,6 +1680,9 @@ func NewContext() {
DeductTaskRangeForFirst = sec.Key("DEDUCT_TASK_RANGE_FOR_FIRST").MustDuration(3 * time.Hour)
DeductTaskMinTimestamp = sec.Key("DEDUCT_TASK_MIN_TIMESTAMP").MustInt64(0)

sec = Cfg.Section("model-migrate")
UseLocalMinioMigrate = sec.Key("USE_LOCAL_MINIO_MIGRATE").MustBool(false)

sec = Cfg.Section("icons")
BadgeIconMaxFileSize = sec.Key("BADGE_ICON_MAX_FILE_SIZE").MustInt64(1048576)
BadgeIconMaxWidth = sec.Key("BADGE_ICON_MAX_WIDTH").MustInt(4096)

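In app.ini terms the new switch reads as follows; a sketch with the section and key exactly as in the code, and the value shown being the default (bucket moves then go through the Minio API instead of a local sudo mv):

[model-migrate]
USE_LOCAL_MINIO_MIGRATE = false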

modules/storage/obs.go (+47 -0)

@@ -418,6 +418,53 @@ func GetOneLevelAllObjectUnderDir(bucket string, prefixRootPath string, relative
return fileInfos, nil
}

func GetOneLevelObjectsUnderDir(bucket string, prefixRootPath string, relativePath string) ([]FileInfo, error) {
input := &obs.ListObjectsInput{}
input.Bucket = bucket
input.Prefix = prefixRootPath + relativePath
input.Delimiter = "/"
if !strings.HasSuffix(input.Prefix, "/") {
input.Prefix += "/"
}
fileInfos := make([]FileInfo, 0)
prefixLen := len(input.Prefix)
output, err := ObsCli.ListObjects(input)
if err != nil {
if obsError, ok := err.(obs.ObsError); ok {
log.Error("Code:%s, Message:%s", obsError.Code, obsError.Message)
}
return nil, err
}
for _, val := range output.Contents {
var fileName string
if val.Key == input.Prefix {
continue
}
fileName = val.Key[prefixLen:]
fileInfo := FileInfo{
ModTime: val.LastModified.Local().Format("2006-01-02 15:04:05"),
FileName: fileName,
Size: val.Size,
IsDir: false,
ParenDir: relativePath,
}
fileInfos = append(fileInfos, fileInfo)
}
for _, val := range output.CommonPrefixes {
fileName := strings.TrimSuffix(strings.TrimPrefix(val, input.Prefix), "/")
fileInfo := FileInfo{
FileName: fileName,
IsDir: true,
ParenDir: strings.TrimPrefix(val, prefixRootPath),
}
fileInfos = append(fileInfos, fileInfo)
}
return fileInfos, nil
}

func GetAllObjectByBucketAndPrefix(bucket string, prefix string) ([]FileInfo, error) {
input := &obs.ListObjectsInput{}
input.Bucket = bucket

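Unlike GetOneLevelAllObjectUnderDir, this variant sets Delimiter = "/", so OBS returns only direct children: files in Contents, sub-directories in CommonPrefixes. Note it also reads a single page, so listings are truncated at the OBS page size. A minimal usage sketch with a hypothetical prefix:

infos, err := storage.GetOneLevelObjectsUnderDir(setting.Bucket, "train/job123/output/", "")
if err != nil {
return err
}
for _, fi := range infos {
if fi.IsDir {
log.Info("dir: %s", fi.FileName)
} else {
log.Info("file: %s (%d bytes)", fi.FileName, fi.Size)
}
}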

modules/urfs_client/urchin/schedule.go (+3 -253)

@@ -1,18 +1,15 @@
package urchin

import (
"encoding/json"
"fmt"
"strings"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/labelmsg"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"encoding/json"
"fmt"
"github.com/minio/minio-go"
"strings"
)

type DecompressReq struct {
@@ -30,253 +27,6 @@ func getUrfsClient() {
urfsClient = New()
}

func GetAITaskOutPutBack(cloudbrainID int64, jobName, centerId, computerResource string) error {
switch computerResource {
case models.NPUResource:
return GetNPUDataBack(cloudbrainID, jobName, centerId)
case models.GPUResource:
return GetGPUDataBack(cloudbrainID, jobName, centerId)
case models.GCUResource:
return GetGCUDataBack(cloudbrainID, jobName, centerId)
}
return nil
}

func GetGPUDataBack(cloudbrainID int64, jobName, centerId string) error {
endpoint := grampus.GetRemoteEndPoint(centerId)
bucket := grampus.BucketRemote
objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName)
destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID)
getUrfsClient()

var res *PeerResult
var err error
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute}
for i, retryInterval := range retryIntervalList {
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
if err == nil {
log.Info("ScheduleDirToPeerByKey res=%v", res)
break
}
log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval)
time.Sleep(retryInterval)
// If it's the last retry, break
if i == len(retryIntervalList)-1 {
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
}
}

// If err is still not nil after retrying, insert a default value
if err != nil {
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
Remark: interceptErrorMessages(err),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}
return fmt.Errorf("GetBackModel failed after retrying:%v", err)
}

_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GPUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}

r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID)
if err != nil {
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err)
return err
}

err = handleScheduleResult(r, res)
if err != nil {
log.Error("GetGPUDataBack handleScheduleResult err.%v", err)
return err
}
return nil
}

func GetGCUDataBack(cloudbrainID int64, jobName, centerId string) error {
endpoint := grampus.GetRemoteEndPoint(centerId)
bucket := grampus.BucketRemote
objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName)
destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID)

getUrfsClient()

var res *PeerResult
var err error
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute}
for i, retryInterval := range retryIntervalList {
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
if err == nil {
break
}
log.Error("ScheduleDataToPeerByKey failed:%v, retry in %v", err, retryInterval)
time.Sleep(retryInterval)
// If it's the last retry, break
if i == len(retryIntervalList)-1 {
res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
}
}

// If err is still not nil after retrying, insert a default value
if err != nil {
log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}
return fmt.Errorf("GetBackModel failed after retrying:%v", err)
}

_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: true,
ComputeSource: models.GCUResource,
TargetObjectKey: grampus.GetGPUModelObjectKey(jobName),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}

r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID)
if err != nil {
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err)
return err
}

err = handleScheduleResult(r, res)
if err != nil {
log.Error("GetGCUDataBack handleScheduleResult err.%v", err)
return err
}

return nil
}

func GetNPUDataBack(cloudbrainID int64, jobName, centerId string) error {
endpoint := grampus.GetRemoteEndPoint(centerId)
bucket := grampus.BucketRemote
objectKey := grampus.GetNpuModelObjectKey(jobName)
destPeerHost := grampus.GetCenterProxy(setting.Grampus.LocalCenterID)

getUrfsClient()
var res *PeerResult
var err error
var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute}
for i, retryInterval := range retryIntervalList {
res, err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
if err == nil {
break
}
log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval)
time.Sleep(retryInterval)
// If it's the last retry, break
if i == len(retryIntervalList)-1 {
res, err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost)
}
}

// If err is still not nil after retrying, insert a default value
if err != nil {
log.Error("ScheduleDataToPeerByKey failed after retrying, errorInfo is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,error:%v",
endpoint, bucket, objectKey, destPeerHost, err)
_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: models.StorageUrchinScheduleFailed,
IsDir: false,
ComputeSource: models.NPUResource,
Remark: interceptErrorMessages(err),
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}
return fmt.Errorf("GetBackModel failed after retrying:%v", err)
}

_, err = models.InsertScheduleRecord(&models.ScheduleRecord{
CloudbrainID: cloudbrainID,
EndPoint: endpoint,
Bucket: bucket,
ObjectKey: objectKey,
ProxyServer: destPeerHost,
Status: res.StatusCode,
IsDir: false,
ComputeSource: models.NPUResource,
LocalOperateStatus: models.MoveBucketWaiting,
})
if err != nil {
log.Error("InsertScheduleRecord failed:%v", err)
return err
}

r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID)
if err != nil {
log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err)
return err
}

err = handleScheduleResult(r, res)
if err != nil {
log.Error("GetNPUDataBack handleScheduleResult err.%v", err)
return err
}

return nil
}

func tryScheduleDir(endpoint, bucket, objectKey, dstPeer string) {
println("new request dstPeer: ", dstPeer)
urfs := New()


modules/util/string.go (+11 -0)

@@ -0,0 +1,11 @@
package util

func TruncateString(msg string, maxLength int) string {
if msg == "" {
return ""
}
if len(msg) < maxLength {
maxLength = len(msg)
}
return msg[0:maxLength]
}
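Note that TruncateString counts bytes, not runes, so a cut can end inside a multi-byte UTF-8 character; for the failure-reason remarks it is applied to in this PR (capped at 200 bytes in updateModelMigrateFromRes below), that is tolerable. A minimal sketch:

util.TruncateString("some long failure reason ...", 200) // at most 200 bytes kept
util.TruncateString("模型迁移失败", 4) // byte-based cut: may split a rune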

routers/api/v1/api.go (+1 -0)

@@ -1053,6 +1053,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Put("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GeneralCloudBrainJobStop)
m.Group("/model", func() {
m.Get("/schedule_status", repo.GetModelScheduleStatus)
m.Post("/reschedule", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.RetryModelSchedule)
})
})
})
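Combined with the template change below, the full path of the new route is /api/v1/repos{RepoLink}/cloudbrain/train-job/{JobID}/model/reschedule. A minimal sketch of invoking it (host, repo and job id are hypothetical; authentication is elided):

resp, err := http.Post("https://openi.example.com/api/v1/repos/owner/repo/cloudbrain/train-job/abc123/model/reschedule", "application/json", nil)
if err != nil {
return err
}
defer resp.Body.Close()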


routers/api/v1/repo/modelarts.go (+41 -21)

@@ -6,6 +6,7 @@
package repo

import (
"code.gitea.io/gitea/services/ai_task_service/schedule"
"encoding/json"
"net/http"
"path"
@@ -20,8 +21,6 @@ import (

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"

"code.gitea.io/gitea/modules/urfs_client/urchin"

"code.gitea.io/gitea/modules/notification"

"code.gitea.io/gitea/modules/grampus"
@@ -169,9 +168,6 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {
}
if oldStatus != job.Status {
notification.NotifyChangeCloudbrainStatus(job, oldStatus)
if models.IsTrainJobTerminal(job.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 {
go urchin.GetAITaskOutPutBack(job.ID, job.JobName, result.JobInfo.Tasks[0].CenterID[0], job.ComputeResource)
}
}
err = models.UpdateTrainJobVersion(job)
if err != nil {
@@ -191,7 +187,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) {

func GetModelScheduleStatus(ctx *context.APIContext) {
jobID := ctx.Params(":jobid")
status, err := cloudbrainTask.GetModelScheduleStatus(jobID)
status, err := schedule.GetModelScheduleStatus(jobID)
if err != nil {
ctx.JSON(http.StatusOK, response.OuterResponseError(err))
return
@@ -200,6 +196,16 @@ func GetModelScheduleStatus(ctx *context.APIContext) {
ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m))
}

func RetryModelSchedule(ctx *context.APIContext) {
jobID := ctx.Params(":jobid")
err := schedule.RetryModelMigrate(jobID)
if err != nil {
ctx.JSON(http.StatusOK, response.OuterResponseError(err))
return
}
ctx.JSON(http.StatusOK, response.OuterSuccess())
}

func TrainJobForModelConvertGetLog(ctx *context.APIContext) {
var (
err error
@@ -466,14 +472,41 @@ func ModelList(ctx *context.APIContext) {
return
}

status := models.ModelScheduleSucceed
status := models.ModelMigrateSuccess

if task.Type == models.TypeC2Net {
if !task.IsTerminal() {
log.Info("ModelList job is not terminal.jobId=%s", jobID)
status = models.JobNoTeminal
} else {
status, err = schedule.GetModelScheduleStatus(task.JobID)
if err != nil {
log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error())
return
}
}
}

if status != models.ModelMigrateSuccess {
ctx.JSON(http.StatusOK, map[string]interface{}{
"JobID": jobID,
"VersionName": versionName,
"StatusOK": status,
"Path": dirArray,
"Dirs": []storage.FileInfo{},
"task": task,
"PageIsCloudBrain": true,
})
return
}

var fileInfos []storage.FileInfo
if task.ComputeResource == models.NPUResource {
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName), "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
fileInfos, err = storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, parentDir)
fileInfos, err = storage.GetOneLevelObjectsUnderDir(setting.Bucket, prefix, parentDir)
if err != nil {
log.Info("get TrainJobListModel failed:", err)
ctx.ServerError("GetObsListObject:", err)
@@ -504,19 +537,6 @@ func ModelList(ctx *context.APIContext) {
})
}

if task.Type == models.TypeC2Net {
if !task.IsTerminal() {
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobID)
status = models.JobNoTeminal
} else {
status, err = cloudbrainTask.GetModelScheduleStatus(task.JobID)
if err != nil {
log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error())
return
}
}
}

ctx.JSON(http.StatusOK, map[string]interface{}{
"JobID": jobID,
"VersionName": versionName,


routers/repo/cloudbrain.go (+0 -5)

@@ -21,8 +21,6 @@ import (

cloudbrainService "code.gitea.io/gitea/services/cloudbrain"

"code.gitea.io/gitea/modules/urfs_client/urchin"

"code.gitea.io/gitea/modules/dataset"

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"
@@ -1953,9 +1951,6 @@ func SyncCloudbrainStatus() {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 {
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource)
}
}
err = models.UpdateJob(task)
if err != nil {


routers/repo/grampus.go (+1 -9)

@@ -16,7 +16,6 @@ import (

"code.gitea.io/gitea/services/lock"

"code.gitea.io/gitea/modules/urfs_client/urchin"
"code.gitea.io/gitea/routers/response"

"code.gitea.io/gitea/services/cloudbrain/cloudbrainTask"
@@ -1379,11 +1378,6 @@ func GrampusNotebookShow(ctx *context.Context) {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource {
if len(result.JobInfo.Tasks[0].CenterID) == 1 {
urchin.GetNPUDataBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0])
}
}
}
}
err = models.UpdateJob(task)
@@ -1535,9 +1529,6 @@ func GrampusTrainJobShow(ctx *context.Context) {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 {
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource)
}
}
}
err = models.UpdateJob(task)
@@ -1576,6 +1567,7 @@ func GrampusTrainJobShow(ctx *context.Context) {
ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false)
ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task)
ctx.Data["displayJobName"] = task.DisplayJobName
ctx.Data["canReschedule"] = cloudbrain.CanDeleteJob(ctx, task)

ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx)



services/ai_task_service/schedule/model_schedule.go (+374 -0)

@@ -0,0 +1,374 @@
package schedule

import (
"bytes"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/grampus"
"code.gitea.io/gitea/modules/labelmsg"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/util"
"encoding/json"
"errors"
"fmt"
"github.com/minio/minio-go"
"os/exec"
"path"
"strings"
"time"
)

const NPUModelDefaultName = "models.zip"

func GetModelScheduleStatus(jobId string) (models.ModelMigrateStatus, error) {
job, err := models.GetCloudbrainByJobID(jobId)
if err != nil {
log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err)
return 0, errors.New("jobId not correct")
}
if !job.IsTerminal() {
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId)
return models.ModelMigrateWaiting, nil
}

record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID)
if err != nil {
log.Error("GetModelScheduleStatus GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err)
if models.IsErrRecordNotExist(err) {
return models.ModelMigrateSuccess, nil
}
return models.ModelMigrateFailed, err
}

if !record.IsFinished() {
go HandleUnfinishedMigrateRecord(record)
}

return record.Status, nil
}

func RetryModelMigrate(jobId string) error {
job, err := models.GetCloudbrainByJobID(jobId)
if err != nil {
log.Error("RetryModelMigrate GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err)
return errors.New("jobId not correct")
}
if !job.IsTerminal() {
log.Info("RetryModelMigrate job is not terminal.jobId=%s", jobId)
return errors.New("task is not terminal")
}

//to avoid races, acquire the lock first, then reload the latest record
lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(jobId))
success, err := lock.LockWithWait(10*time.Second, 10*time.Second)
if err != nil {
log.Error("RetryModelMigrate lock err.jobId=%s %v", jobId, err)
return err
}
if !success {
log.Error("RetryModelMigrate lock failed.jobId=%s", jobId)
return nil
}
defer lock.UnLock()

record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID)
if err != nil {
log.Error("RetryModelMigrate GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err)
if models.IsErrRecordNotExist(err) {
return nil
}
return err
}

//only two cases may be rescheduled: the Grampus migration failed, or the local bucket move failed
if record.CurrentStep == models.GrampusMigrateFailed {
log.Info("retry PostModelMigrate. record.id = %d", record.ID)
_, err := grampus.PostModelMigrate(jobId)
if err != nil {
log.Error("PostModelMigrate err.%v", err)
return err
}
models.IncreaseModelMigrateRetryCount(record.ID)
if err := models.RollBackMigrateStatus(record, models.GrampusMigrating); err != nil {
log.Error("RollBackMigrateStatus err.%v", err)
return err
}
return nil
}

if record.CurrentStep == models.BucketMoveFailed {
log.Info("retry BucketMove. record.id = %d", record.ID)
if err := models.RollBackMigrateStatus(record, models.GrampusMigrateSuccess); err != nil {
log.Error("RollBackMigrateStatus err.%v", err)
return err
}
models.IncreaseModelMigrateRetryCount(record.ID)
return nil
}

return errors.New("no need to retry: the model migration has already succeeded or is still in progress")

}

func HandleUnfinishedMigrateRecords() {
list, err := models.GetUnfinishedModelMigrateRecords()
if err != nil {
log.Error("GetUnfinishedModelMigrateRecords err=%v", err)
return
}
for _, r := range list {
HandleUnfinishedMigrateRecord(r)
}
}

func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error {
cloudbrain, err := models.GetCloudbrainByID(fmt.Sprint(r.CloudbrainID))
if err != nil {
log.Error("GetCloudbrainByID err. %v", err)
return err
}

lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(cloudbrain.JobID))
success, err := lock.LockWithWait(10*time.Second, 10*time.Second)
if err != nil {
log.Error("HandleUnfinishedMigrateRecord lock err.ID=%d %v", r.ID, err)
return err
}
if !success {
log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", r.ID)
return nil
}
defer lock.UnLock()

//re-query the record once the lock is held
r, err = models.GetModelMigrateRecordById(r.ID)
if err != nil {
log.Error("HandleUnfinishedMigrateRecord GetModelMigrateRecordById err.Id=%d err=%v", r.ID, err)
if models.IsErrRecordNotExist(err) {
return nil
}
return err
}

if r.CurrentStep == models.GrampusMigrateInit || r.CurrentStep == models.GrampusMigrating {
if err := UpdateModelMigrateStatusFromGrampus(r, cloudbrain.JobID); err != nil {
log.Error("UpdateModelMigrateStatusFromGrampus err. %v", err)
return err
}
}

if r.CurrentStep == models.GrampusMigrateSuccess {
if err := LocalMigrateOperate(cloudbrain.JobName, cloudbrain.ComputeResource, r); err != nil {
log.Error("LocalMigrateOperate err. %v", err)
return err
}
}

if r.CurrentStep == models.BucketMoving {
//check whether the NPU result directory contains files; if it does, decompression is considered finished
if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) {
TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName)
}
}
return nil
}

func UpdateModelMigrateStatusFromGrampus(r *models.ModelMigrateRecord, jobId string) error {
res, err := grampus.ModelMigrateInfo(jobId)
if err != nil {
log.Error("ModelMigrateInfo err. r.ID=%d %v", r.ID, err)
return err
}
log.Info("grampus ModelMigrateInfo r.ID=%d res=%+v", r.ID, res)
newStep := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep()
if newStep == r.CurrentStep {
log.Info("The status has not changed. r.ID=%d status=%d", r.ID, res.Status)
return nil
}
err = updateModelMigrateFromRes(r, res)
if err != nil {
log.Error("updateModelMigrateFromRes err. r.ID=%d %v", r.ID, err)
return err
}
return nil
}

func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRecord) error {
log.Info("Grampus model migrate succeed,objectKey = %s computeSource= %s", r.DestObjectKey, computeSource)
err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoving)
if err != nil {
log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoving, err)
return err
}
if computeSource == models.NPUResource {
//NPU output is compressed, so it needs decompression plus a bucket move
decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix))
} else {
//the migration cannot target a specific bucket, so after it succeeds we still need to move the bucket
if setting.UseLocalMinioMigrate {
if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil {
log.Error("MoveBucketJust4LocalMinio err.%v", err)
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil {
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr)
}
return err
}
} else {
if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil {
log.Error("MoveBucketInOpenIMinio err.%v", err)
if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil {
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr)
}
return err
}
}

if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil {
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err)
}
}
return nil
}

func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error {
if IsNPUModelDirHasFile(jobName, versionName) {
if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil {
log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", record.ID, models.BucketMoveSuccess, err)
return err
}
}
return nil
}

func updateModelMigrateFromRes(r *models.ModelMigrateRecord, res *models.GrampusModelMigrateInfoResponse) error {
step := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep()
err := models.UpdateModelMigrateStatusByStep(r, step)
if err != nil {
log.Error("UpdateModelMigrateStatusByStep err,ID=%d err=%v", r.ID, err)
return err
}
r.DestBucket = res.DestBucket
r.DestEndpoint = res.DestEndpoint
r.DestObjectKey = res.DestObjectKey
r.DestProxy = res.DestProxy
r.Remark = strings.TrimPrefix(r.Remark+";"+util.TruncateString(res.FailedReason, 200), ";")
r.SrcBucket = res.SrcBucket
r.SrcEndpoint = res.SrcEndpoint
r.SrcObjectKey = res.SrcObjectKey
err = models.UpdateModelMigrateRecordByStep(r)
if err != nil {
log.Error("updateModelMigrateFromRes UpdateModelMigrateRecord error.id=%d.err=%v", r.ID, err)
return err
}
return nil
}

func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error {
var core = storage.ScheduleMinioCore
objectInfo := core.Client.ListObjects(oldBucket, objectKeyPrefix, true, nil)
log.Info("MoveBucketInOpenIMinio start.objectKeyPrefix=%s", objectKeyPrefix)
count := 0
for object := range objectInfo {
count++
if object.Err != nil {
log.Error("MoveBucketInOpenIMinio object.Err=%v", object.Err)
return object.Err
}
log.Debug("MoveBucketInOpenIMinio object.Key=%s", object.Key)
newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1)
err := MoveMinioFileBucket(core, object.Key, newObjectKey, oldBucket, newBucket)
if err != nil {
log.Error("MoveBucketInOpenIMinio MoveMinioFileBucket object.Key=%s Err=%v", object.Key, err)
continue
}
}
log.Info("MoveBucketInOpenIMinio finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count)
return nil
}

func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error {
oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix)
newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix)
log.Info("MoveBucketJust4LocalMinio start.oldPath=%s newPath=%s", oldPath, newPath)
//rename any pre-existing directory first to avoid a name collision
err, errStr := sudoMv(newPath, fmt.Sprintf("%s_%d", newPath, time.Now().Unix()))
if err != nil {
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr)
}
//move (rename) the directory
err, errStr = sudoMv(oldPath, newPath)
if err != nil {
log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr)
return err
}
log.Info("MoveBucketInOpenIMinio finished.oldPath=%s newPath=%s ", oldPath, newPath)
return nil
}

func sudoMv(oldPath, newPath string) (error, string) {
c := fmt.Sprintf("sudo mv %s %s", oldPath, newPath)
log.Info("start to sudoMv,oldPath=%s newPath=%s", oldPath, newPath)
cmd := exec.Command("/bin/sh", "-c", c)
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout // standard output
cmd.Stderr = &stderr // standard error
err := cmd.Run()
outStr, errStr := string(stdout.Bytes()), string(stderr.Bytes())
log.Debug("out:\n%s\nerr:\n%s\n", outStr, errStr)
if err != nil {
log.Error("cmd.Run() failed,oldPath=%s newPath=%s err=%v\n", oldPath, newPath, err)
return err, errStr
}
return nil, errStr
}

func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket, newBucket string) error {
_, err := core.CopyObject(oldBucket, oldObjectKey, newBucket, newObjectKey, map[string]string{})

if err != nil {
log.Error("MoveBucketInOpenIMinio CopyObject err oldObjectKey=%s .%v", oldObjectKey, err)
return err
}

err = core.RemoveObject(oldBucket, oldObjectKey)
if err != nil {
log.Error("MoveBucketInOpenIMinio RemoveObject err oldObjectKey=%s .%v", oldObjectKey, err)
}
return err
}

type DecompressReq struct {
SourceFile string `json:"source_file"`
DestPath string `json:"dest_path"`
}

func decompress(sourceFile, destPath string) {
req, _ := json.Marshal(DecompressReq{
SourceFile: sourceFile,
DestPath: destPath,
})
err := labelmsg.SendDecompressAttachToLabelOBS(string(req))
if err != nil {
log.Error("SendDecompressTask to labelsystem (%s) failed:%s", sourceFile, err.Error())
}
}

func IsNPUModelDirHasFile(jobName string, versionName string) bool {
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "")
if err != nil {
log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:%v", err)
return false
}

return len(fileInfos) > 0
}
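The cron task drives each record through these states; a condensed sketch of one pass (one step per pass for clarity, whereas the real HandleUnfinishedMigrateRecord falls through so a single pass can advance several steps; the identifiers fed in are hypothetical):

r, _ := models.GetModelMigrateRecordById(recordId)
switch r.CurrentStep {
case models.GrampusMigrateInit, models.GrampusMigrating:
_ = UpdateModelMigrateStatusFromGrampus(r, jobID) // poll Grampus and advance the step
case models.GrampusMigrateSuccess:
_ = LocalMigrateOperate(jobName, computeSource, r) // bucket move; NPU output also gets a decompress
case models.BucketMoving:
_ = TryToUpdateNPUMoveBucketResult(r, jobName, versionName) // NPU only: wait for the async unzip
}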

services/cloudbrain/cloudbrainTask/schedule.go (+0 -93)

@@ -1,93 +0,0 @@
package cloudbrainTask

import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"errors"
"path"
"strings"
)

func GetModelScheduleStatus(jobId string) (models.ModelScheduleStatus, error) {
job, err := models.GetCloudbrainByJobID(jobId)
if err != nil {
log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err)
return 0, errors.New("jobId not correct")
}
if !job.IsTerminal() {
log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId)
return models.ModelScheduleWaiting, nil
}

record, err := models.GetScheduleRecordByCloudbrainID(job.ID)
if err != nil {
log.Error("GetModelScheduleStatus GetScheduleRecordByCloudbrainID err.jobId=%s err=%v", jobId, err)
if models.IsErrRecordNotExist(err) {
return models.ModelScheduleSucceed, nil
}
return models.ModelScheduleFailed, err
}

switch record.Status {
case models.StorageUrchinScheduleWaiting:
return models.ModelScheduleWaiting, nil
case models.StorageUrchinScheduleProcessing:
return models.ModelScheduleOperating, nil
case models.StorageUrchinScheduleFailed:
return models.ModelScheduleFailed, nil
case models.StorageUrchinNoFile:
return models.ModelScheduleSucceed, nil
case models.StorageUrchinScheduleSucceed:
moveStatus, err := GetMoveBucketStatus(record, job.JobName, job.VersionName)
if err != nil {
log.Error("GetMoveBucketStatus err.%v", err)
return models.ModelScheduleFailed, err
}
switch moveStatus {
case models.MoveBucketSucceed:
return models.ModelScheduleSucceed, nil
case models.MoveBucketOperating:
return models.ModelScheduleOperating, nil
case models.MoveBucketFailed:
return models.ModelScheduleFailed, nil
}
}

return models.ModelScheduleFailed, nil
}

func GetMoveBucketStatus(record *models.ScheduleRecord, jobName, versionName string) (int, error) {

if record.ComputeSource == models.GPUResource || record.ComputeSource == models.GCUResource {
return record.LocalOperateStatus, nil
}
if record.LocalOperateStatus != models.MoveBucketOperating {
return record.LocalOperateStatus, nil
}
//NPU output is decompressed asynchronously after the transfer back, so for the in-progress state we must further check whether decompression has finished
//the check is whether the model directory contains any files
if IsNPUModelDirHasFile(jobName, versionName) {
models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketSucceed)
return models.MoveBucketSucceed, nil
}
return record.LocalOperateStatus, nil
}

func IsNPUModelDirHasFile(jobName string, versionName string) bool {
prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/")
if !strings.HasSuffix(prefix, "/") {
prefix += "/"
}
fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "")
if err != nil {
log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:", err)
return false
}

if len(fileInfos) > 0 {
return true
}
return len(fileInfos) > 0
}

services/cloudbrain/cloudbrainTask/train.go (+0 -5)

@@ -13,8 +13,6 @@ import (
"strconv"
"strings"

"code.gitea.io/gitea/modules/urfs_client/urchin"

"code.gitea.io/gitea/modules/timeutil"

"code.gitea.io/gitea/modules/notification"
@@ -1087,9 +1085,6 @@ func SyncTaskStatus(task *models.Cloudbrain) error {
task.CorrectCreateUnix()
if oldStatus != task.Status {
notification.NotifyChangeCloudbrainStatus(task, oldStatus)
if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 {
go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource)
}
}
err = models.UpdateJob(task)
if err != nil {


templates/repo/cloudbrain/show.tmpl (+1 -1)

@@ -389,7 +389,7 @@

function parseLog() {
let jsonValue = document.getElementById("json_value").value;
let jsonObj = JSON.parse(jsonValue);
let jsonObj = jsonValue&&JSON.parse(jsonValue);
let podRoleName = jsonObj["podRoleName"];
let html = "";
if (podRoleName != null) {


templates/repo/grampus/trainjob/show.tmpl (+2 -1)

@@ -94,7 +94,7 @@
{{if eq .ComputeResource "CPU/GPU"}}
<a class="item run_info" data-tab="five{{$k}}" data-version="{{.VersionName}}">{{$.i18n.Tr "repo.cloudbrain.runinfo"}}</a>
{{end}}
<a class="item load-model-file" data-tab="third{{$k}}" data-download-flag="{{$.canDownload}}" data-path="{{$.RepoLink}}/modelarts/train-job/{{.JobID}}/model_list" data-version="{{.VersionName}}" data-parents="" data-filename="" data-init="init" >{{$.i18n.Tr "repo.model_download"}}</a>
<a class="item load-model-file" data-tab="third{{$k}}" data-can-reschedule="{{$.canReschedule}}" data-retry-path="{{$.RepoLink}}/cloudbrain/train-job/{{.JobID}}/model/reschedule" data-download-flag="{{$.canDownload}}" data-path="{{$.RepoLink}}/modelarts/train-job/{{.JobID}}/model_list" data-version="{{.VersionName}}" data-parents="" data-filename="" data-init="init" >{{$.i18n.Tr "repo.model_download"}}</a>
</div>
<div class="ui tab active" data-tab="first{{$k}}">
<div style="padding-top: 10px;">
@@ -587,6 +587,7 @@
});
$('td.ti-text-form-content.spec').text(specStr);
})();
console.log({{.version_list_task}})
var setting = {
check: {
enable: true,


web_src/js/features/cloudbrainShow.js (+38 -16)

@@ -565,6 +565,7 @@ export default async function initCloudrainSow() {
activeTab.trigger('click');
}
//

$(".content-pad").on("click", ".load-model-file", function () {
let downloadFlag = $(this).data("download-flag") || "";
let gpuFlag = $(this).data("gpu-flag") || "";
@@ -573,9 +574,12 @@ export default async function initCloudrainSow() {
let filename = $(this).data("filename");
let init = $(this).data("init") || "";
let path = $(this).data("path");
let retryPath = `/api/v1/repos${$(this).data("retry-path")}`;
const rescheduleFlag = $(this).data("can-reschedule") || "";
$(`#dir_list${version_name}`).empty();
let url = `/api/v1/repos${path}?version_name=${version_name}&parentDir=${parents}`;
$.get(url, (data) => {
if (data.StatusOK == 0) { // 0: success
if (data.Dirs) {
data.Dirs.length !==0 && $(`#${version_name}-result-down`).show()
@@ -595,6 +599,7 @@ export default async function initCloudrainSow() {
$(`#file_breadcrumb${version_name}`).append(htmlBread);
} else {
renderBrend(
this,
path,
version_name,
parents,
@@ -609,7 +614,7 @@ export default async function initCloudrainSow() {
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);">
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;">
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="0 0 32 32" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M16 29.333c-7.364 0-13.333-5.969-13.333-13.333s5.969-13.333 13.333-13.333 13.333 5.969 13.333 13.333-5.969 13.333-13.333 13.333zM16 26.667c5.891 0 10.667-4.776 10.667-10.667s-4.776-10.667-10.667-10.667v0c-5.891 0-10.667 4.776-10.667 10.667s4.776 10.667 10.667 10.667v0zM17.333 16h5.333v2.667h-8v-9.333h2.667v6.667z"></path></g></svg>
</div>
</div>
<span>${i18n['task_not_finished']}</span>
</div>`);
}else if (data.StatusOK == 1) { // 1: processing
@@ -628,12 +633,23 @@ export default async function initCloudrainSow() {
</div>`);
} else if (data.StatusOK == 2) { // 2: failed
$(`#file_breadcrumb${version_name}`).empty();
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);">
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;">
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="64 64 896 896" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M464 720a48 48 0 1 0 96 0 48 48 0 1 0-96 0zm16-304v184c0 4.4 3.6 8 8 8h48c4.4 0 8-3.6 8-8V416c0-4.4-3.6-8-8-8h-48c-4.4 0-8 3.6-8 8zm475.7 440l-416-720c-6.2-10.7-16.9-16-27.7-16s-21.6 5.3-27.7 16l-416 720C56 877.4 71.4 904 96 904h832c24.6 0 40-26.6 27.7-48zm-783.5-27.9L512 239.9l339.8 588.2H172.2z"></path></g></svg>
</div>
<span>${i18n['file_sync_fail']}</span>
</div>`);
if (rescheduleFlag) {
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);">
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;">
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="64 64 896 896" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M464 720a48 48 0 1 0 96 0 48 48 0 1 0-96 0zm16-304v184c0 4.4 3.6 8 8 8h48c4.4 0 8-3.6 8-8V416c0-4.4-3.6-8-8-8h-48c-4.4 0-8 3.6-8 8zm475.7 440l-416-720c-6.2-10.7-16.9-16-27.7-16s-21.6 5.3-27.7 16l-416 720C56 877.4 71.4 904 96 904h832c24.6 0 40-26.6 27.7-48zm-783.5-27.9L512 239.9l339.8 588.2H172.2z"></path></g></svg>
</div>
<span>${i18n['file_sync_fail']}</span>
<a href="javascript:void(0)" id="retry_result" style='text-decoration: underline;margin-left:0.5rem'>${i18n['retrieve_results']}</a>
</div>`);
}
else {
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);">
<div style="display:flex;justify-content:center;align-items:center;height:24px;width:24px;margin-right:5px;">
<svg xmlns="http://www.w3.org/2000/svg" class="styles__StyledSVGIconPathComponent-sc-16fsqc8-0 iKfgJk svg-icon-path-icon fill" viewBox="64 64 896 896" width="16" height="16"><defs data-reactroot=""></defs><g><path d="M464 720a48 48 0 1 0 96 0 48 48 0 1 0-96 0zm16-304v184c0 4.4 3.6 8 8 8h48c4.4 0 8-3.6 8-8V416c0-4.4-3.6-8-8-8h-48c-4.4 0-8 3.6-8 8zm475.7 440l-416-720c-6.2-10.7-16.9-16-27.7-16s-21.6 5.3-27.7 16l-416 720C56 877.4 71.4 904 96 904h832c24.6 0 40-26.6 27.7-48zm-783.5-27.9L512 239.9l339.8 588.2H172.2z"></path></g></svg>
</div>
<span>${i18n['file_sync_fail']}</span>
</div>`);
}
} else if (data.StatusOK == 3) { // 3: waiting for sync
$(`#file_breadcrumb${version_name}`).empty();
$(`#dir_list${version_name}`).html(`<div style="height:200px;display:flex;justify-content:center;align-items:center;font-size:14px;color:rgb(16, 16, 16);">
@@ -651,10 +667,21 @@ export default async function initCloudrainSow() {
<span>${i18n['no_file_to_download']}</span>
</div>`);
}
$('#retry_result').on('click', function () {
$.post(retryPath, (data) => {
if (data.code === 0) {
$('.load-model-file').trigger('click');
}
}).fail(function (err) {
console.log(err);
});
})
}).fail(function (err) {
console.log(err, version_name);
});
});

function renderSize(value) {
if (null == value || value == "") {
return "0 Bytes";
@@ -678,6 +705,7 @@ export default async function initCloudrainSow() {
return size + unitArr[index];
}
function renderBrend(
that,
path,
version_name,
parents,
@@ -711,15 +739,9 @@ export default async function initCloudrainSow() {
} else {
$(`input[name=model${version_name}]`).val(parents);
$(`input[name=modelback${version_name}]`).val(filename);

let selectEle = $(`#file_breadcrumb${version_name} a.section`).filter(
(index, item) => {
return item.text == filename;
}
);
selectEle.nextAll().remove();
selectEle.after("<div class='divider'> / </div>");
selectEle.replaceWith(`<div class='active section'>${filename}</div>`);
$(that).nextAll().remove();
$(that).after("<div class='divider'> / </div>");
$(that).replaceWith(`<div class='active section'>${filename}</div>`);
}
}



web_src/js/features/i18nVue.js (+4 -2)

@@ -74,7 +74,8 @@ export const i18nVue = {
file_sync_wait:"文件等待同步中,请稍侯",
file_sync_fail:"文件同步失败",
no_file_to_download:"没有文件可以下载,稍后再来看看",
task_not_finished:"任务还未结束,稍后再来看看",
task_not_finished: "任务还未结束,稍后再来看看",
retrieve_results: "重新获取结果",
local:"本地",
online:"线上",
modify:"修改",
@@ -215,7 +216,8 @@ export const i18nVue = {
file_sync_ing:"File synchronization in progress, please wait",
file_sync_fail:"File synchronization failed",
no_file_to_download:"No files can be downloaded",
task_not_finished:"Task not finished yet, please wait",
task_not_finished: "Task not finished yet, please wait",
retrieve_results: "Retrieve results",
local:"Local",
online:"Online",
modify:"Modify",

