|
- package ai_task
-
- import (
- "archive/zip"
- "net/http"
- "net/url"
- "strings"
-
- "code.gitea.io/gitea/entity"
- "code.gitea.io/gitea/models"
- "code.gitea.io/gitea/modules/cloudbrain"
- "code.gitea.io/gitea/modules/context"
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/setting"
- "code.gitea.io/gitea/modules/util"
- "code.gitea.io/gitea/routers/common"
- "code.gitea.io/gitea/routers/response"
- "code.gitea.io/gitea/services/ai_task_service/schedule"
- "code.gitea.io/gitea/services/ai_task_service/task"
- "code.gitea.io/gitea/services/cloudbrain/resource"
- )
-
- func CreateAITask(ctx *context.Context, form entity.CreateReq) {
- handCreateReq(&form)
- res, err := task.CreateAITask(form, ctx.Repo.GitRepo, ctx.Repo.Repository, ctx.User)
- if err != nil {
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
- func DelAITask(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, _ := task.GetAITaskTemplateByCloudbrainId(id)
- if t == nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.PARAM_ERROR, ctx))
- return
- }
- err := t.Delete(id)
- if err != nil {
- log.Error("Delete error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccess())
- }
- func StopAITask(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.PARAM_ERROR, ctx))
- return
- }
- res, err := t.Stop(id)
- if err != nil {
- log.Error("Stop error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
- func RestartAITask(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- res, bizErr := task.RestartAITask(id, ctx.Repo.GitRepo, ctx.Repo.Repository, ctx.User)
- if bizErr != nil {
- ctx.JSON(http.StatusOK, response.OuterTrBizError(bizErr, ctx))
- return
- }
-
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
-
- func GetAITaskLog(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- baseLine := ctx.QueryInt64("base_line")
- lines := ctx.QueryInt64("lines")
- order := ctx.Query("order")
- nodeId := ctx.QueryInt("node_id")
- logFileName := ctx.Query("file_name")
- cloudbrain, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("GetAITaskLog GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrain)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetLog(entity.QueryLogOpts{
- CloudbrainId: id,
- BaseLine: baseLine,
- Lines: lines,
- Order: entity.Direction(order),
- NodeId: nodeId,
- LogFileName: logFileName,
- })
- if err != nil {
- log.Error("GetAITaskLog error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- if res.Content != "" && cloudbrain.IsUserHasRight(ctx.User) {
- res.CanLogDownload = true
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
-
- func DownloadAITaskLog(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- nodeId := ctx.QueryInt("node_id")
- logFileName := ctx.Query("file_name")
- cloudbrain, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("DownloadAITaskLog GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrain)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetLogDownloadInfo(entity.GetLogDownloadInfoReq{
- CloudbrainId: id,
- NodeId: nodeId,
- LogFileName: logFileName,
- })
- if err != nil {
- log.Error("DownloadAITaskLog error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
-
- if res == nil || res.IsEmpty() {
- log.Error("DownloadAITaskLog error.%v", err)
- ctx.JSON(http.StatusNotFound, "")
- return
- }
-
- tmpErr := common.WriteDownloadContent2Resp(ctx, res)
- if tmpErr != nil {
- log.Error("DownloadAITaskLog error.%v", tmpErr)
- ctx.JSON(http.StatusOK, response.OuterResponseError(tmpErr))
- return
- }
-
- }
-
- func DownloadOutputFile(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- fileName := ctx.Query("file_name")
- parentDir := ctx.Query("parent_dir")
- cloudbrain, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("DownloadOutputFile GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrain)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetSingleOutputDownloadInfo(entity.GetSingleDownloadInfoReq{
- CloudbrainId: id,
- FileName: fileName,
- ParentDir: parentDir,
- })
- if err != nil {
- log.Error("DownloadOutputFile error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
-
- if res == nil || res.IsEmpty() {
- log.Error("DownloadOutputFile error.%v", err)
- ctx.JSON(http.StatusNotFound, "")
- return
- }
-
- tmpErr := common.WriteDownloadContent2Resp(ctx, res)
- if tmpErr != nil {
- log.Error("DownloadAITaskLog error.%v", tmpErr)
- ctx.JSON(http.StatusOK, response.OuterResponseError(tmpErr))
- return
- }
-
- }
-
- func DownloadAllOutputFile(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- cloudbrain, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("DownloadAllOutputFile GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrain)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- resultFileName := cloudbrain.JobName + ".zip"
- ctx.Resp.Header().Set("Content-Disposition", "attachment; filename="+url.QueryEscape(resultFileName))
- ctx.Resp.Header().Set("Content-Type", "application/octet-stream")
- zipWriter := zip.NewWriter(ctx.Resp)
- defer zipWriter.Close()
- err = t.DownloadAllOutput(entity.DownloadAllFileReq{
- CloudbrainId: id,
- ZIPWriter: zipWriter,
- })
- if err != nil {
- log.Error("DownloadAllOutput error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- }
-
- func GetAITaskInfo(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- job, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("GetAITaskInfo GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(job)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- resultTask, err := t.Query(id)
- if err != nil {
- log.Error("Query error.id=%d err=%v", id, err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- //加载关联版本
- earlyVersionList, bizErr := task.QueryTaskEarlyVersionList(id)
- if bizErr != nil {
- log.Error("QueryTaskEarlyVersionList err.id=%d err=%v", id, err)
- ctx.JSON(http.StatusOK, response.OuterResponseError(bizErr))
- return
- }
- res := &entity.QueryAITaskRes{
- Task: resultTask,
- CanDownload: cloudbrain.CanDownloadJob(ctx, job),
- EarlyVersionList: earlyVersionList,
- CanCreateVersion: job.CanUserModify(ctx.User),
- }
- //根据权限去掉数据集和模型信息
- res.TryToRemoveDatasetAndModelInfo(ctx.User)
- //国际化
- res.Tr(ctx.Language())
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
-
- func GetAITaskBriefInfo(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.BriefQuery(id)
- if err != nil {
- log.Error("BriefQuery error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res.Tr(ctx.Language())
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(res))
- }
-
- func GetAITaskOutput(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- parentDir := ctx.Query("parent_dir")
- cloudbrainTask, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("GetAITaskOutput GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrainTask)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetOutput(id, parentDir)
- if err != nil {
- log.Error("GetOutput error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res.CanReschedule = cloudbrain.CanDeleteJob(ctx, cloudbrainTask)
- res.CanDownload = cloudbrain.CanDownloadJob(ctx, cloudbrainTask)
-
- m := map[string]interface{}{"output": res}
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m))
- }
-
- func GetAllAITaskOutput(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- suffixStr := ctx.Query("suffix")
- var suffix []string
- if suffixStr != "" {
- suffixList := strings.Split(suffixStr, "|")
- for i := 0; i < len(suffixList); i++ {
- if suffixList[i] != "" {
- suffix = append(suffix, suffixList[i])
- }
- }
- }
- cloudbrainTask, bizErr := models.GetCloudbrainByCloudbrainID(id)
- if bizErr != nil {
- log.Error("GetAITaskOutput GetCloudbrainByCloudbrainID err.%v", bizErr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.AI_TASK_NOT_EXISTS, ctx))
- return
- }
- t, err := task.GetAITaskTemplateFromCloudbrain(cloudbrainTask)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetAllOutput(entity.GetAllOutputReq{
- CloudbrainId: cloudbrainTask.ID,
- Suffix: suffix,
- })
- if err != nil {
- log.Error("GetAllOutput error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
-
- m := map[string]interface{}{"output": res}
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m))
- }
-
- func GetNotebookUrl(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- fileName := ctx.QueryTrim("file")
- url, err := t.GetDebugUrl(id, fileName)
- if err != nil {
- log.Error("GetNotebookUrl error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- m := map[string]interface{}{"url": url}
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m))
- }
-
- func GetNodeInfo(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- res, err := t.GetNodeInfo(id)
- if err != nil {
- log.Error("GetNodeInfo error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
-
- m := map[string]interface{}{"nodes": res}
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(m))
- }
-
- func GetImageInfoBySelectedSpec(ctx *context.Context) {
- jobType := ctx.Query("job_type")
-
- if models.JobType(jobType) == (models.JobTypeOnlineInference) {
- jobType = string(models.JobTypeDebug)
- }
- log.Info("required jobType=" + jobType)
- computeSourceName := ctx.Query("compute_source")
- clusterType := ctx.Query("cluster_type")
-
- computeSource := models.GetComputeSourceInstance(computeSourceName)
- specId := ctx.QueryInt64("spec_id")
- hasInternet := ctx.QueryInt("has_internet")
-
- spec, err := resource.GetAndCheckSpec(ctx.User.ID, specId, models.FindSpecsOptions{
- JobType: models.JobType(jobType),
- ComputeResource: computeSourceName,
- Cluster: clusterType,
- HasInternet: models.SpecInternetQuery(hasInternet),
- })
-
- if err != nil || spec == nil {
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.SPEC_NOT_AVAILABLE, ctx))
- return
- }
-
- result, bizerr := task.GetAvailableImageInfoBySpec(entity.GetAITaskCreationImageInfoReq{
- ClusterType: entity.ClusterType(clusterType),
- ComputeSource: computeSource,
- Spec: spec,
- JobType: models.JobType(jobType),
- UserID: ctx.User.ID,
- })
- if bizerr != nil {
- log.Error("GetAITaskImageCreationInfo error,err=%v", bizerr)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(bizerr, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(result))
- }
-
- func GetCreationRequiredInfo(ctx *context.Context) {
- jobType := ctx.Query("job_type")
- var isOnlineType bool
- if models.JobType(jobType) == (models.JobTypeOnlineInference) {
- isOnlineType = true
- jobType = string(models.JobTypeDebug)
- }
- log.Info("required jobType=" + jobType)
- computeSourceName := ctx.Query("compute_source")
- clusterType := ctx.Query("cluster_type")
- computeSource := models.GetComputeSourceInstance(computeSourceName)
-
- result, err := task.GetAITaskCreationInfo(entity.GetAITaskCreationInfoReq{
- User: ctx.User,
- JobType: models.JobType(jobType),
- ClusterType: entity.ClusterType(clusterType),
- ComputeSource: computeSource,
- Repo: ctx.Repo.Repository,
- GitRepo: ctx.Repo.GitRepo,
- IsOnlineType: isOnlineType,
- })
- if err != nil {
- log.Error("GetAITaskCreationInfo error,err=%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(result))
- }
-
- func GetAITaskList(ctx *context.Context) {
- jobType := ctx.Query("job_type")
- computeSourceName := ctx.Query("compute_source")
- page := ctx.QueryInt("page")
- computeSource := models.GetComputeSourceInstance(computeSourceName)
- if page <= 0 {
- page = 1
- }
- jobTypes := make([]string, 0)
- if jobType != "" {
- jobTypes = append(jobTypes, jobType)
- }
- result, err := task.GetAITaskList(entity.GetTaskListReq{
- ListOptions: models.ListOptions{
- PageSize: setting.UI.IssuePagingNum,
- Page: page,
- },
- ComputeSource: computeSource,
- JobTypes: jobTypes,
- RepoID: ctx.Repo.Repository.ID,
- Operator: ctx.User,
- IsRepoOwner: ctx.Repo.IsOwner(),
- })
- if err != nil {
- log.Error("GetAITaskList error,err=%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- result.CanCreateTask = cloudbrain.CanCreateOrDebugJob(ctx)
- result.IsRepoEmpty = ctx.Repo.Repository.IsEmpty
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(result))
- }
-
- func GetAITaskOperationProfile(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- r, err := t.GetOperationProfile(id)
- if err != nil {
- log.Error("GetOperationProfile error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(r))
- }
-
- func GetAITaskResourceUsage(ctx *context.Context) {
- id := ctx.QueryInt64("id")
- nodeId := ctx.QueryInt("node_id")
- logFileName := ctx.Query("file_name")
- t, err := task.GetAITaskTemplateByCloudbrainId(id)
- if err != nil {
- log.Error("param error")
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- r, err := t.GetResourceUsage(entity.GetResourceUsageOpts{
- CloudbrainId: id,
- NodeId: nodeId,
- LogFileName: logFileName,
- })
- if err != nil {
- log.Error("GetOperationProfile error.%v", err)
- ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(r))
- }
- func GetAllMonitorAITask(ctx *context.Context) {
-
- fileName := "cloudbrain.csv"
- status := ctx.QueryTrim("status")
-
- ctx.Resp.Header().Set("Content-Disposition", "attachment; filename="+url.QueryEscape(fileName))
- ctx.Resp.Header().Set("Content-Type", "application/octet-stream")
- err := task.MonitorTaskFile(status, ctx)
- if err != nil {
- log.Error("get monitor tasks err", err)
- }
-
- }
-
- func RetryModelSchedule(ctx *context.APIContext) {
- id := ctx.QueryInt64("id")
- if id <= 0 {
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.PARAM_ERROR, ctx))
- return
- }
- job, err := models.GetCloudbrainByCloudbrainID(id)
- if err != nil {
- ctx.JSON(http.StatusOK, response.OuterTrBizError(response.PARAM_ERROR, ctx))
- return
- }
- err = schedule.RetryModelMigrate(job)
- if err != nil {
- ctx.JSON(http.StatusOK, response.OuterResponseError(err))
- return
- }
- ctx.JSON(http.StatusOK, response.OuterSuccess())
- }
-
- func handCreateReq(req *entity.CreateReq) {
- req.JobName = util.ConvertDisplayJobNameToJobName(req.DisplayJobName)
- if req.WorkServerNumber == 0 {
- req.WorkServerNumber = 1
- }
- }
-
- func GenerateSDKCode(ctx *context.Context) {
- datasetNames := ctx.QueryStrings("dataset_name")
- pretrainModelNames := ctx.QueryStrings("pretrain_model_name")
- parameterKeys := ctx.QueryStrings("param_key")
- jobType := ctx.Query("job_type")
- code := task.GenerateSDKCode(datasetNames, pretrainModelNames, parameterKeys, models.JobType(jobType))
- ctx.JSON(http.StatusOK, response.OuterSuccessWithData(map[string]string{"code": code}))
- }
|