#1755 处理历史云脑任务的运行时长

Merged
lewis merged 9 commits from fix-1654 into V20220328 2 years ago
  1. +13
    -1
      models/cloudbrain.go
  2. +2
    -0
      modules/cloudbrain/cloudbrain.go
  3. +2
    -0
      routers/private/internal.go
  4. +168
    -12
      routers/repo/cloudbrain.go

+ 13
- 1
models/cloudbrain.go View File

@@ -87,6 +87,8 @@ const (
ModelArtsTrainJobCheckRunning ModelArtsJobStatus = "CHECK_RUNNING" //审核作业正在运行中
ModelArtsTrainJobCheckRunningCompleted ModelArtsJobStatus = "CHECK_RUNNING_COMPLETED" //审核作业已经完成
ModelArtsTrainJobCheckFailed ModelArtsJobStatus = "CHECK_FAILED" //审核作业失败

DURATION_STR_ZERO = "00:00:00"
)

type Cloudbrain struct {
@@ -174,7 +176,7 @@ func (task *Cloudbrain) ComputeAndSetDuration() {

func ConvertDurationToStr(duration int64) string {
if duration == 0 {
return "00:00:00"
return DURATION_STR_ZERO
}
return util.AddZero(duration/3600) + ":" + util.AddZero(duration%3600/60) + ":" + util.AddZero(duration%60)
}
@@ -1323,6 +1325,7 @@ func CloudbrainsVersionList(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int, e
}

func CreateCloudbrain(cloudbrain *Cloudbrain) (err error) {
cloudbrain.TrainJobDuration = DURATION_STR_ZERO
if _, err = x.Insert(cloudbrain); err != nil {
return err
}
@@ -1467,6 +1470,15 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) {
Find(&cloudbrains)
}

func GetStoppedJobWithNoDurationJob() ([]*Cloudbrain, error) {
cloudbrains := make([]*Cloudbrain, 0)
return cloudbrains, x.
In("status", ModelArtsTrainJobCompleted, ModelArtsTrainJobFailed, ModelArtsTrainJobKilled, ModelArtsStopped, JobStopped, JobFailed, JobSucceeded).
Where("train_job_duration is null or train_job_duration = '' ").
Limit(100).
Find(&cloudbrains)
}

func GetCloudbrainCountByUserID(userID int64, jobType string) (int, error) {
count, err := x.In("status", JobWaiting, JobRunning).And("job_type = ? and user_id = ? and type = ?", jobType, userID, TypeCloudBrainOne).Count(new(Cloudbrain))
return int(count), err


+ 2
- 0
modules/cloudbrain/cloudbrain.go View File

@@ -158,10 +158,12 @@ func GenerateTask(ctx *context.Context, displayJobName, jobName, image, command,
if ResourceSpecs == nil {
json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs)
}

for _, spec := range ResourceSpecs.ResourceSpec {
if resourceSpecId == spec.Id {
resourceSpec = spec
}

}

if resourceSpec == nil {


+ 2
- 0
routers/private/internal.go View File

@@ -6,6 +6,7 @@
package private

import (
"code.gitea.io/gitea/routers/repo"
"strings"

"code.gitea.io/gitea/modules/log"
@@ -45,6 +46,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/tool/update_all_repo_commit_cnt", UpdateAllRepoCommitCnt)
m.Post("/tool/repo_stat/:date", RepoStatisticManually)
m.Post("/tool/update_repo_visit/:date", UpdateRepoVisit)
m.Post("/task/history_handle/duration", repo.HandleTaskWithNoDuration)

}, CheckInternalToken)
}

+ 168
- 12
routers/repo/cloudbrain.go View File

@@ -419,13 +419,16 @@ func cloudBrainShow(ctx *context.Context, tpName base.TplName) {
}
}
if task.TrainJobDuration == "" {
var duration int64
if task.Status == string(models.JobRunning) {
duration = time.Now().Unix() - int64(task.CreatedUnix)
} else {
duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix)
if task.Duration == 0 {
var duration int64
if task.Status == string(models.JobRunning) {
duration = time.Now().Unix() - int64(task.CreatedUnix)
} else {
duration = int64(task.UpdatedUnix) - int64(task.CreatedUnix)
}
task.Duration = duration
}
task.TrainJobDuration = models.ConvertDurationToStr(duration)
task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)
}
ctx.Data["duration"] = task.TrainJobDuration
ctx.Data["task"] = task
@@ -1062,6 +1065,156 @@ func SyncCloudbrainStatus() {
return
}

func HandleTaskWithNoDuration(ctx *context.Context) {
log.Info("HandleTaskWithNoDuration start")
count := 0
for {
cloudBrains, err := models.GetStoppedJobWithNoDurationJob()
if err != nil {
log.Error("HandleTaskWithNoTrainJobDuration failed:", err.Error())
break
}
if len(cloudBrains) == 0 {
log.Info("HandleTaskWithNoTrainJobDuration:no task need handle")
break
}
handleNoDurationTask(cloudBrains)
count += len(cloudBrains)
if len(cloudBrains) < 100 {
log.Info("HandleTaskWithNoTrainJobDuration:task less than 100")
break
}
}
log.Info("HandleTaskWithNoTrainJobDuration:count=%d", count)
ctx.JSON(200, "success")
}

