diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 8f2e6dd5ea..0591ab568e 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -1672,6 +1672,19 @@ type GrampusStopJobResponse struct { Status string `json:"status"` } +type GrampusModelMigrateInfoResponse struct { + GrampusResult + DestBucket string `json:"destBucket"` + DestEndpoint string `json:"destEndpoint"` + DestObjectKey string `json:"destObjectKey"` + DestProxy string `json:"destProxy"` + FailedReason string `json:"failedReason"` + SrcBucket string `json:"srcBucket"` + SrcEndpoint string `json:"srcEndpoint"` + SrcObjectKey string `json:"srcObjectKey"` + Status int `json:"status"` //0: init 1: success 2: failed 3: migrating 4: no migration needed +} + type GetGrampusJobEventsResponse struct { GrampusResult JobEvents []GrampusJobEvents `json:"jobEvents"` @@ -1718,12 +1731,13 @@ type GrampusNotebookTask struct { } type GrampusDataset struct { - Name string `json:"name"` - Bucket string `json:"bucket"` - EndPoint string `json:"endPoint"` - ObjectKey string `json:"objectKey"` - ContainerPath string `json:"containerPath"` - ReadOnly bool `json:"readOnly"` + Name string `json:"name"` + Bucket string `json:"bucket"` + EndPoint string `json:"endPoint"` + ObjectKey string `json:"objectKey"` + ContainerPath string `json:"containerPath"` + ReadOnly bool `json:"readOnly"` + GetBackEndpoint string `json:"getBackEndpoint"` } type CreateGrampusJobRequest struct { diff --git a/models/model_migrate_record.go b/models/model_migrate_record.go new file mode 100644 index 0000000000..4ad388fda0 --- /dev/null +++ b/models/model_migrate_record.go @@ -0,0 +1,216 @@ +package models + +import ( + "errors" + "time" + + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/timeutil" + "xorm.io/builder" +) + +type GrampusMigrateResponse int + +const ( + GrampusMigrateResponseMigrateInit GrampusMigrateResponse = 0 + GrampusMigrateResponseSuccess GrampusMigrateResponse = 1 + GrampusMigrateResponseFailed GrampusMigrateResponse = 2 + GrampusMigrateResponseMigrating GrampusMigrateResponse = 3 + GrampusMigrateResponseNoNeedMigrate GrampusMigrateResponse = 4 +) + +func (r GrampusMigrateResponse) ConvertToModelMigrateStep() ModelMigrateStep { + switch r { + case GrampusMigrateResponseMigrateInit: + return GrampusMigrateInit + case GrampusMigrateResponseSuccess: + return GrampusMigrateSuccess + case GrampusMigrateResponseFailed: + return GrampusMigrateFailed + case GrampusMigrateResponseMigrating: + return GrampusMigrating + case GrampusMigrateResponseNoNeedMigrate: + return GrampusMigrateNoNeed + } + return -1 +} + +type ModelMigrateStep int + +const ( + GrampusMigrateInit ModelMigrateStep = 0 + GrampusMigrating ModelMigrateStep = 1 + GrampusMigrateSuccess ModelMigrateStep = 2 + GrampusMigrateFailed ModelMigrateStep = 3 + GrampusMigrateNoNeed ModelMigrateStep = 4 + BucketMoving ModelMigrateStep = 10 + BucketMoveSuccess ModelMigrateStep = 11 + BucketMoveFailed ModelMigrateStep = 12 +) + +func (m ModelMigrateStep) GetStatus() ModelMigrateStatus { + switch m { + case BucketMoveSuccess, GrampusMigrateNoNeed: + return ModelMigrateSuccess + case GrampusMigrateFailed, BucketMoveFailed: + return ModelMigrateFailed + case GrampusMigrateInit: + return ModelMigrateWaiting + case GrampusMigrateSuccess, GrampusMigrating, BucketMoving: + return ModelMigrating + } + return -1 +} + +type ModelMigrateStatus int + +const ( + ModelMigrateSuccess ModelMigrateStatus = 0 + ModelMigrating ModelMigrateStatus = 1 + ModelMigrateFailed ModelMigrateStatus = 2 + ModelMigrateWaiting ModelMigrateStatus = 3 +) + 
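+// UnFinishedMigrateSteps lists the migrate steps that are not yet terminal; records still in one of these steps are re-processed by the handle_model_migrate_record cron task.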
+var UnFinishedMigrateSteps = []ModelMigrateStep{GrampusMigrateInit, GrampusMigrating, GrampusMigrateSuccess, BucketMoving} + +type ModelMigrateRecord struct { + ID int64 `xorm:"pk autoincr"` + CloudbrainID int64 `xorm:"INDEX NOT NULL unique"` + DestBucket string + DestEndpoint string + DestObjectKey string + DestProxy string + SrcBucket string + SrcEndpoint string + SrcObjectKey string + Status ModelMigrateStatus `xorm:"NOT NULL DEFAULT 3"` + CurrentStep ModelMigrateStep `xorm:"NOT NULL DEFAULT 0"` + RetryCount int + CreatedUnix timeutil.TimeStamp `xorm:"created"` + UpdatedUnix timeutil.TimeStamp `xorm:"updated"` + DeletedAt time.Time `xorm:"deleted"` + Remark string +} + +func (r *ModelMigrateRecord) IsFinished() bool { + for _, s := range UnFinishedMigrateSteps { + if s == r.CurrentStep { + return false + } + } + return true +} + +func updateModelMigrateRecordCols(e Engine, record *ModelMigrateRecord, cols ...string) error { + _, err := e.ID(record.ID).Cols(cols...).Update(record) + return err +} + +func UpdateModelMigrateRecordCols(record *ModelMigrateRecord, cols ...string) error { + return updateModelMigrateRecordCols(x, record, cols...) +} +func IncreaseModelMigrateRetryCount(recordId int64) error { + _, err := x.ID(recordId).Incr("retry_count", 1).Update(&ModelMigrateRecord{}) + return err +} + +func UpdateModelMigrateStatusByStep(record *ModelMigrateRecord, newStep ModelMigrateStep) error { + status := newStep.GetStatus() + if status < 0 { + log.Error("Step format error. id=%d, newStep=%d", record.ID, newStep) + return errors.New("step format error") + } + record.Status = status + record.CurrentStep = newStep + // Under normal flow the step only ever advances, so the update is guarded by current_step < newStep + n, err := x.Where(builder.NewCond().And(builder.Eq{"id": record.ID}). + And(builder.Lt{"current_step": newStep})). + Cols("status", "current_step"). + Update(record) + if err != nil { + log.Error("UpdateModelMigrateStatusByStep err.%v", err) + return err + } + + if n == 0 { + log.Error("UpdateModelMigrateStatusByStep updated 0 rows. r.ID=%d", record.ID) + return errors.New("current_step not valid") + } + + return nil +} + +func RollBackMigrateStatus(record *ModelMigrateRecord, newStep ModelMigrateStep) error { + status := newStep.GetStatus() + if status < 0 { + log.Error("Step format error. id=%d, newStep=%d", record.ID, newStep) + return errors.New("step format error") + } + record.Status = status + record.CurrentStep = newStep + _, err := x.ID(record.ID). + Cols("status", "current_step"). + Update(record) + if err != nil { + log.Error("RollBackMigrateStatus err.%v", err) + return err + } + + return nil +} + +func UpdateModelMigrateRecordByStep(record *ModelMigrateRecord) error { + n, err := x. + Where(builder.NewCond().And(builder.Eq{"id": record.ID})). + Update(record) + if err != nil { + log.Error("UpdateModelMigrateRecordByStep err. ID=%d err=%v", record.ID, err) + return err + } + if n == 0 { + log.Error("UpdateModelMigrateRecordByStep updated 0 rows. r.ID=%d", record.ID) + return errors.New("current_step not valid") + } + return nil +} + +func GetUnfinishedModelMigrateRecords() ([]*ModelMigrateRecord, error) { + records := make([]*ModelMigrateRecord, 0, 10) + return records, x. + Where(builder.NewCond().And(builder.In("current_step", UnFinishedMigrateSteps))). + Limit(100). 
+ Find(&records) +} + +func InsertModelMigrateRecord(record *ModelMigrateRecord) (*ModelMigrateRecord, error) { + + if _, err := x.Insert(record); err != nil { + return nil, err + } + + return record, nil +} + +func GetModelMigrateRecordByCloudbrainId(cloudbrainId int64) (*ModelMigrateRecord, error) { + r := &ModelMigrateRecord{} + if has, err := x.Where("cloudbrain_id = ?", cloudbrainId).Get(r); err != nil { + log.Error("GetModelMigrateRecordByCloudbrainId err. %v", err) + return nil, err + } else if !has { + return nil, ErrRecordNotExist{} + } + return r, nil + +} +func GetModelMigrateRecordById(id int64) (*ModelMigrateRecord, error) { + r := &ModelMigrateRecord{} + if has, err := x.ID(id).Get(r); err != nil { + log.Error("GetModelMigrateRecordById err. %v", err) + return nil, err + } else if !has { + return nil, ErrRecordNotExist{} + } + return r, nil + +} diff --git a/models/models.go b/models/models.go index 13626868b8..eea3bd1833 100755 --- a/models/models.go +++ b/models/models.go @@ -170,6 +170,7 @@ func init() { new(TechConvergeBaseInfo), new(RepoConvergeInfo), new(UserRole), + new(ModelMigrateRecord), ) tablesStatistic = append(tablesStatistic, diff --git a/modules/cron/tasks_basic.go b/modules/cron/tasks_basic.go index 28f6698e30..9c06d9931a 100755 --- a/modules/cron/tasks_basic.go +++ b/modules/cron/tasks_basic.go @@ -5,6 +5,7 @@ package cron import ( + "code.gitea.io/gitea/services/ai_task_service/schedule" "context" "time" @@ -249,6 +250,17 @@ func registerHandleScheduleRecord() { }) } +func registerHandleModelMigrateRecord() { + RegisterTaskFatal("handle_model_migrate_record", &BaseConfig{ + Enabled: true, + RunAtStart: false, + Schedule: "@every 1m", + }, func(ctx context.Context, _ *models.User, _ Config) error { + schedule.HandleUnfinishedMigrateRecords() + return nil + }) +} + func registerRewardPeriodTask() { RegisterTaskFatal("reward_period_task", &BaseConfig{ Enabled: true, @@ -335,4 +347,6 @@ func initBasicTasks() { registerHandleScheduleRecord() registerHandleCloudbrainDurationStatistic() + + registerHandleModelMigrateRecord() } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 06d572f349..d0da396608 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -349,6 +349,9 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str EndPoint: getEndPoint(), ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip", } + outputGrampus = models.GrampusDataset{ + GetBackEndpoint: getEndPoint(), + } } else if ProcessorTypeGPU == req.ProcessType { datasetGrampus = getDatasetGPUGrampus(req.DatasetInfos, "/tmp/dataset") if len(req.ModelName) != 0 { @@ -372,7 +375,8 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str ContainerPath: "/tmp/code/" + cloudbrain.DefaultBranchName + ".zip", } outputGrampus = models.GrampusDataset{ - ContainerPath: "/tmp/output", + ContainerPath: "/tmp/output", + GetBackEndpoint: setting.Attachment.Minio.Endpoint, } } else if ProcessorTypeGCU == req.ProcessType { @@ -398,9 +402,9 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str ContainerPath: "/tmp/code/" + cloudbrain.DefaultBranchName + ".zip", } outputGrampus = models.GrampusDataset{ - ContainerPath: "/tmp/output", + ContainerPath: "/tmp/output", + GetBackEndpoint: setting.Attachment.Minio.Endpoint, } - } jobResult, err := createJob(models.CreateGrampusJobRequest{ diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go index 
0315fd9d38..9d76e27649 100755 --- a/modules/grampus/resty.go +++ b/modules/grampus/resty.go @@ -393,6 +393,72 @@ sendjob: return &result, nil } +func PostModelMigrate(jobID string) (*models.GrampusModelMigrateInfoResponse, error) { + checkSetting() + client := getRestyClient() + var result models.GrampusModelMigrateInfoResponse + + retry := 0 + +sendjob: + res, err := client.R(). + //SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetResult(&result). + Post(HOST + urlTrainJob + "/" + jobID + "/modelMigrate") + + if err != nil { + return &result, fmt.Errorf("resty ModelMigrate: %v", err) + } + log.Info("call modelMigrate res=%+v", res) + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("ModelMigrate failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("ModelMigrate failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + +func ModelMigrateInfo(jobID string) (*models.GrampusModelMigrateInfoResponse, error) { + checkSetting() + client := getRestyClient() + var result models.GrampusModelMigrateInfoResponse + + retry := 0 + +sendjob: + res, err := client.R(). + //SetHeader("Content-Type", "application/json"). + SetAuthToken(TOKEN). + SetResult(&result). + Get(HOST + urlTrainJob + "/" + jobID + "/modelMigrateInfo") + + if err != nil { + return &result, fmt.Errorf("resty ModelMigrateInfo: %v", err) + } + log.Info("call modelMigrateInfo res=%+v", res) + if result.ErrorCode == errorIllegalToken && retry < 1 { + retry++ + log.Info("retry get token") + _ = getToken() + goto sendjob + } + + if result.ErrorCode != 0 { + log.Error("ModelMigrateInfo failed(%d): %s", result.ErrorCode, result.ErrorMsg) + return &result, fmt.Errorf("ModelMigrateInfo failed(%d): %s", result.ErrorCode, result.ErrorMsg) + } + + return &result, nil +} + func GetAiCenters(pageIndex, pageSize int) (*models.GetGrampusAiCentersResult, error) { checkSetting() client := getRestyClient() diff --git a/modules/notification/model_schedule/schedule.go b/modules/notification/model_schedule/schedule.go new file mode 100644 index 0000000000..c214b1087f --- /dev/null +++ b/modules/notification/model_schedule/schedule.go @@ -0,0 +1,41 @@ +package model_schedule + +import ( + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/notification/base" +) + +type scheduleNotifier struct { + base.NullNotifier +} + +var ( + _ base.Notifier = &scheduleNotifier{} +) + +// NewNotifier creates a new model schedule notifier +func NewNotifier() base.Notifier { + return &scheduleNotifier{} +} + +func (*scheduleNotifier) NotifyChangeCloudbrainStatus(cloudbrain *models.Cloudbrain, oldStatus string) { + if !cloudbrain.IsTerminal() { + return + } + log.Info("try to InsertModelMigrateRecord.cloudbrainId=%d oldStatus=%s newStatus=%s", cloudbrain.ID, oldStatus, cloudbrain.Status) + switch cloudbrain.Type { + case models.TypeC2Net: + if cloudbrain.JobType == string(models.JobTypeDebug) { + return + } + _, err := models.InsertModelMigrateRecord(&models.ModelMigrateRecord{ + CloudbrainID: cloudbrain.ID, + Status: models.ModelMigrating, + CurrentStep: models.GrampusMigrating, + }) + if err != nil { + log.Error("InsertModelMigrateRecord err.cloudbrain.id=%d err=%v", cloudbrain.ID, err) + } + } +} diff --git a/modules/notification/notification.go b/modules/notification/notification.go index 5742afde76..df468c6cdf 100644 --- 
a/modules/notification/notification.go +++ b/modules/notification/notification.go @@ -11,6 +11,7 @@ import ( "code.gitea.io/gitea/modules/notification/base" "code.gitea.io/gitea/modules/notification/indexer" "code.gitea.io/gitea/modules/notification/mail" + "code.gitea.io/gitea/modules/notification/model_schedule" "code.gitea.io/gitea/modules/notification/reward" "code.gitea.io/gitea/modules/notification/ui" "code.gitea.io/gitea/modules/notification/webhook" @@ -41,6 +42,7 @@ func NewContext() { RegisterNotifier(action.NewNotifier()) RegisterNotifier(wechatNotifier.NewNotifier()) RegisterNotifier(reward.NewNotifier()) + RegisterNotifier(model_schedule.NewNotifier()) } // NotifyUploadAttachment notifies attachment upload message to notifiers diff --git a/modules/redis/redis_key/model_schedule.go b/modules/redis/redis_key/model_schedule.go new file mode 100644 index 0000000000..d7cf7c9c97 --- /dev/null +++ b/modules/redis/redis_key/model_schedule.go @@ -0,0 +1,9 @@ +package redis_key + +import "fmt" + +const MODEL_SCHEDULE_PREFIX = "model_schedule" + +func RecordHandleLock(jobId string) string { + return KeyJoin(MODEL_SCHEDULE_PREFIX, fmt.Sprint(jobId), "handle") +} diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 3a688e2a21..1310d5088e 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -676,6 +676,9 @@ var ( DeductTaskRangeForFirst time.Duration DeductTaskMinTimestamp int64 + //model-migrate config + UseLocalMinioMigrate bool + //badge config BadgeIconMaxFileSize int64 BadgeIconMaxWidth int @@ -1677,6 +1680,9 @@ func NewContext() { DeductTaskRangeForFirst = sec.Key("DEDUCT_TASK_RANGE_FOR_FIRST").MustDuration(3 * time.Hour) DeductTaskMinTimestamp = sec.Key("DEDUCT_TASK_MIN_TIMESTAMP").MustInt64(0) + sec = Cfg.Section("model-migrate") + UseLocalMinioMigrate = sec.Key("USE_LOCAL_MINIO_MIGRATE").MustBool(false) + sec = Cfg.Section("icons") BadgeIconMaxFileSize = sec.Key("BADGE_ICON_MAX_FILE_SIZE").MustInt64(1048576) BadgeIconMaxWidth = sec.Key("BADGE_ICON_MAX_WIDTH").MustInt(4096) diff --git a/modules/storage/obs.go b/modules/storage/obs.go index 68eb0f3ea7..f5764cd777 100755 --- a/modules/storage/obs.go +++ b/modules/storage/obs.go @@ -418,6 +418,49 @@ func GetOneLevelAllObjectUnderDir(bucket string, prefixRootPath string, relative return fileInfos, nil } +func GetOneLevelObjectsUnderDir(bucket string, prefixRootPath string, relativePath string) ([]FileInfo, error) { + input := &obs.ListObjectsInput{} + input.Bucket = bucket + input.Prefix = prefixRootPath + relativePath + input.Delimiter = "/" + if !strings.HasSuffix(input.Prefix, "/") { + input.Prefix += "/" + } + fileInfos := make([]FileInfo, 0) + prefixLen := len(input.Prefix) + output, err := ObsCli.ListObjects(input) + if err != nil { + if obsError, ok := err.(obs.ObsError); ok { + log.Error("Code:%s, Message:%s", obsError.Code, obsError.Message) + } + return nil, err + } + for _, val := range output.Contents { + if val.Key == input.Prefix { + continue + } + fileName := val.Key[prefixLen:] + fileInfo := FileInfo{ + ModTime: val.LastModified.Local().Format("2006-01-02 15:04:05"), + FileName: fileName, + Size: val.Size, + IsDir: false, + ParenDir: relativePath, + } + fileInfos = append(fileInfos, fileInfo) + } + for _, val := range output.CommonPrefixes { + fileName := strings.TrimSuffix(strings.TrimPrefix(val, input.Prefix), "/") + fileInfo := FileInfo{ + FileName: fileName, + IsDir: true, + ParenDir: 
strings.TrimPrefix(val, prefixRootPath), + } + fileInfos = append(fileInfos, fileInfo) + } + return fileInfos, nil +} + func GetAllObjectByBucketAndPrefix(bucket string, prefix string) ([]FileInfo, error) { input := &obs.ListObjectsInput{} input.Bucket = bucket diff --git a/modules/urfs_client/urchin/schedule.go b/modules/urfs_client/urchin/schedule.go index cddc06b91a..62db9133b8 100755 --- a/modules/urfs_client/urchin/schedule.go +++ b/modules/urfs_client/urchin/schedule.go @@ -1,18 +1,15 @@ package urchin import ( - "encoding/json" - "fmt" - "strings" - "time" - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/grampus" "code.gitea.io/gitea/modules/labelmsg" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/storage" + "encoding/json" + "fmt" "github.com/minio/minio-go" + "strings" ) type DecompressReq struct { @@ -30,253 +27,6 @@ func getUrfsClient() { urfsClient = New() } -func GetAITaskOutPutBack(cloudbrainID int64, jobName, centerId, computerResource string) error { - switch computerResource { - case models.NPUResource: - return GetNPUDataBack(cloudbrainID, jobName, centerId) - case models.GPUResource: - return GetGPUDataBack(cloudbrainID, jobName, centerId) - case models.GCUResource: - return GetGCUDataBack(cloudbrainID, jobName, centerId) - } - return nil -} - -func GetGPUDataBack(cloudbrainID int64, jobName, centerId string) error { - endpoint := grampus.GetRemoteEndPoint(centerId) - bucket := grampus.BucketRemote - objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName) - destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID) - getUrfsClient() - - var res *PeerResult - var err error - var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} - for i, retryInterval := range retryIntervalList { - res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - if err == nil { - log.Info("ScheduleDirToPeerByKey res=%v", res) - break - } - log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval) - time.Sleep(retryInterval) - // If it's the last retry, break - if i == len(retryIntervalList)-1 { - res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - } - } - - // If err is still not nil after retrying, insert a default value - if err != nil { - log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v", - endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err) - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: models.StorageUrchinScheduleFailed, - IsDir: true, - ComputeSource: models.GPUResource, - TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), - Remark: interceptErrorMessages(err), - LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - return fmt.Errorf("GetBackModel failed after retrying:%v", err) - } - - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: res.StatusCode, - IsDir: true, - ComputeSource: models.GPUResource, - TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), - 
LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - - r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) - if err != nil { - log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) - return err - } - - err = handleScheduleResult(r, res) - if err != nil { - log.Error("GetGPUDataBack handleScheduleResult err.%v", err) - return err - } - return nil -} - -func GetGCUDataBack(cloudbrainID int64, jobName, centerId string) error { - endpoint := grampus.GetRemoteEndPoint(centerId) - bucket := grampus.BucketRemote - objectKey := grampus.GetGPUModelObjectKey4Grampus(jobName) - destPeerHost := grampus.GetCenterProxy(setting.Grampus.GPULocalCenterID) - - getUrfsClient() - - var res *PeerResult - var err error - var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} - for i, retryInterval := range retryIntervalList { - res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - if err == nil { - break - } - log.Error("ScheduleDataToPeerByKey failed:%v, retry in %v", err, retryInterval) - time.Sleep(retryInterval) - // If it's the last retry, break - if i == len(retryIntervalList)-1 { - res, err = urfsClient.ScheduleDirToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - } - } - - // If err is still not nil after retrying, insert a default value - if err != nil { - log.Error("ScheduleDataToPeerByKey failed info is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,TargetObjectKey:%s,error:%v", - endpoint, bucket, objectKey, destPeerHost, grampus.GetGPUModelObjectKey(jobName), err) - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: models.StorageUrchinScheduleFailed, - IsDir: true, - ComputeSource: models.GCUResource, - TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), - LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - return fmt.Errorf("GetBackModel failed after retrying:%v", err) - } - - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: res.StatusCode, - IsDir: true, - ComputeSource: models.GCUResource, - TargetObjectKey: grampus.GetGPUModelObjectKey(jobName), - LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - - r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) - if err != nil { - log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) - return err - } - - err = handleScheduleResult(r, res) - if err != nil { - log.Error("GetGCUDataBack handleScheduleResult err.%v", err) - return err - } - - return nil -} - -func GetNPUDataBack(cloudbrainID int64, jobName, centerId string) error { - endpoint := grampus.GetRemoteEndPoint(centerId) - bucket := grampus.BucketRemote - objectKey := grampus.GetNpuModelObjectKey(jobName) - destPeerHost := grampus.GetCenterProxy(setting.Grampus.LocalCenterID) - - getUrfsClient() - var res *PeerResult - var err error - var retryIntervalList = []time.Duration{1 * time.Minute, 1 * time.Minute, 3 * time.Minute} - for i, retryInterval := range retryIntervalList { - res, 
err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - if err == nil { - break - } - log.Error("ScheduleDataToPeerByKey failed:%v, ObjectKey is:%s,retry in %v", err, objectKey, retryInterval) - time.Sleep(retryInterval) - // If it's the last retry, break - if i == len(retryIntervalList)-1 { - res, err = urfsClient.ScheduleDataToPeerByKey(endpoint, bucket, objectKey, destPeerHost) - } - } - - // If err is still not nil after retrying, insert a default value - if err != nil { - log.Error("ScheduleDataToPeerByKey failed after retrying, errorInfo is EndPoint:%s,Bucket:%s,ObjectKey:%s,ProxyServer:%s,error:%v", - endpoint, bucket, objectKey, destPeerHost, err) - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: models.StorageUrchinScheduleFailed, - IsDir: false, - ComputeSource: models.NPUResource, - Remark: interceptErrorMessages(err), - LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - return fmt.Errorf("GetBackModel failed after retrying:%v", err) - } - - _, err = models.InsertScheduleRecord(&models.ScheduleRecord{ - CloudbrainID: cloudbrainID, - EndPoint: endpoint, - Bucket: bucket, - ObjectKey: objectKey, - ProxyServer: destPeerHost, - Status: res.StatusCode, - IsDir: false, - ComputeSource: models.NPUResource, - LocalOperateStatus: models.MoveBucketWaiting, - }) - if err != nil { - log.Error("InsertScheduleRecord failed:%v", err) - return err - } - - r, err := models.GetScheduleRecordByCloudbrainID(cloudbrainID) - if err != nil { - log.Error("GetScheduleRecordByCloudbrainID err.cloudbrainID=%d err=%v", cloudbrainID, err) - return err - } - - err = handleScheduleResult(r, res) - if err != nil { - log.Error("GetNPUDataBack handleScheduleResult err.%v", err) - return err - } - - return nil -} - func tryScheduleDir(endpoint, bucket, objectKey, dstPeer string) { println("new request dstPeer: ", dstPeer) urfs := New() diff --git a/modules/util/string.go b/modules/util/string.go new file mode 100644 index 0000000000..e1239195d5 --- /dev/null +++ b/modules/util/string.go @@ -0,0 +1,11 @@ +package util + +func TruncateString(msg string, maxLength int) string { + if msg == "" { + return "" + } + if len(msg) < maxLength { + maxLength = len(msg) + } + return msg[0:maxLength] +} diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go index 0615e28d80..ed63133aca 100755 --- a/routers/api/v1/api.go +++ b/routers/api/v1/api.go @@ -1053,6 +1053,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Put("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GeneralCloudBrainJobStop) m.Group("/model", func() { m.Get("/schedule_status", repo.GetModelScheduleStatus) + m.Post("/reschedule", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.RetryModelSchedule) }) }) }) diff --git a/routers/api/v1/repo/modelarts.go b/routers/api/v1/repo/modelarts.go index 7749aa76fc..ccb01ba674 100755 --- a/routers/api/v1/repo/modelarts.go +++ b/routers/api/v1/repo/modelarts.go @@ -6,6 +6,7 @@ package repo import ( + "code.gitea.io/gitea/services/ai_task_service/schedule" "encoding/json" "net/http" "path" @@ -20,8 +21,6 @@ import ( "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" - "code.gitea.io/gitea/modules/urfs_client/urchin" - "code.gitea.io/gitea/modules/notification" "code.gitea.io/gitea/modules/grampus" @@ -169,9 +168,6 @@ func 
GetModelArtsTrainJobVersion(ctx *context.APIContext) { } if oldStatus != job.Status { notification.NotifyChangeCloudbrainStatus(job, oldStatus) - if models.IsTrainJobTerminal(job.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { - go urchin.GetAITaskOutPutBack(job.ID, job.JobName, result.JobInfo.Tasks[0].CenterID[0], job.ComputeResource) - } } err = models.UpdateTrainJobVersion(job) if err != nil { @@ -191,7 +187,7 @@ func GetModelArtsTrainJobVersion(ctx *context.APIContext) { func GetModelScheduleStatus(ctx *context.APIContext) { jobID := ctx.Params(":jobid") - status, err := cloudbrainTask.GetModelScheduleStatus(jobID) + status, err := schedule.GetModelScheduleStatus(jobID) if err != nil { ctx.JSON(http.StatusOK, response.OuterResponseError(err)) return @@ -200,6 +196,16 @@ func GetModelScheduleStatus(ctx *context.APIContext) { ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m)) } +func RetryModelSchedule(ctx *context.APIContext) { + jobID := ctx.Params(":jobid") + err := schedule.RetryModelMigrate(jobID) + if err != nil { + ctx.JSON(http.StatusOK, response.OuterResponseError(err)) + return + } + ctx.JSON(http.StatusOK, response.OuterSuccess()) +} + func TrainJobForModelConvertGetLog(ctx *context.APIContext) { var ( err error @@ -466,14 +472,41 @@ func ModelList(ctx *context.APIContext) { return } - status := models.ModelScheduleSucceed + status := models.ModelMigrateSuccess + + if task.Type == models.TypeC2Net { + if !task.IsTerminal() { + log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobID) + status = models.JobNoTeminal + } else { + status, err = schedule.GetModelScheduleStatus(task.JobID) + if err != nil { + log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error()) + return + } + } + } + + if status != models.ModelMigrateSuccess { + ctx.JSON(http.StatusOK, map[string]interface{}{ + "JobID": jobID, + "VersionName": versionName, + "StatusOK": status, + "Path": dirArray, + "Dirs": []storage.FileInfo{}, + "task": task, + "PageIsCloudBrain": true, + }) + return + } + var fileInfos []storage.FileInfo if task.ComputeResource == models.NPUResource { prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, task.JobName, setting.OutPutPath, versionName), "/") if !strings.HasSuffix(prefix, "/") { prefix += "/" } - fileInfos, err = storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, parentDir) + fileInfos, err = storage.GetOneLevelObjectsUnderDir(setting.Bucket, prefix, parentDir) if err != nil { log.Info("get TrainJobListModel failed:", err) ctx.ServerError("GetObsListObject:", err) @@ -504,19 +537,6 @@ func ModelList(ctx *context.APIContext) { }) } - if task.Type == models.TypeC2Net { - if !task.IsTerminal() { - log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobID) - status = models.JobNoTeminal - } else { - status, err = cloudbrainTask.GetModelScheduleStatus(task.JobID) - if err != nil { - log.Error("GetModelScheduleStatus(%s) failed:%v", task.JobName, err.Error()) - return - } - } - } - ctx.JSON(http.StatusOK, map[string]interface{}{ "JobID": jobID, "VersionName": versionName, diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index af897107fa..d2464827d4 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -21,8 +21,6 @@ import ( cloudbrainService "code.gitea.io/gitea/services/cloudbrain" - "code.gitea.io/gitea/modules/urfs_client/urchin" - "code.gitea.io/gitea/modules/dataset" "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" @@ -1953,9 +1951,6 @@ func 
SyncCloudbrainStatus() { task.CorrectCreateUnix() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) - if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { - go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) - } } err = models.UpdateJob(task) if err != nil { diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index 4a071bd37b..b92dc1a1e6 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -16,7 +16,6 @@ import ( "code.gitea.io/gitea/services/lock" - "code.gitea.io/gitea/modules/urfs_client/urchin" "code.gitea.io/gitea/routers/response" "code.gitea.io/gitea/services/cloudbrain/cloudbrainTask" @@ -1379,11 +1378,6 @@ func GrampusNotebookShow(ctx *context.Context) { task.CorrectCreateUnix() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) - if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource { - if len(result.JobInfo.Tasks[0].CenterID) == 1 { - urchin.GetNPUDataBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0]) - } - } } } err = models.UpdateJob(task) @@ -1535,9 +1529,6 @@ func GrampusTrainJobShow(ctx *context.Context) { task.CorrectCreateUnix() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) - if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { - go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) - } } } err = models.UpdateJob(task) @@ -1576,6 +1567,7 @@ func GrampusTrainJobShow(ctx *context.Context) { ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false) ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) ctx.Data["displayJobName"] = task.DisplayJobName + ctx.Data["canReschedule"] = cloudbrain.CanDeleteJob(ctx, task) ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx) diff --git a/services/ai_task_service/schedule/model_schedule.go b/services/ai_task_service/schedule/model_schedule.go new file mode 100644 index 0000000000..b92a740099 --- /dev/null +++ b/services/ai_task_service/schedule/model_schedule.go @@ -0,0 +1,374 @@ +package schedule + +import ( + "bytes" + "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/grampus" + "code.gitea.io/gitea/modules/labelmsg" + "code.gitea.io/gitea/modules/log" + "code.gitea.io/gitea/modules/redis/redis_key" + "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/setting" + "code.gitea.io/gitea/modules/storage" + "code.gitea.io/gitea/modules/util" + "encoding/json" + "errors" + "fmt" + "github.com/minio/minio-go" + "os/exec" + "path" + "strings" + "time" +) + +const NPUModelDefaultName = "models.zip" + +func GetModelScheduleStatus(jobId string) (models.ModelMigrateStatus, error) { + job, err := models.GetCloudbrainByJobID(jobId) + if err != nil { + log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) + return 0, errors.New("jobId not correct") + } + if !job.IsTerminal() { + log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId) + return models.ModelMigrateWaiting, nil + } + + record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) + if err != nil { + log.Error("GetModelScheduleStatus GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) + if models.IsErrRecordNotExist(err) { + return 
models.ModelMigrateSuccess, nil + } + return models.ModelMigrateFailed, err + } + + if !record.IsFinished() { + go HandleUnfinishedMigrateRecord(record) + } + + return record.Status, nil +} + +func RetryModelMigrate(jobId string) error { + job, err := models.GetCloudbrainByJobID(jobId) + if err != nil { + log.Error("RetryModelMigrate GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) + return errors.New("jobId not correct") + } + if !job.IsTerminal() { + log.Info("RetryModelMigrate job is not terminal.jobId=%s", jobId) + return errors.New("task is not terminal") + } + + // To avoid concurrency issues, acquire the lock first and only load the latest record once the lock is held + lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(jobId)) + success, err := lock.LockWithWait(10*time.Second, 10*time.Second) + if err != nil { + log.Error("RetryModelMigrate lock err.jobId=%s %v", jobId, err) + return err + } + if !success { + log.Error("RetryModelMigrate lock failed.jobId=%s", jobId) + return nil + } + defer lock.UnLock() + + record, err := models.GetModelMigrateRecordByCloudbrainId(job.ID) + if err != nil { + log.Error("RetryModelMigrate GetModelMigrateRecordByCloudbrainId err.jobId=%s err=%v", jobId, err) + if models.IsErrRecordNotExist(err) { + return nil + } + return err + } + + // Only two cases may be rescheduled: the Grampus migration failed, or the local bucket move failed + if record.CurrentStep == models.GrampusMigrateFailed { + log.Info("retry PostModelMigrate. record.id = %d", record.ID) + _, err := grampus.PostModelMigrate(jobId) + if err != nil { + log.Error("PostModelMigrate err.%v", err) + return err + } + models.IncreaseModelMigrateRetryCount(record.ID) + if err := models.RollBackMigrateStatus(record, models.GrampusMigrating); err != nil { + log.Error("RollBackMigrateStatus err.%v", err) + return err + } + return nil + } + + if record.CurrentStep == models.BucketMoveFailed { + log.Info("retry BucketMove. record.id = %d", record.ID) + if err := models.RollBackMigrateStatus(record, models.GrampusMigrateSuccess); err != nil { + log.Error("RollBackMigrateStatus err.%v", err) + return err + } + models.IncreaseModelMigrateRetryCount(record.ID) + return nil + } + + return errors.New("no need to retry: the model migration has already succeeded or is still in progress") + +} + +func HandleUnfinishedMigrateRecords() { + list, err := models.GetUnfinishedModelMigrateRecords() + if err != nil { + log.Error("GetUnfinishedModelMigrateRecords err=%v", err) + return + } + for _, r := range list { + HandleUnfinishedMigrateRecord(r) + } +} + +func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error { + cloudbrain, err := models.GetCloudbrainByID(fmt.Sprint(r.CloudbrainID)) + if err != nil { + log.Error("GetCloudbrainByID err. 
%v", err) + return err + } + + lock := redis_lock.NewDistributeLock(redis_key.RecordHandleLock(cloudbrain.JobID)) + success, err := lock.LockWithWait(10*time.Second, 10*time.Second) + if err != nil { + log.Error("HandleUnfinishedMigrateRecord lock err.ID=%d %v", r.ID, err) + return err + } + if !success { + log.Error("HandleUnfinishedMigrateRecord lock failed.ID=%d ", r.ID) + return nil + } + defer lock.UnLock() + + //拿到锁以后重新查询一次 + r, err = models.GetModelMigrateRecordById(r.ID) + if err != nil { + log.Error("RetryModelMigrate GetModelMigrateRecordById err.Id=%s err=%v", r.ID, err) + if models.IsErrRecordNotExist(err) { + return nil + } + return err + } + + if r.CurrentStep == models.GrampusMigrateInit || r.CurrentStep == models.GrampusMigrating { + if err := UpdateModelMigrateStatusFromGrampus(r, cloudbrain.JobID); err != nil { + log.Error("UpdateModelMigrateStatusFromGrampus err. %v", err) + return err + } + } + + if r.CurrentStep == models.GrampusMigrateSuccess { + if err := LocalMigrateOperate(cloudbrain.JobName, cloudbrain.ComputeResource, r); err != nil { + log.Error("LocalMigrateOperate err. %v", err) + return err + } + } + + if r.CurrentStep == models.BucketMoving { + //尝试查询NPU结果目录下是否有文件,有文件则认为已经解压成功 + if cloudbrain.ComputeResource == models.NPUResource && IsNPUModelDirHasFile(cloudbrain.JobName, cloudbrain.VersionName) { + TryToUpdateNPUMoveBucketResult(r, cloudbrain.JobName, cloudbrain.VersionName) + } + } + return nil +} + +func UpdateModelMigrateStatusFromGrampus(r *models.ModelMigrateRecord, jobId string) error { + res, err := grampus.ModelMigrateInfo(jobId) + if err != nil { + log.Error("ModelMigrateInfo err. r.ID=%d %v", r.ID, err) + return err + } + log.Info("grampus ModelMigrateInfo r.ID=%d res=%+v", r.ID, res) + newStep := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() + if newStep == r.CurrentStep { + log.Info("The status has not changed. r.ID=%d status=%d", r.ID, res.Status) + return nil + } + err = updateModelMigrateFromRes(r, res) + if err != nil { + log.Error("updateModelMigrateFromRes err. r.ID=%d %v", r.ID, err) + return err + } + return nil +} + +func LocalMigrateOperate(jobName, computeSource string, r *models.ModelMigrateRecord) error { + log.Info("Grampus model migrate succeed,objectKey = %s computeSource= %s", r.DestObjectKey, computeSource) + err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoving) + if err != nil { + log.Error("UpdateModelMigrateStatusByStep err. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, err) + return err + } + if computeSource == models.NPUResource { + //因为NPU的输出会被压缩,因此需要解压+移桶 + decompress(r.DestBucket+"/"+r.DestObjectKey, setting.Bucket+"/"+strings.TrimSuffix(r.DestObjectKey, models.ModelSuffix)) + } else { + //因为调度无法指定桶,所以调度成功后我们还需要移桶 + if setting.UseLocalMinioMigrate { + if err := MoveBucketJust4LocalMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { + log.Error("MoveBucketJust4LocalMinio err.%v", err) + if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { + log.Error("UpdateModelMigrateStatusByStep error. 
r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) + } + return err + } + } else { + if err := MoveBucketInOpenIMinio(r.DestObjectKey, grampus.GetGPUModelObjectKey(jobName), r.DestBucket, setting.Attachment.Minio.Bucket); err != nil { + log.Error("MoveBucketInOpenIMinio err.%v", err) + if tmpErr := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveFailed); tmpErr != nil { + log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveFailed, tmpErr) + } + return err + } + } + + if err := models.UpdateModelMigrateStatusByStep(r, models.BucketMoveSuccess); err != nil { + log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", r.ID, models.BucketMoveSuccess, err) + } + } + return nil +} + +func TryToUpdateNPUMoveBucketResult(record *models.ModelMigrateRecord, jobName, versionName string) error { + if IsNPUModelDirHasFile(jobName, versionName) { + if err := models.UpdateModelMigrateStatusByStep(record, models.BucketMoveSuccess); err != nil { + log.Error("UpdateModelMigrateStatusByStep error. r.ID=%d step=%d err=%v", record.ID, models.BucketMoveSuccess, err) + return err + } + } + return nil +} + +func updateModelMigrateFromRes(r *models.ModelMigrateRecord, res *models.GrampusModelMigrateInfoResponse) error { + step := models.GrampusMigrateResponse(res.Status).ConvertToModelMigrateStep() + err := models.UpdateModelMigrateStatusByStep(r, step) + if err != nil { + log.Error("UpdateModelMigrateStatusByStep err,ID=%d err=%v", r.ID, err) + return err + } + r.DestBucket = res.DestBucket + r.DestEndpoint = res.DestEndpoint + r.DestObjectKey = res.DestObjectKey + r.DestProxy = res.DestProxy + r.Remark = strings.TrimPrefix(r.Remark+";"+util.TruncateString(res.FailedReason, 200), ";") + r.SrcBucket = res.SrcBucket + r.SrcEndpoint = res.SrcEndpoint + r.SrcObjectKey = res.SrcObjectKey + err = models.UpdateModelMigrateRecordByStep(r) + if err != nil { + log.Error("updateModelMigrateFromRes UpdateModelMigrateRecord error.id=%d.err=%v", r.ID, err) + return err + } + return nil +} + +func MoveBucketInOpenIMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { + var core = storage.ScheduleMinioCore + objectInfo := core.Client.ListObjects(oldBucket, objectKeyPrefix, true, nil) + log.Info("MoveBucketInOpenIMinio start.objectKeyPrefix=%s", objectKeyPrefix) + count := 0 + for object := range objectInfo { + count++ + if object.Err != nil { + log.Error("MoveBucketInOpenIMinio object.Err=%v", object.Err) + return object.Err + } + log.Debug("MoveBucketInOpenIMinio object.Key=%s", object.Key) + newObjectKey := strings.Replace(object.Key, objectKeyPrefix, targetObjectPrefix, 1) + err := MoveMinioFileBucket(core, object.Key, newObjectKey, oldBucket, newBucket) + if err != nil { + log.Error("MoveBucketInOpenIMinio MoveMinioFileBucket object.Key=%s Err=%v", object.Key, err) + continue + } + } + log.Info("MoveBucketInOpenIMinio finished.objectKeyPrefix=%s ,total=%d", objectKeyPrefix, count) + return nil +} + +func MoveBucketJust4LocalMinio(objectKeyPrefix, targetObjectPrefix, oldBucket, newBucket string) error { + oldPath := path.Join(setting.Attachment.Minio.RealPath, oldBucket, objectKeyPrefix) + newPath := path.Join(setting.Attachment.Minio.RealPath, newBucket, targetObjectPrefix) + log.Info("MoveBucketJust4LocalMinio start.oldPath=%s newPath=%s", oldPath, newPath) + //重命名原有文件夹,防止已有该文件 + err, errStr := sudoMv(newPath, fmt.Sprintf("%s_%d", newPath, time.Now().Unix())) + if err != nil { + log.Error("MoveBucketJust4LocalMinio 
sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) + } + // Move (rename) the directory into place + err, errStr = sudoMv(oldPath, newPath) + if err != nil { + log.Error("MoveBucketJust4LocalMinio sudoMv error.oldPath=%s newPath=%s Err=%v errStr=%s ", oldPath, newPath, err, errStr) + return err + } + log.Info("MoveBucketJust4LocalMinio finished.oldPath=%s newPath=%s ", oldPath, newPath) + return nil +} + +func sudoMv(oldPath, newPath string) (error, string) { + c := fmt.Sprintf("sudo mv %s %s", oldPath, newPath) + log.Info("start to sudoMv,oldPath=%s newPath=%s", oldPath, newPath) + cmd := exec.Command("/bin/sh", "-c", c) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout // capture stdout + cmd.Stderr = &stderr // capture stderr + err := cmd.Run() + outStr, errStr := stdout.String(), stderr.String() + log.Debug("out:\n%s\nerr:\n%s\n", outStr, errStr) + if err != nil { + log.Error("cmd.Run() failed,oldPath=%s newPath=%s err=%v\n", oldPath, newPath, err) + return err, errStr + } + return nil, errStr +} + +func MoveMinioFileBucket(core *minio.Core, oldObjectKey, newObjectKey, oldBucket, newBucket string) error { + _, err := core.CopyObject(oldBucket, oldObjectKey, newBucket, newObjectKey, map[string]string{}) + + if err != nil { + log.Error("MoveMinioFileBucket CopyObject err oldObjectKey=%s .%v", oldObjectKey, err) + return err + } + + err = core.RemoveObject(oldBucket, oldObjectKey) + if err != nil { + log.Error("MoveMinioFileBucket RemoveObject err oldObjectKey=%s .%v", oldObjectKey, err) + } + return err +} + +type DecompressReq struct { + SourceFile string `json:"source_file"` + DestPath string `json:"dest_path"` +} + +func decompress(sourceFile, destPath string) { + req, _ := json.Marshal(DecompressReq{ + SourceFile: sourceFile, + DestPath: destPath, + }) + err := labelmsg.SendDecompressAttachToLabelOBS(string(req)) + if err != nil { + log.Error("SendDecompressTask to labelsystem (%s) failed:%s", sourceFile, err.Error()) + } +} + +func IsNPUModelDirHasFile(jobName string, versionName string) bool { + prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/") + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "") + if err != nil { + log.Info("IsNPUModelDirHasFile. list model dir failed: %v", err) + return false + } + + return len(fileInfos) > 0 +} diff --git a/services/cloudbrain/cloudbrainTask/schedule.go b/services/cloudbrain/cloudbrainTask/schedule.go deleted file mode 100644 index 9c9f899590..0000000000 --- a/services/cloudbrain/cloudbrainTask/schedule.go +++ /dev/null @@ -1,93 +0,0 @@ -package cloudbrainTask - -import ( - "code.gitea.io/gitea/models" - "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/modules/storage" - "errors" - "path" - "strings" -) - -func GetModelScheduleStatus(jobId string) (models.ModelScheduleStatus, error) { - job, err := models.GetCloudbrainByJobID(jobId) - if err != nil { - log.Error("GetModelScheduleStatus GetCloudbrainByJobID err.jobId=%s err=%v", jobId, err) - return 0, errors.New("jobId not correct") - } - if !job.IsTerminal() { - log.Info("GetModelScheduleStatus job is not terminal.jobId=%s", jobId) - return models.ModelScheduleWaiting, nil - } - - record, err := models.GetScheduleRecordByCloudbrainID(job.ID) - if err != nil { - log.Error("GetModelScheduleStatus GetScheduleRecordByCloudbrainID err.jobId=%s 
err=%v", jobId, err) - if models.IsErrRecordNotExist(err) { - return models.ModelScheduleSucceed, nil - } - return models.ModelScheduleFailed, err - } - - switch record.Status { - case models.StorageUrchinScheduleWaiting: - return models.ModelScheduleWaiting, nil - case models.StorageUrchinScheduleProcessing: - return models.ModelScheduleOperating, nil - case models.StorageUrchinScheduleFailed: - return models.ModelScheduleFailed, nil - case models.StorageUrchinNoFile: - return models.ModelScheduleSucceed, nil - case models.StorageUrchinScheduleSucceed: - moveStatus, err := GetMoveBucketStatus(record, job.JobName, job.VersionName) - if err != nil { - log.Error("GetMoveBucketStatus err.%v", err) - return models.ModelScheduleFailed, err - } - switch moveStatus { - case models.MoveBucketSucceed: - return models.ModelScheduleSucceed, nil - case models.MoveBucketOperating: - return models.ModelScheduleOperating, nil - case models.MoveBucketFailed: - return models.ModelScheduleFailed, nil - } - } - - return models.ModelScheduleFailed, nil -} - -func GetMoveBucketStatus(record *models.ScheduleRecord, jobName, versionName string) (int, error) { - - if record.ComputeSource == models.GPUResource || record.ComputeSource == models.GCUResource { - return record.LocalOperateStatus, nil - } - if record.LocalOperateStatus != models.MoveBucketOperating { - return record.LocalOperateStatus, nil - } - //由于NPU回传后还有异步的解压,所以对于进行中的状态需要进一步查询是否已解压结束 - //判断方法是查询模型目录是否有文件 - if IsNPUModelDirHasFile(jobName, versionName) { - models.UpdateScheduleLocalOperateStatus(record, models.MoveBucketSucceed) - return models.MoveBucketSucceed, nil - } - return record.LocalOperateStatus, nil -} - -func IsNPUModelDirHasFile(jobName string, versionName string) bool { - prefix := strings.TrimPrefix(path.Join(setting.TrainJobModelPath, jobName, setting.OutPutPath, versionName), "/") - if !strings.HasSuffix(prefix, "/") { - prefix += "/" - } - fileInfos, err := storage.GetOneLevelAllObjectUnderDir(setting.Bucket, prefix, "") - if err != nil { - log.Info("IsNPUModelDirHasFile.get TrainJobListModel failed:", err) - return false - } - - if len(fileInfos) > 0 { - return true - } - return len(fileInfos) > 0 -} diff --git a/services/cloudbrain/cloudbrainTask/train.go b/services/cloudbrain/cloudbrainTask/train.go index 3d4f104779..90bb9a9571 100644 --- a/services/cloudbrain/cloudbrainTask/train.go +++ b/services/cloudbrain/cloudbrainTask/train.go @@ -13,8 +13,6 @@ import ( "strconv" "strings" - "code.gitea.io/gitea/modules/urfs_client/urchin" - "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/notification" @@ -1087,9 +1085,6 @@ func SyncTaskStatus(task *models.Cloudbrain) error { task.CorrectCreateUnix() if oldStatus != task.Status { notification.NotifyChangeCloudbrainStatus(task, oldStatus) - if models.IsTrainJobTerminal(task.Status) && len(result.JobInfo.Tasks[0].CenterID) == 1 { - go urchin.GetAITaskOutPutBack(task.ID, task.JobName, result.JobInfo.Tasks[0].CenterID[0], task.ComputeResource) - } } err = models.UpdateJob(task) if err != nil { diff --git a/templates/repo/cloudbrain/show.tmpl b/templates/repo/cloudbrain/show.tmpl index 0d96a2c2fc..c5f7ac4910 100755 --- a/templates/repo/cloudbrain/show.tmpl +++ b/templates/repo/cloudbrain/show.tmpl @@ -389,7 +389,7 @@ function parseLog() { let jsonValue = document.getElementById("json_value").value; - let jsonObj = JSON.parse(jsonValue); + let jsonObj = jsonValue&&JSON.parse(jsonValue); let podRoleName = jsonObj["podRoleName"]; let html = ""; if (podRoleName != null) { diff 
--git a/templates/repo/grampus/trainjob/show.tmpl b/templates/repo/grampus/trainjob/show.tmpl index ac60e06127..48c8439ea3 100755 --- a/templates/repo/grampus/trainjob/show.tmpl +++ b/templates/repo/grampus/trainjob/show.tmpl @@ -94,7 +94,7 @@ {{if eq .ComputeResource "CPU/GPU"}} {{$.i18n.Tr "repo.cloudbrain.runinfo"}} {{end}} - {{$.i18n.Tr "repo.model_download"}} + {{$.i18n.Tr "repo.model_download"}}
@@ -587,6 +587,7 @@ }); $('td.ti-text-form-content.spec').text(specStr); })(); + console.log({{.version_list_task}}) var setting = { check: { enable: true, diff --git a/web_src/js/features/cloudbrainShow.js b/web_src/js/features/cloudbrainShow.js index 8054056945..2ca64c38fb 100644 --- a/web_src/js/features/cloudbrainShow.js +++ b/web_src/js/features/cloudbrainShow.js @@ -565,6 +565,7 @@ export default async function initCloudrainSow() { activeTab.trigger('click'); } // + $(".content-pad").on("click", ".load-model-file", function () { let downloadFlag = $(this).data("download-flag") || ""; let gpuFlag = $(this).data("gpu-flag") || ""; @@ -573,9 +574,12 @@ export default async function initCloudrainSow() { let filename = $(this).data("filename"); let init = $(this).data("init") || ""; let path = $(this).data("path"); + let retryPath = `/api/v1/repos${$(this).data("retry-path")}`; + const rescheduleFlag = $(this).data("can-reschedule") || ""; $(`#dir_list${version_name}`).empty(); let url = `/api/v1/repos${path}?version_name=${version_name}&parentDir=${parents}`; $.get(url, (data) => { + if (data.StatusOK == 0) { // 成功 0 if (data.Dirs) { data.Dirs.length !==0 && $(`#${version_name}-result-down`).show() @@ -595,6 +599,7 @@ export default async function initCloudrainSow() { $(`#file_breadcrumb${version_name}`).append(htmlBread); } else { renderBrend( + this, path, version_name, parents, @@ -609,7 +614,7 @@ export default async function initCloudrainSow() { $(`#dir_list${version_name}`).html(`
-
+
${i18n['task_not_finished']}
`); }else if (data.StatusOK == 1) { // 处理中 1 @@ -628,12 +633,23 @@ export default async function initCloudrainSow() {
`); } else if (data.StatusOK == 2) { // 失败 2 $(`#file_breadcrumb${version_name}`).empty(); - $(`#dir_list${version_name}`).html(`
-
- -
- ${i18n['file_sync_fail']} -
`); + if (rescheduleFlag) { + $(`#dir_list${version_name}`).html(`
+
+ +
+ ${i18n['file_sync_fail']} + ${i18n['retrieve_results']} +
`); + } + else { + $(`#dir_list${version_name}`).html(`
+
+ +
+ ${i18n['file_sync_fail']} +
`); + } } else if (data.StatusOK == 3) { // 等待同步 3 $(`#file_breadcrumb${version_name}`).empty(); $(`#dir_list${version_name}`).html(`
@@ -651,10 +667,21 @@ export default async function initCloudrainSow() { ${i18n['no_file_to_download']}
`); } + $('#retry_result').off('click').on('click', function () { + $.post(retryPath, (data) => { + if (data.code === 0) { + $('.load-model-file').trigger('click'); + } + }).fail(function (err) { + console.log(err); + }); + }) }).fail(function (err) { console.log(err, version_name); }); }); + + function renderSize(value) { if (null == value || value == "") { return "0 Bytes"; } @@ -678,6 +705,7 @@ return size + unitArr[index]; } function renderBrend( + that, path, version_name, parents, @@ -711,15 +739,9 @@ } else { $(`input[name=model${version_name}]`).val(parents); $(`input[name=modelback${version_name}]`).val(filename); - - let selectEle = $(`#file_breadcrumb${version_name} a.section`).filter( - (index, item) => { - return item.text == filename; - } - ); - selectEle.nextAll().remove(); - selectEle.after("
/
"); - selectEle.replaceWith(`
${filename}
`); + $(that).nextAll().remove(); + $(that).after("
/
"); + $(that).replaceWith(`
${filename}
`); } } diff --git a/web_src/js/features/i18nVue.js b/web_src/js/features/i18nVue.js index c82fc89f16..c67db2a95c 100644 --- a/web_src/js/features/i18nVue.js +++ b/web_src/js/features/i18nVue.js @@ -74,7 +74,8 @@ export const i18nVue = { file_sync_wait:"文件等待同步中,请稍侯", file_sync_fail:"文件同步失败", no_file_to_download:"没有文件可以下载,稍后再来看看", - task_not_finished:"任务还未结束,稍后再来看看", + task_not_finished: "任务还未结束,稍后再来看看", + retrieve_results: "重新获取结果", local:"本地", online:"线上", modify:"修改", @@ -215,7 +216,8 @@ export const i18nVue = { file_sync_ing:"File synchronization in waitting, please wait", file_sync_fail:"File synchronization failed", no_file_to_download:"No files can be downloaded", - task_not_finished:"Task not finished yet, please wait", + task_not_finished: "Task not finished yet, please wait", + retrieve_results: "Retrieve results", local:"Local", online:"Online", modify:"Modify",