From d34517cc32c130151e432fce4a9c9d1fb947e765 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Fri, 25 Mar 2022 18:02:01 +0800 Subject: [PATCH 1/8] #1746 handle history data --- models/cloudbrain.go | 14 +++- modules/cloudbrain/cloudbrain.go | 2 + routers/private/internal.go | 2 + routers/repo/cloudbrain.go | 113 +++++++++++++++++++++++++++++-- 4 files changed, 124 insertions(+), 7 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index ea6d0338e9..4cd02d7c63 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -87,6 +87,8 @@ const ( ModelArtsTrainJobCheckRunning ModelArtsJobStatus = "CHECK_RUNNING" //审核作业正在运行中 ModelArtsTrainJobCheckRunningCompleted ModelArtsJobStatus = "CHECK_RUNNING_COMPLETED" //审核作业已经完成 ModelArtsTrainJobCheckFailed ModelArtsJobStatus = "CHECK_FAILED" //审核作业失败 + + DURATION_STR_ZERO = "00:00:00" ) type Cloudbrain struct { @@ -174,7 +176,7 @@ func (task *Cloudbrain) ComputeAndSetDuration() { func ConvertDurationToStr(duration int64) string { if duration == 0 { - return "00:00:00" + return DURATION_STR_ZERO } return util.AddZero(duration/3600) + ":" + util.AddZero(duration%3600/60) + ":" + util.AddZero(duration%60) } @@ -1323,6 +1325,7 @@ func CloudbrainsVersionList(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int, e } func CreateCloudbrain(cloudbrain *Cloudbrain) (err error) { + cloudbrain.TrainJobDuration = DURATION_STR_ZERO if _, err = x.Insert(cloudbrain); err != nil { return err } @@ -1467,6 +1470,15 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { Find(&cloudbrains) } +func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) { + cloudbrains := make([]*Cloudbrain, 0, 10) + return cloudbrains, x. + In("status", ModelArtsTrainJobCompleted, ModelArtsTrainJobFailed, ModelArtsTrainJobKilled, ModelArtsStopped, JobStopped, JobFailed, JobSucceeded). + Where("train_job_duration is null or train_job_duration = '' "). + Limit(100). + Find(&cloudbrains) +} + func GetCloudbrainCountByUserID(userID int64, jobType string) (int, error) { count, err := x.In("status", JobWaiting, JobRunning).And("job_type = ? and user_id = ? and type = ?", jobType, userID, TypeCloudBrainOne).Count(new(Cloudbrain)) return int(count), err diff --git a/modules/cloudbrain/cloudbrain.go b/modules/cloudbrain/cloudbrain.go index 54ac0c7acf..9aae447b03 100755 --- a/modules/cloudbrain/cloudbrain.go +++ b/modules/cloudbrain/cloudbrain.go @@ -158,10 +158,12 @@ func GenerateTask(ctx *context.Context, displayJobName, jobName, image, command, if ResourceSpecs == nil { json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs) } + for _, spec := range ResourceSpecs.ResourceSpec { if resourceSpecId == spec.Id { resourceSpec = spec } + } if resourceSpec == nil { diff --git a/routers/private/internal.go b/routers/private/internal.go index 0dd725ca3f..d80a706cc5 100755 --- a/routers/private/internal.go +++ b/routers/private/internal.go @@ -6,6 +6,7 @@ package private import ( + "code.gitea.io/gitea/routers/repo" "strings" "code.gitea.io/gitea/modules/log" @@ -45,6 +46,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Post("/tool/update_all_repo_commit_cnt", UpdateAllRepoCommitCnt) m.Post("/tool/repo_stat/:date", RepoStatisticManually) m.Post("/tool/update_repo_visit/:date", UpdateRepoVisit) + m.Post("/task/history_handle/duration", repo.HandleTaskWithNoDuration) }, CheckInternalToken) } diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index d444ea73fd..084dbcbe93 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -417,13 +417,16 @@ func cloudBrainShow(ctx *context.Context, tpName base.TplName) { } } if task.TrainJobDuration == "" { - var duration int64 - if task.Status == string(models.JobRunning) { - duration = time.Now().Unix() - int64(task.CreatedUnix) - } else { - duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix) + if task.Duration == 0 { + var duration int64 + if task.Status == string(models.JobRunning) { + duration = time.Now().Unix() - int64(task.CreatedUnix) + } else { + duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix) + } + task.Duration = duration } - task.TrainJobDuration = models.ConvertDurationToStr(duration) + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) } ctx.Data["duration"] = task.TrainJobDuration ctx.Data["task"] = task @@ -1060,6 +1063,104 @@ func SyncCloudbrainStatus() { return } +func HandleTaskWithNoDuration(ctx *context.Context) { + log.Info("HandleTaskWithNoDuration start") + cloudBrains, err := models.GetStoppedJobWithNoDurationJob() + if err != nil { + log.Error("HandleTaskWithNoTrainJobDuration failed:", err.Error()) + return + } + if len(cloudBrains) == 0 { + log.Info("HandleTaskWithNoTrainJobDuration:no task need handle") + return + } + + for _, task := range cloudBrains { + log.Info("Handle job ,%+v", task) + if task.Type == models.TypeCloudBrainOne { + result, err := cloudbrain.GetJob(task.JobID) + if err != nil { + log.Error("GetJob(%s) failed:%v", task.JobName, err) + continue + } + + if result != nil { + jobRes, _ := models.ConvertToJobResultPayload(result.Payload) + taskRoles := jobRes.TaskRoles + taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) + task.Status = taskRes.TaskStatuses[0].State + startTime := taskRes.TaskStatuses[0].StartAt.Unix() + endTime := taskRes.TaskStatuses[0].FinishedAt.Unix() + log.Info("task startTime = %v endTime= %v", startTime, endTime) + if startTime > 0 && endTime > 0 && endTime-startTime > 0 { + task.StartTime = timeutil.TimeStamp(startTime) + task.EndTime = timeutil.TimeStamp(endTime) + } else { + task.StartTime = task.CreatedUnix + task.EndTime = task.UpdatedUnix + } + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + } + } + } else if task.Type == models.TypeCloudBrainTwo { + if task.JobType == string(models.JobTypeDebug) { + //result, err := modelarts.GetJob(task.JobID) + result, err := modelarts.GetNotebook2(task.JobID) + if err != nil { + log.Error("GetJob(%s) failed:%v", task.JobName, err) + continue + } + + if result != nil { + task.Status = result.Status + startTime := result.Lease.CreateTime + duration := result.Lease.Duration / 1000 + if startTime > 0 { + task.StartTime = timeutil.TimeStamp(startTime) + task.EndTime = task.StartTime.Add(duration) + } + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + continue + } + } + } else if task.JobType == string(models.JobTypeTrain) { + result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10)) + if err != nil { + log.Error("GetTrainJob(%s) failed:%v", task.JobName, err) + continue + } + + if result != nil { + startTime := result.StartTime / 1000 + if startTime > 0 { + task.StartTime = timeutil.TimeStamp(startTime) + task.EndTime = task.StartTime.Add(result.Duration / 1000) + } + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + continue + } + } + } else { + log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType) + } + + } else { + log.Error("task.Type(%s) is error:%d", task.JobName, task.Type) + } + } + + return +} + func CloudBrainBenchmarkIndex(ctx *context.Context) { MustEnableCloudbrain(ctx) repo := ctx.Repo.Repository -- 2.34.1 From 23fb767efa64cc0b64260edb1702e56e83e0921a Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 10:18:18 +0800 Subject: [PATCH 2/8] #1654 fix bug --- models/cloudbrain.go | 2 +- routers/repo/cloudbrain.go | 16 +++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 4cd02d7c63..17761a1dc5 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -1471,7 +1471,7 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { } func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) { - cloudbrains := make([]*Cloudbrain, 0, 10) + cloudbrains := make([]*Cloudbrain, 0) return cloudbrains, x. In("status", ModelArtsTrainJobCompleted, ModelArtsTrainJobFailed, ModelArtsTrainJobKilled, ModelArtsStopped, JobStopped, JobFailed, JobSucceeded). Where("train_job_duration is null or train_job_duration = '' "). diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 084dbcbe93..7053177ac9 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1091,14 +1091,24 @@ func HandleTaskWithNoDuration(ctx *context.Context) { task.Status = taskRes.TaskStatuses[0].State startTime := taskRes.TaskStatuses[0].StartAt.Unix() endTime := taskRes.TaskStatuses[0].FinishedAt.Unix() - log.Info("task startTime = %v endTime= %v", startTime, endTime) - if startTime > 0 && endTime > 0 && endTime-startTime > 0 { + log.Info("task startTime = %v endTime= %v ,jobId=%d", startTime, endTime, task.ID) + if startTime > 0 { task.StartTime = timeutil.TimeStamp(startTime) - task.EndTime = timeutil.TimeStamp(endTime) } else { task.StartTime = task.CreatedUnix + } + if endTime > 0 { + task.EndTime = timeutil.TimeStamp(endTime) + } else { task.EndTime = task.UpdatedUnix } + + if task.EndTime < task.StartTime { + log.Info("endTime[%v] is less than starTime[%v],jobId=%d", task.EndTime, task.StartTime, task.ID) + st := task.StartTime + task.StartTime = task.EndTime + task.EndTime = st + } task.ComputeAndSetDuration() err = models.UpdateJob(task) if err != nil { -- 2.34.1 From 389ab81ceb8cd4d6caee4f9e5e593c10e47cd863 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 10:30:44 +0800 Subject: [PATCH 3/8] #1654 fix bug --- routers/repo/cloudbrain.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 7053177ac9..ee44df4f95 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1081,6 +1081,13 @@ func HandleTaskWithNoDuration(ctx *context.Context) { result, err := cloudbrain.GetJob(task.JobID) if err != nil { log.Error("GetJob(%s) failed:%v", task.JobName, err) + task.StartTime = task.CreatedUnix + task.EndTime = task.UpdatedUnix + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + } continue } @@ -1121,6 +1128,13 @@ func HandleTaskWithNoDuration(ctx *context.Context) { result, err := modelarts.GetNotebook2(task.JobID) if err != nil { log.Error("GetJob(%s) failed:%v", task.JobName, err) + task.StartTime = task.CreatedUnix + task.EndTime = task.UpdatedUnix + task.ComputeAndSetDuration() + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + } continue } -- 2.34.1 From 82787ddc2ed001f6c517b39692cb9cab1a4d1500 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 10:39:36 +0800 Subject: [PATCH 4/8] #1654 fix bug --- routers/repo/cloudbrain.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index ee44df4f95..fb40c8dec8 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1065,16 +1065,29 @@ func SyncCloudbrainStatus() { func HandleTaskWithNoDuration(ctx *context.Context) { log.Info("HandleTaskWithNoDuration start") - cloudBrains, err := models.GetStoppedJobWithNoDurationJob() - if err != nil { - log.Error("HandleTaskWithNoTrainJobDuration failed:", err.Error()) - return - } - if len(cloudBrains) == 0 { - log.Info("HandleTaskWithNoTrainJobDuration:no task need handle") - return + count := 0 + for { + cloudBrains, err := models.GetStoppedJobWithNoDurationJob() + if err != nil { + log.Error("HandleTaskWithNoTrainJobDuration failed:", err.Error()) + break + } + if len(cloudBrains) == 0 { + log.Info("HandleTaskWithNoTrainJobDuration:no task need handle") + break + } + handleNoDurationTask(cloudBrains) + count += len(cloudBrains) + if len(cloudBrains) < 100 { + log.Info("HandleTaskWithNoTrainJobDuration:task less than 100") + break + } } + log.Info("HandleTaskWithNoTrainJobDuration:count=%d", count) +} + +func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { for _, task := range cloudBrains { log.Info("Handle job ,%+v", task) if task.Type == models.TypeCloudBrainOne { @@ -1181,8 +1194,6 @@ func HandleTaskWithNoDuration(ctx *context.Context) { log.Error("task.Type(%s) is error:%d", task.JobName, task.Type) } } - - return } func CloudBrainBenchmarkIndex(ctx *context.Context) { -- 2.34.1 From 9f13443fa783314ef11e99445d3eac8e512ff3e5 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 10:56:37 +0800 Subject: [PATCH 5/8] #1654 fix bug --- routers/repo/cloudbrain.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index fb40c8dec8..a1a8d4a125 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1094,17 +1094,15 @@ func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { result, err := cloudbrain.GetJob(task.JobID) if err != nil { log.Error("GetJob(%s) failed:%v", task.JobName, err) - task.StartTime = task.CreatedUnix - task.EndTime = task.UpdatedUnix - task.ComputeAndSetDuration() - err = models.UpdateJob(task) - if err != nil { - log.Error("UpdateJob(%s) failed:%v", task.JobName, err) - } + updateDefaultDuration(task) continue } if result != nil { + if result.Msg != "success" { + updateDefaultDuration(task) + continue + } jobRes, _ := models.ConvertToJobResultPayload(result.Payload) taskRoles := jobRes.TaskRoles taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) @@ -1196,6 +1194,17 @@ func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { } } +func updateDefaultDuration(task *models.Cloudbrain) { + log.Info("updateDefaultDuration: taskId=%d", task.ID) + task.StartTime = task.CreatedUnix + task.EndTime = task.UpdatedUnix + task.ComputeAndSetDuration() + err := models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob(%s) failed:%v", task.JobName, err) + } +} + func CloudBrainBenchmarkIndex(ctx *context.Context) { MustEnableCloudbrain(ctx) repo := ctx.Repo.Repository -- 2.34.1 From 8acf96236f86e554b483dbbf77f616cf9d201ce8 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 11:13:37 +0800 Subject: [PATCH 6/8] #1654 fix bug --- routers/repo/cloudbrain.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index a1a8d4a125..a0bed615ce 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1105,7 +1105,15 @@ func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { } jobRes, _ := models.ConvertToJobResultPayload(result.Payload) taskRoles := jobRes.TaskRoles + if len(taskRoles) == 0 { + updateDefaultDuration(task) + continue + } taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) + if len(taskRes.TaskStatuses) == 0 { + updateDefaultDuration(task) + continue + } task.Status = taskRes.TaskStatuses[0].State startTime := taskRes.TaskStatuses[0].StartAt.Unix() endTime := taskRes.TaskStatuses[0].FinishedAt.Unix() -- 2.34.1 From f58583c46e71f22c9f831aae5a55eff7b3b8d15f Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 11:24:43 +0800 Subject: [PATCH 7/8] #1654 fix bug --- routers/repo/cloudbrain.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index a0bed615ce..2d4b4f2799 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1103,14 +1103,14 @@ func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { updateDefaultDuration(task) continue } - jobRes, _ := models.ConvertToJobResultPayload(result.Payload) - taskRoles := jobRes.TaskRoles - if len(taskRoles) == 0 { + jobRes, err := models.ConvertToJobResultPayload(result.Payload) + if err != nil || len(jobRes.TaskRoles) == 0 { updateDefaultDuration(task) continue } - taskRes, _ := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) - if len(taskRes.TaskStatuses) == 0 { + taskRoles := jobRes.TaskRoles + taskRes, err := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{})) + if err != nil || len(taskRes.TaskStatuses) == 0 { updateDefaultDuration(task) continue } -- 2.34.1 From 215b67061fa500858a627fc514f33f4a57e2bc7d Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Mon, 28 Mar 2022 11:45:52 +0800 Subject: [PATCH 8/8] #1654 fix bug --- routers/repo/cloudbrain.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index 2d4b4f2799..1b83c86abe 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -1084,7 +1084,7 @@ func HandleTaskWithNoDuration(ctx *context.Context) { } } log.Info("HandleTaskWithNoTrainJobDuration:count=%d", count) - + ctx.JSON(200, "success") } func handleNoDurationTask(cloudBrains []*models.Cloudbrain) { @@ -1241,13 +1241,16 @@ func CloudBrainBenchmarkIndex(ctx *context.Context) { ciTasks[i].CanDel = cloudbrain.CanDeleteJob(ctx, &task.Cloudbrain) ciTasks[i].Cloudbrain.ComputeResource = task.ComputeResource if ciTasks[i].TrainJobDuration == "" { - var duration int64 - if task.Status == string(models.JobRunning) { - duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix) - } else { - duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix) + if ciTasks[i].Duration == 0 { + var duration int64 + if task.Status == string(models.JobRunning) { + duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix) + } else { + duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix) + } + ciTasks[i].Duration = duration } - ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(duration) + ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(ciTasks[i].Duration) } ciTasks[i].BenchmarkTypeName = "" -- 2.34.1