From b33effb376cec6c21f90a38b0f3d306a9cbba415 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Fri, 18 Aug 2023 16:23:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BB=93=E6=9E=9C=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- entity/ai_task.go | 10 ++- entity/cluster.go | 10 ++- routers/ai_task/ai_task.go | 24 +++---- services/ai_task_service/cluster/c2net.go | 6 +- .../ai_task_service/cluster/cloudbrain_one.go | 6 +- .../ai_task_service/cluster/cloudbrain_two.go | 6 +- .../ai_task_service/cluster/cluster_base.go | 4 +- services/ai_task_service/cluster/common.go | 62 ++++++++++++------- services/ai_task_service/task/task_base.go | 16 ++--- services/ai_task_service/task/task_service.go | 17 ++--- 10 files changed, 91 insertions(+), 70 deletions(-) diff --git a/entity/ai_task.go b/entity/ai_task.go index ea768ebe3d..5e4a2d8f0c 100644 --- a/entity/ai_task.go +++ b/entity/ai_task.go @@ -1,6 +1,7 @@ package entity import ( + "archive/zip" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/storage" "encoding/json" @@ -275,7 +276,14 @@ type GetAllOutputReq struct { Suffix []string } -type GetOutputDownloadInfoReq struct { +type DownloadAllFileReq struct { + CloudbrainId int64 + FileName string + ParentDir string + ZIPWriter *zip.Writer +} + +type GetSingleDownloadInfoReq struct { CloudbrainId int64 FileName string ParentDir string diff --git a/entity/cluster.go b/entity/cluster.go index 43f7e9f161..ce65cc669e 100644 --- a/entity/cluster.go +++ b/entity/cluster.go @@ -1,6 +1,7 @@ package entity import ( + "archive/zip" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/log" @@ -318,7 +319,14 @@ type ClusterLogDownloadInfoOpts struct { DisplayJobName string } -type ClusterOutputDownloadInfoOpts struct { +type DownloadOutputOpts struct { + JobId string + Path string + JobName string + StorageType StorageType + ZIPWriter *zip.Writer +} +type ClusterSingleOutputDownloadInfoOpts struct { JobId string Path string JobName string diff --git a/routers/ai_task/ai_task.go b/routers/ai_task/ai_task.go index 07c34aca3d..8b5de7c476 100644 --- a/routers/ai_task/ai_task.go +++ b/routers/ai_task/ai_task.go @@ -1,6 +1,7 @@ package ai_task import ( + "archive/zip" "code.gitea.io/gitea/entity" "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cloudbrain" @@ -164,7 +165,7 @@ func DownloadOutputFile(ctx *context.Context) { ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx)) return } - res, err := t.GetSingleOutputDownloadInfo(entity.GetOutputDownloadInfoReq{ + res, err := t.GetSingleOutputDownloadInfo(entity.GetSingleDownloadInfoReq{ CloudbrainId: id, FileName: fileName, ParentDir: parentDir, @@ -204,28 +205,17 @@ func DownloadAllOutputFile(ctx *context.Context) { ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx)) return } - res, err := t.GetAllOutputDownloadInfo(entity.GetOutputDownloadInfoReq{ + zipWriter := zip.NewWriter(ctx.Resp) + defer zipWriter.Close() + err = t.DownloadAllOutput(entity.DownloadAllFileReq{ CloudbrainId: id, + ZIPWriter: zipWriter, }) if err != nil { - log.Error("GetAllOutputDownloadInfo error.%v", err) + log.Error("DownloadAllOutput error.%v", err) ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx)) return } - - if res == nil || res.IsEmpty() { - log.Error("DownloadAllOutputFile error.%v", err) - ctx.JSON(http.StatusNotFound, "") - return - } - - tmpErr := common.WriteDownloadContent2Resp(ctx, res) - if tmpErr != nil { - log.Error("DownloadAITaskLog error.%v", tmpErr) - ctx.JSON(http.StatusOK, response.OuterResponseError(tmpErr)) - return - } - } func GetAITaskInfo(ctx *context.Context) { diff --git a/services/ai_task_service/cluster/c2net.go b/services/ai_task_service/cluster/c2net.go index 3c8ab310c9..3749782e69 100644 --- a/services/ai_task_service/cluster/c2net.go +++ b/services/ai_task_service/cluster/c2net.go @@ -699,7 +699,7 @@ func (c C2NetClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDownloadIn }, nil } -func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { +func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType) url, err := helper.GetSignedDownloadUrl(opts.Path) if err != nil { @@ -711,8 +711,8 @@ func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutp }, nil } -func (c C2NetClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { - return GetAllOutputDownloadInfo(opts) +func (c C2NetClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error { + return DownloadAllOutput(opts) } func (c C2NetClusterAdapter) GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) { diff --git a/services/ai_task_service/cluster/cloudbrain_one.go b/services/ai_task_service/cluster/cloudbrain_one.go index 66e6763fbf..f43b9902b8 100644 --- a/services/ai_task_service/cluster/cloudbrain_one.go +++ b/services/ai_task_service/cluster/cloudbrain_one.go @@ -427,7 +427,7 @@ func (c CloudbrainOneClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDo }, nil } -func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { +func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType) url, err := helper.GetSignedDownloadUrl(opts.Path) if err != nil { @@ -439,8 +439,8 @@ func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.Clu }, nil } -func (c CloudbrainOneClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { - return GetAllOutputDownloadInfo(opts) +func (c CloudbrainOneClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error { + return DownloadAllOutput(opts) } func (c CloudbrainOneClusterAdapter) GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) { diff --git a/services/ai_task_service/cluster/cloudbrain_two.go b/services/ai_task_service/cluster/cloudbrain_two.go index 58ca694293..c730b2ec3d 100644 --- a/services/ai_task_service/cluster/cloudbrain_two.go +++ b/services/ai_task_service/cluster/cloudbrain_two.go @@ -766,7 +766,7 @@ func (c CloudbrainTwoClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDo }, nil } -func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { +func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType) url, err := helper.GetSignedDownloadUrl(opts.Path) if err != nil { @@ -778,8 +778,8 @@ func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.Clu }, nil } -func (c CloudbrainTwoClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { - return GetAllOutputDownloadInfo(opts) +func (c CloudbrainTwoClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error { + return DownloadAllOutput(opts) } func (c CloudbrainTwoClusterAdapter) GetTrainJobOperationProfile(jobId string) (*entity.OperationProfile, error) { diff --git a/services/ai_task_service/cluster/cluster_base.go b/services/ai_task_service/cluster/cluster_base.go index e3fbb5ea1f..ce66e16629 100644 --- a/services/ai_task_service/cluster/cluster_base.go +++ b/services/ai_task_service/cluster/cluster_base.go @@ -42,8 +42,8 @@ type ClusterAdapter interface { GetTrainJobOperationProfile(jobId string) (*entity.OperationProfile, error) GetOutput(opts entity.ClusterOutputOpts) (*entity.ClusterAITaskOutput, error) GetAllOutput(opts entity.ClusterOutputOpts) (*entity.AllAITaskOutput, error) - GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) - GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) + GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) + DownloadAllOutput(opts entity.DownloadOutputOpts) error GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) GetResourceUsage(opts entity.ClusterResourceUsageOpts) (*entity.ResourceUsage, error) //GetImages return available list of clusters diff --git a/services/ai_task_service/cluster/common.go b/services/ai_task_service/cluster/common.go index 29c512b8ca..33f02a764b 100644 --- a/services/ai_task_service/cluster/common.go +++ b/services/ai_task_service/cluster/common.go @@ -1,6 +1,7 @@ package cluster import ( + "archive/zip" "bufio" "code.gitea.io/gitea/entity" "code.gitea.io/gitea/modules/log" @@ -95,46 +96,59 @@ func getLogFilesInStorage(helper storage_helper.StorageHelper, objectKeyPrefix s return logFiles } -func GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) { +func DownloadAllOutput(opts entity.DownloadOutputOpts) error { helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType) var err error fileList, err := helper.GetAllObjectsUnderDir(opts.Path) if err != nil { log.Error("GetAllObjectsUnderDir err.objectKeyPrefix=%s,err=%v", opts.Path, err) - return nil, err + return err } if len(fileList) == 0 { - return nil, nil - } - - res := &entity.FileDownloadInfo{ - Readers: make([]entity.FileReader, 0), - ResultType: entity.FileTypeZIP, - ResultFileName: opts.JobName + ".zip", + return nil } - defer func() { - if err != nil { - res.Close() - } - }() - for i := 0; i < len(fileList); i++ { file := fileList[i] if file.IsDir { continue } - var reader io.ReadCloser - reader, err = helper.OpenFile(file.RelativePath) + err = openAndWrite2ZIP(helper, file, opts.ZIPWriter) if err != nil { - log.Error("GetAllOutputDownloadInfo OpenFile err.opts=%+v,err =%v", opts, err) - return nil, err + log.Error("openAndWrite2ZIP err.%v", err) + return err } - res.Readers = append(res.Readers, entity.FileReader{ - Reader: reader, - Name: file.FileName, - }) } - return res, nil + return nil +} + +func openAndWrite2ZIP(helper storage_helper.StorageHelper, file storage.FileInfo, zipWriter *zip.Writer) error { + var reader io.ReadCloser + reader, err := helper.OpenFile(file.RelativePath) + if err != nil { + log.Error("openAndWrite2ZIP OpenFile err.filePath=%+v,err =%v", file.RelativePath, err) + return err + } + defer reader.Close() + + fDest, err := zipWriter.Create(file.FileName) + if err != nil { + log.Error("zipWriter.Create error.%v", err) + return err + } + p := make([]byte, 1024) + var readErr error + var readCount int + // 读取对象内容 + for { + readCount, readErr = reader.Read(p) + if readCount > 0 { + fDest.Write(p[:readCount]) + } + if readErr != nil { + break + } + } + return nil } diff --git a/services/ai_task_service/task/task_base.go b/services/ai_task_service/task/task_base.go index 604f58d2a0..9f62eba2cf 100644 --- a/services/ai_task_service/task/task_base.go +++ b/services/ai_task_service/task/task_base.go @@ -53,8 +53,8 @@ type AITaskTemplate interface { Update(cloudbrainId int64) *response.BizError GetLog(opts entity.QueryLogOpts) (*entity.ClusterLog, *response.BizError) GetLogDownloadInfo(opts entity.GetLogDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) - GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) - GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) + GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) + DownloadAllOutput(opts entity.DownloadAllFileReq) *response.BizError GetOutput(cloudbrainId int64, parentDir string) (*entity.AITaskOutput, *response.BizError) GetAllOutput(opts entity.GetAllOutputReq) (*entity.AllAITaskOutput, *response.BizError) GetDebugUrl(cloudbrainId int64, fileName ...string) (string, *response.BizError) @@ -346,7 +346,7 @@ func (g DefaultAITaskTemplate) GetLogDownloadInfo(opts entity.GetLogDownloadInfo return s, nil } -func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) { +func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) { c := g.GetMyCluster() if c == nil { log.Error("Get cluster failed,cloudbrainId=%d", opts) @@ -361,19 +361,19 @@ func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetOutput return s, nil } -func (g DefaultAITaskTemplate) GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) { +func (g DefaultAITaskTemplate) DownloadAllOutput(opts entity.DownloadAllFileReq) *response.BizError { c := g.GetMyCluster() if c == nil { log.Error("Get cluster failed,cloudbrainId=%d", opts) - return nil, response.SYSTEM_ERROR + return response.SYSTEM_ERROR } - s, err := GetAllOutputDownloadInfo(opts, c.GetAllOutputDownloadInfo) + err := DownloadAllOutput(opts, c.DownloadAllOutput) if err != nil { log.Error("GetOutputDownloadInfo err.cloudbrainId=%d ", opts) - return nil, nil + return nil } - return s, nil + return nil } func (g DefaultAITaskTemplate) GetOutput(cloudbrainId int64, parentDir string) (*entity.AITaskOutput, *response.BizError) { diff --git a/services/ai_task_service/task/task_service.go b/services/ai_task_service/task/task_service.go index 84a3a44763..403a70ead9 100644 --- a/services/ai_task_service/task/task_service.go +++ b/services/ai_task_service/task/task_service.go @@ -36,8 +36,8 @@ type GetNotebookUrlFunc func(string) (string, error) type GetNodeInfoFunc func(entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) type GetOutputFunc func(entity.ClusterOutputOpts) (*entity.ClusterAITaskOutput, error) type GetAllOutputFunc func(entity.ClusterOutputOpts) (*entity.AllAITaskOutput, error) -type GetSingleOutputDownloadInfoFunc func(req entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) -type GetAllOutputDownloadInfoFunc func(req entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) +type GetSingleOutputDownloadInfoFunc func(req entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) +type DownloadAllOutputFunc func(req entity.DownloadOutputOpts) error type GetOperationProfileFunc func(string) (*entity.OperationProfile, error) type GetResourceUsageFunc func(entity.ClusterResourceUsageOpts) (*entity.ResourceUsage, error) @@ -430,7 +430,7 @@ func GetLogDownloadInfo(opts entity.GetLogDownloadInfoReq, getLogDownloadInfo Ge }) } -func GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetSingleOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) { +func GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq, f GetSingleOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) { cloudbrain, err := models.GetCloudbrainByCloudbrainID(opts.CloudbrainId) if err != nil { return nil, err @@ -440,27 +440,28 @@ func GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetSing } aiConfig := GetDetailConfigInfoByCloudbrain(cloudbrain) fileRelativePath := path.Join(aiConfig.OutputObjectPrefix, opts.ParentDir, opts.FileName) - return f(entity.ClusterOutputDownloadInfoOpts{ + return f(entity.ClusterSingleOutputDownloadInfoOpts{ JobId: cloudbrain.JobID, Path: fileRelativePath, StorageType: aiConfig.OutputStorageType, }) } -func GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetAllOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) { +func DownloadAllOutput(opts entity.DownloadAllFileReq, downloadFunc DownloadAllOutputFunc) error { cloudbrain, err := models.GetCloudbrainByCloudbrainID(opts.CloudbrainId) if err != nil { - return nil, err + return err } if cloudbrain.JobID == "" { - return nil, nil + return nil } aiConfig := GetDetailConfigInfoByCloudbrain(cloudbrain) - return f(entity.ClusterOutputDownloadInfoOpts{ + return downloadFunc(entity.DownloadOutputOpts{ JobId: cloudbrain.JobID, Path: aiConfig.OutputObjectPrefix, StorageType: aiConfig.OutputStorageType, JobName: cloudbrain.JobName, + ZIPWriter: opts.ZIPWriter, }) } -- 2.34.1