func handleNoDurationTask(cloudBrains []*models.Cloudbrain) {
for _, task := range cloudBrains {
log.Info("Handle job ,%+v", task)
if task.Type == models.TypeCloudBrainOne {
result, err := cloudbrain.GetJob(task.JobID)
if err != nil {
log.Error("GetJob(%s) failed:%v", task.JobName, err)
updateDefaultDuration(task)
continue
}

if result != nil {
if result.Msg != "success" {
updateDefaultDuration(task)
continue
}
jobRes, err := models.ConvertToJobResultPayload(result.Payload)
if err != nil || len(jobRes.TaskRoles) == 0 {
updateDefaultDuration(task)
continue
}
taskRoles := jobRes.TaskRoles
taskRes, err := models.ConvertToTaskPod(taskRoles[cloudbrain.SubTaskName].(map[string]interface{}))
if err != nil || len(taskRes.TaskStatuses) == 0 {
updateDefaultDuration(task)
continue
}
task.Status = taskRes.TaskStatuses[0].State
startTime := taskRes.TaskStatuses[0].StartAt.Unix()
endTime := taskRes.TaskStatuses[0].FinishedAt.Unix()
log.Info("task startTime = %v endTime= %v ,jobId=%d", startTime, endTime, task.ID)
if startTime > 0 {
task.StartTime = timeutil.TimeStamp(startTime)
} else {
task.StartTime = task.CreatedUnix
}
if endTime > 0 {
task.EndTime = timeutil.TimeStamp(endTime)
} else {
task.EndTime = task.UpdatedUnix
}

if task.EndTime < task.StartTime {
log.Info("endTime[%v] is less than starTime[%v],jobId=%d", task.EndTime, task.StartTime, task.ID)
st := task.StartTime
task.StartTime = task.EndTime
task.EndTime = st
}
task.ComputeAndSetDuration()
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
}
}
} else if task.Type == models.TypeCloudBrainTwo {
if task.JobType == string(models.JobTypeDebug) {
//result, err := modelarts.GetJob(task.JobID)
result, err := modelarts.GetNotebook2(task.JobID)
if err != nil {
log.Error("GetJob(%s) failed:%v", task.JobName, err)
task.StartTime = task.CreatedUnix
task.EndTime = task.UpdatedUnix
task.ComputeAndSetDuration()
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
}
continue
}

if result != nil {
task.Status = result.Status
startTime := result.Lease.CreateTime
duration := result.Lease.Duration / 1000
if startTime > 0 {
task.StartTime = timeutil.TimeStamp(startTime)
task.EndTime = task.StartTime.Add(duration)
}
task.ComputeAndSetDuration()
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
continue
}
}
} else if task.JobType == string(models.JobTypeTrain) {
result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
if err != nil {
log.Error("GetTrainJob(%s) failed:%v", task.JobName, err)
continue
}

if result != nil {
startTime := result.StartTime / 1000
if startTime > 0 {
task.StartTime = timeutil.TimeStamp(startTime)
task.EndTime = task.StartTime.Add(result.Duration / 1000)
}
task.ComputeAndSetDuration()
err = models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
continue
}
}
} else {
log.Error("task.JobType(%s) is error:%s", task.JobName, task.JobType)
}

} else {
log.Error("task.Type(%s) is error:%d", task.JobName, task.Type)
}
}
}

func updateDefaultDuration(task *models.Cloudbrain) {
log.Info("updateDefaultDuration: taskId=%d", task.ID)
task.StartTime = task.CreatedUnix
task.EndTime = task.UpdatedUnix
task.ComputeAndSetDuration()
err := models.UpdateJob(task)
if err != nil {
log.Error("UpdateJob(%s) failed:%v", task.JobName, err)
}
}

func CloudBrainBenchmarkIndex(ctx *context.Context) {
MustEnableCloudbrain(ctx)
repo := ctx.Repo.Repository
@@ -1090,13 +1243,16 @@ func CloudBrainBenchmarkIndex(ctx *context.Context) {
ciTasks[i].CanDel = cloudbrain.CanDeleteJob(ctx, &task.Cloudbrain)
ciTasks[i].Cloudbrain.ComputeResource = task.ComputeResource
if ciTasks[i].TrainJobDuration == "" {
var duration int64
if task.Status == string(models.JobRunning) {
duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix)
} else {
duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix)
if ciTasks[i].Duration == 0 {
var duration int64
if task.Status == string(models.JobRunning) {
duration = time.Now().Unix() - int64(task.Cloudbrain.CreatedUnix)
} else {
duration = int64(task.Cloudbrain.UpdatedUnix) - int64(task.Cloudbrain.CreatedUnix)
}
ciTasks[i].Duration = duration
}
ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(duration)
ciTasks[i].TrainJobDuration = models.ConvertDurationToStr(ciTasks[i].Duration)
}

ciTasks[i].BenchmarkTypeName = ""


Loading…
Cancel
Save