#4628 优化结果下载部分代码

Merged
zouap merged 1 commits from update-model-download into V20230808.patch 8 months ago
  1. +9
    -1
      entity/ai_task.go
  2. +9
    -1
      entity/cluster.go
  3. +7
    -17
      routers/ai_task/ai_task.go
  4. +3
    -3
      services/ai_task_service/cluster/c2net.go
  5. +3
    -3
      services/ai_task_service/cluster/cloudbrain_one.go
  6. +3
    -3
      services/ai_task_service/cluster/cloudbrain_two.go
  7. +2
    -2
      services/ai_task_service/cluster/cluster_base.go
  8. +38
    -24
      services/ai_task_service/cluster/common.go
  9. +8
    -8
      services/ai_task_service/task/task_base.go
  10. +9
    -8
      services/ai_task_service/task/task_service.go

+ 9
- 1
entity/ai_task.go View File

@@ -1,6 +1,7 @@
package entity

import (
"archive/zip"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/storage"
"encoding/json"
@@ -275,7 +276,14 @@ type GetAllOutputReq struct {
Suffix []string
}

type GetOutputDownloadInfoReq struct {
type DownloadAllFileReq struct {
CloudbrainId int64
FileName string
ParentDir string
ZIPWriter *zip.Writer
}

type GetSingleDownloadInfoReq struct {
CloudbrainId int64
FileName string
ParentDir string


+ 9
- 1
entity/cluster.go View File

@@ -1,6 +1,7 @@
package entity

import (
"archive/zip"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
"code.gitea.io/gitea/modules/log"
@@ -318,7 +319,14 @@ type ClusterLogDownloadInfoOpts struct {
DisplayJobName string
}

type ClusterOutputDownloadInfoOpts struct {
type DownloadOutputOpts struct {
JobId string
Path string
JobName string
StorageType StorageType
ZIPWriter *zip.Writer
}
type ClusterSingleOutputDownloadInfoOpts struct {
JobId string
Path string
JobName string


+ 7
- 17
routers/ai_task/ai_task.go View File

@@ -1,6 +1,7 @@
package ai_task

import (
"archive/zip"
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/cloudbrain"
@@ -164,7 +165,7 @@ func DownloadOutputFile(ctx *context.Context) {
ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
return
}
res, err := t.GetSingleOutputDownloadInfo(entity.GetOutputDownloadInfoReq{
res, err := t.GetSingleOutputDownloadInfo(entity.GetSingleDownloadInfoReq{
CloudbrainId: id,
FileName: fileName,
ParentDir: parentDir,
@@ -204,28 +205,17 @@ func DownloadAllOutputFile(ctx *context.Context) {
ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
return
}
res, err := t.GetAllOutputDownloadInfo(entity.GetOutputDownloadInfoReq{
zipWriter := zip.NewWriter(ctx.Resp)
defer zipWriter.Close()
err = t.DownloadAllOutput(entity.DownloadAllFileReq{
CloudbrainId: id,
ZIPWriter: zipWriter,
})
if err != nil {
log.Error("GetAllOutputDownloadInfo error.%v", err)
log.Error("DownloadAllOutput error.%v", err)
ctx.JSON(http.StatusOK, response.OuterTrBizError(err, ctx))
return
}

if res == nil || res.IsEmpty() {
log.Error("DownloadAllOutputFile error.%v", err)
ctx.JSON(http.StatusNotFound, "")
return
}

tmpErr := common.WriteDownloadContent2Resp(ctx, res)
if tmpErr != nil {
log.Error("DownloadAITaskLog error.%v", tmpErr)
ctx.JSON(http.StatusOK, response.OuterResponseError(tmpErr))
return
}

}

func GetAITaskInfo(ctx *context.Context) {


+ 3
- 3
services/ai_task_service/cluster/c2net.go View File

@@ -699,7 +699,7 @@ func (c C2NetClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDownloadIn
}, nil
}

func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType)
url, err := helper.GetSignedDownloadUrl(opts.Path)
if err != nil {
@@ -711,8 +711,8 @@ func (c C2NetClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutp
}, nil
}

func (c C2NetClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
return GetAllOutputDownloadInfo(opts)
func (c C2NetClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error {
return DownloadAllOutput(opts)
}

func (c C2NetClusterAdapter) GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) {


+ 3
- 3
services/ai_task_service/cluster/cloudbrain_one.go View File

@@ -427,7 +427,7 @@ func (c CloudbrainOneClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDo
}, nil
}

func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType)
url, err := helper.GetSignedDownloadUrl(opts.Path)
if err != nil {
@@ -439,8 +439,8 @@ func (c CloudbrainOneClusterAdapter) GetSingleOutputDownloadInfo(opts entity.Clu
}, nil
}

func (c CloudbrainOneClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
return GetAllOutputDownloadInfo(opts)
func (c CloudbrainOneClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error {
return DownloadAllOutput(opts)
}

func (c CloudbrainOneClusterAdapter) GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error) {


+ 3
- 3
services/ai_task_service/cluster/cloudbrain_two.go View File

@@ -766,7 +766,7 @@ func (c CloudbrainTwoClusterAdapter) GetLogDownloadInfo(opts entity.ClusterLogDo
}, nil
}

func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType)
url, err := helper.GetSignedDownloadUrl(opts.Path)
if err != nil {
@@ -778,8 +778,8 @@ func (c CloudbrainTwoClusterAdapter) GetSingleOutputDownloadInfo(opts entity.Clu
}, nil
}

func (c CloudbrainTwoClusterAdapter) GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
return GetAllOutputDownloadInfo(opts)
func (c CloudbrainTwoClusterAdapter) DownloadAllOutput(opts entity.DownloadOutputOpts) error {
return DownloadAllOutput(opts)
}

func (c CloudbrainTwoClusterAdapter) GetTrainJobOperationProfile(jobId string) (*entity.OperationProfile, error) {


+ 2
- 2
services/ai_task_service/cluster/cluster_base.go View File

@@ -42,8 +42,8 @@ type ClusterAdapter interface {
GetTrainJobOperationProfile(jobId string) (*entity.OperationProfile, error)
GetOutput(opts entity.ClusterOutputOpts) (*entity.ClusterAITaskOutput, error)
GetAllOutput(opts entity.ClusterOutputOpts) (*entity.AllAITaskOutput, error)
GetSingleOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
GetSingleOutputDownloadInfo(opts entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
DownloadAllOutput(opts entity.DownloadOutputOpts) error
GetNodeInfo(opts entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error)
GetResourceUsage(opts entity.ClusterResourceUsageOpts) (*entity.ResourceUsage, error)
//GetImages return available list of clusters


+ 38
- 24
services/ai_task_service/cluster/common.go View File

@@ -1,6 +1,7 @@
package cluster

import (
"archive/zip"
"bufio"
"code.gitea.io/gitea/entity"
"code.gitea.io/gitea/modules/log"
@@ -95,46 +96,59 @@ func getLogFilesInStorage(helper storage_helper.StorageHelper, objectKeyPrefix s
return logFiles
}

func GetAllOutputDownloadInfo(opts entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error) {
func DownloadAllOutput(opts entity.DownloadOutputOpts) error {
helper := storage_helper.SelectUploaderFromStorageType(opts.StorageType)
var err error
fileList, err := helper.GetAllObjectsUnderDir(opts.Path)
if err != nil {
log.Error("GetAllObjectsUnderDir err.objectKeyPrefix=%s,err=%v", opts.Path, err)
return nil, err
return err
}
if len(fileList) == 0 {
return nil, nil
}

res := &entity.FileDownloadInfo{
Readers: make([]entity.FileReader, 0),
ResultType: entity.FileTypeZIP,
ResultFileName: opts.JobName + ".zip",
return nil
}

defer func() {
if err != nil {
res.Close()
}
}()

for i := 0; i < len(fileList); i++ {
file := fileList[i]
if file.IsDir {
continue
}
var reader io.ReadCloser
reader, err = helper.OpenFile(file.RelativePath)
err = openAndWrite2ZIP(helper, file, opts.ZIPWriter)
if err != nil {
log.Error("GetAllOutputDownloadInfo OpenFile err.opts=%+v,err =%v", opts, err)
return nil, err
log.Error("openAndWrite2ZIP err.%v", err)
return err
}
res.Readers = append(res.Readers, entity.FileReader{
Reader: reader,
Name: file.FileName,
})
}

return res, nil
return nil
}

func openAndWrite2ZIP(helper storage_helper.StorageHelper, file storage.FileInfo, zipWriter *zip.Writer) error {
var reader io.ReadCloser
reader, err := helper.OpenFile(file.RelativePath)
if err != nil {
log.Error("openAndWrite2ZIP OpenFile err.filePath=%+v,err =%v", file.RelativePath, err)
return err
}
defer reader.Close()

fDest, err := zipWriter.Create(file.FileName)
if err != nil {
log.Error("zipWriter.Create error.%v", err)
return err
}
p := make([]byte, 1024)
var readErr error
var readCount int
// 读取对象内容
for {
readCount, readErr = reader.Read(p)
if readCount > 0 {
fDest.Write(p[:readCount])
}
if readErr != nil {
break
}
}
return nil
}

+ 8
- 8
services/ai_task_service/task/task_base.go View File

@@ -53,8 +53,8 @@ type AITaskTemplate interface {
Update(cloudbrainId int64) *response.BizError
GetLog(opts entity.QueryLogOpts) (*entity.ClusterLog, *response.BizError)
GetLogDownloadInfo(opts entity.GetLogDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError)
GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError)
GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError)
GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError)
DownloadAllOutput(opts entity.DownloadAllFileReq) *response.BizError
GetOutput(cloudbrainId int64, parentDir string) (*entity.AITaskOutput, *response.BizError)
GetAllOutput(opts entity.GetAllOutputReq) (*entity.AllAITaskOutput, *response.BizError)
GetDebugUrl(cloudbrainId int64, fileName ...string) (string, *response.BizError)
@@ -346,7 +346,7 @@ func (g DefaultAITaskTemplate) GetLogDownloadInfo(opts entity.GetLogDownloadInfo
return s, nil
}

func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) {
func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) {
c := g.GetMyCluster()
if c == nil {
log.Error("Get cluster failed,cloudbrainId=%d", opts)
@@ -361,19 +361,19 @@ func (g DefaultAITaskTemplate) GetSingleOutputDownloadInfo(opts entity.GetOutput
return s, nil
}

func (g DefaultAITaskTemplate) GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq) (*entity.FileDownloadInfo, *response.BizError) {
func (g DefaultAITaskTemplate) DownloadAllOutput(opts entity.DownloadAllFileReq) *response.BizError {
c := g.GetMyCluster()
if c == nil {
log.Error("Get cluster failed,cloudbrainId=%d", opts)
return nil, response.SYSTEM_ERROR
return response.SYSTEM_ERROR
}
s, err := GetAllOutputDownloadInfo(opts, c.GetAllOutputDownloadInfo)
err := DownloadAllOutput(opts, c.DownloadAllOutput)
if err != nil {
log.Error("GetOutputDownloadInfo err.cloudbrainId=%d ", opts)
return nil, nil
return nil
}

return s, nil
return nil
}

func (g DefaultAITaskTemplate) GetOutput(cloudbrainId int64, parentDir string) (*entity.AITaskOutput, *response.BizError) {


+ 9
- 8
services/ai_task_service/task/task_service.go View File

@@ -36,8 +36,8 @@ type GetNotebookUrlFunc func(string) (string, error)
type GetNodeInfoFunc func(entity.ClusterNodeInfoOpts) ([]entity.AITaskNodeInfo, error)
type GetOutputFunc func(entity.ClusterOutputOpts) (*entity.ClusterAITaskOutput, error)
type GetAllOutputFunc func(entity.ClusterOutputOpts) (*entity.AllAITaskOutput, error)
type GetSingleOutputDownloadInfoFunc func(req entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
type GetAllOutputDownloadInfoFunc func(req entity.ClusterOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
type GetSingleOutputDownloadInfoFunc func(req entity.ClusterSingleOutputDownloadInfoOpts) (*entity.FileDownloadInfo, error)
type DownloadAllOutputFunc func(req entity.DownloadOutputOpts) error
type GetOperationProfileFunc func(string) (*entity.OperationProfile, error)
type GetResourceUsageFunc func(entity.ClusterResourceUsageOpts) (*entity.ResourceUsage, error)

@@ -430,7 +430,7 @@ func GetLogDownloadInfo(opts entity.GetLogDownloadInfoReq, getLogDownloadInfo Ge
})
}

func GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetSingleOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) {
func GetSingleOutputDownloadInfo(opts entity.GetSingleDownloadInfoReq, f GetSingleOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) {
cloudbrain, err := models.GetCloudbrainByCloudbrainID(opts.CloudbrainId)
if err != nil {
return nil, err
@@ -440,27 +440,28 @@ func GetSingleOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetSing
}
aiConfig := GetDetailConfigInfoByCloudbrain(cloudbrain)
fileRelativePath := path.Join(aiConfig.OutputObjectPrefix, opts.ParentDir, opts.FileName)
return f(entity.ClusterOutputDownloadInfoOpts{
return f(entity.ClusterSingleOutputDownloadInfoOpts{
JobId: cloudbrain.JobID,
Path: fileRelativePath,
StorageType: aiConfig.OutputStorageType,
})
}

func GetAllOutputDownloadInfo(opts entity.GetOutputDownloadInfoReq, f GetAllOutputDownloadInfoFunc) (*entity.FileDownloadInfo, error) {
func DownloadAllOutput(opts entity.DownloadAllFileReq, downloadFunc DownloadAllOutputFunc) error {
cloudbrain, err := models.GetCloudbrainByCloudbrainID(opts.CloudbrainId)
if err != nil {
return nil, err
return err
}
if cloudbrain.JobID == "" {
return nil, nil
return nil
}
aiConfig := GetDetailConfigInfoByCloudbrain(cloudbrain)
return f(entity.ClusterOutputDownloadInfoOpts{
return downloadFunc(entity.DownloadOutputOpts{
JobId: cloudbrain.JobID,
Path: aiConfig.OutputObjectPrefix,
StorageType: aiConfig.OutputStorageType,
JobName: cloudbrain.JobName,
ZIPWriter: opts.ZIPWriter,
})
}



Loading…
Cancel
Save