#4701 v20230828patchmerge

Merged
ychao_1983 merged 6 commits from v20230828patchmerge into V20230912 7 months ago
Changed files (10):
  1. models/models.go (+5 -5)
  2. models/resource_queue.go (+71 -0)
  3. models/resource_specification.go (+81 -0)
  4. modules/setting/setting.go (+2 -0)
  5. routers/api/v1/api.go (+3 -0)
  6. routers/api/v1/repo/attachments.go (+2 -2)
  7. services/ai_task_service/schedule/model_schedule.go (+7 -0)
  8. services/ai_task_service/task/task_extend.go (+9 -1)
  9. services/cloudbrain/resource/resource_specification.go (+4 -0)
  10. services/memory/pprof.go (+31 -0)

models/models.go (+5 -5)

@@ -242,11 +242,11 @@ func setEngine(engine *xorm.Engine, table []interface{}, database *setting.DBInf
engine.SetMapper(names.GonicMapper{})
// WARNING: for serv command, MUST remove the output to os.stdout,
// so use log file to instead print to stdout.
- engine.SetLogger(NewXORMLogger(setting.Database.LogSQL))
- engine.ShowSQL(setting.Database.LogSQL)
- engine.SetMaxOpenConns(setting.Database.MaxOpenConns)
- engine.SetMaxIdleConns(setting.Database.MaxIdleConns)
- engine.SetConnMaxLifetime(setting.Database.ConnMaxLifetime)
+ engine.SetLogger(NewXORMLogger(database.LogSQL))
+ engine.ShowSQL(database.LogSQL)
+ engine.SetMaxOpenConns(database.MaxOpenConns)
+ engine.SetMaxIdleConns(database.MaxIdleConns)
+ engine.SetConnMaxLifetime(database.ConnMaxLifetime)
engine.Sync2(table...)

return nil


models/resource_queue.go (+71 -0)

@@ -1,7 +1,10 @@
package models

import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/timeutil"
"encoding/json"
"errors"
"strconv"
"strings"
@@ -360,3 +363,71 @@ func GetResourceAiCenters() ([]ResourceAiCenterRes, error) {
}
return r, nil
}

type SpecificationSpecialQueueConfig struct {
SpecialQueues []SpecialQueue `json:"special_queues"`
}

type SpecialQueue struct {
OrgName string `json:"org_name"`
JobType string `json:"job_type"`
Cluster string `json:"cluster"`
QueueId int64 `json:"queue_id"`
ComputeResource string `json:"compute_resource"`
}

var specialQueueConfig SpecificationSpecialQueueConfig
var specialQueueConfigFlag = false

func GetSpecialQueueConfig() SpecificationSpecialQueueConfig {
if !specialQueueConfigFlag {
if err := json.Unmarshal([]byte(setting.SPECIFICATION_SPECIAL_QUEUE), &specialQueueConfig); err != nil {
log.Error("json.Unmarshal specialQueueConfig error.%v", err)
}
specialQueueConfigFlag = true
}
return specialQueueConfig
}

func GetSpecialQueueIds(opts FindSpecsOptions) []SpecialQueue {
config := GetSpecialQueueConfig()
if len(config.SpecialQueues) == 0 {
return []SpecialQueue{}
}

queues := make([]SpecialQueue, 0)
for _, queue := range config.SpecialQueues {
if queue.JobType != string(opts.JobType) {
continue
}

if queue.Cluster != opts.Cluster {
continue
}
if queue.ComputeResource != opts.ComputeResource {
continue
}
queues = append(queues, queue)
}
return queues
}

func IsUserInSpecialPool(userId int64) bool {
userOrgs, err := GetOrgsByUserID(userId, true)
if err != nil {
log.Error("GetSpecialQueueIds GetOrgsByUserID error.%v", err)
return false
}
config := GetSpecialQueueConfig()
if len(config.SpecialQueues) == 0 {
return false
}
for _, org := range userOrgs {
for _, queue := range config.SpecialQueues {
if strings.ToLower(org.Name) == strings.ToLower(queue.OrgName) {
return true
}
}
}
return false
}
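
For reference, GetSpecialQueueConfig above parses the new SPECIFICATION_SPECIAL_QUEUE setting (added in modules/setting/setting.go further down, default "{}") into SpecificationSpecialQueueConfig. The standalone sketch below shows the expected JSON shape; the org name, queue id, and other field values are purely illustrative and not taken from this PR:

package main

import (
    "encoding/json"
    "fmt"
)

// Local copies of the structs above, reproduced so the sketch compiles on its own.
type SpecialQueue struct {
    OrgName         string `json:"org_name"`
    JobType         string `json:"job_type"`
    Cluster         string `json:"cluster"`
    QueueId         int64  `json:"queue_id"`
    ComputeResource string `json:"compute_resource"`
}

type SpecificationSpecialQueueConfig struct {
    SpecialQueues []SpecialQueue `json:"special_queues"`
}

func main() {
    // Hypothetical value for SPECIFICATION_SPECIAL_QUEUE; field names follow the json tags above.
    raw := `{"special_queues":[{"org_name":"demo-org","job_type":"TRAIN","cluster":"OpenI","queue_id":42,"compute_resource":"NPU"}]}`

    var cfg SpecificationSpecialQueueConfig
    if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
        fmt.Println("unmarshal error:", err)
        return
    }
    fmt.Printf("%d special queue(s), first queue id %d\n", len(cfg.SpecialQueues), cfg.SpecialQueues[0].QueueId)
}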

models/resource_specification.go (+81 -0)

@@ -1,6 +1,7 @@
package models

import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
"fmt"
"strings"
@@ -312,6 +313,12 @@ func (s *Specification) GetAvailableCenterIds(opts GetAvailableCenterIdOpts) []s
//filter exclusive specs
specs := FilterExclusiveSpecs(s.RelatedSpecs, opts.UserId)

specs = HandleSpecialQueues(specs, opts.UserId, FindSpecsOptions{
JobType: opts.JobType,
Cluster: s.Cluster,
ComputeResource: s.ComputeResource,
})

centerIds := make([]string, len(specs))
for i, v := range specs {
centerIds[i] = v.AiCenterCode
@@ -348,6 +355,80 @@ func FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification {
return specs
}

func GetSpecsInQueue(r []*Specification, queueIds []int64) []*Specification {
specs := make([]*Specification, 0, len(r))
for i := 0; i < len(r); i++ {
spec := r[i]
for _, queueId := range queueIds {
if spec.QueueId == queueId {
specs = append(specs, spec)
break
}
}
}
return specs
}

func GetSpecsNotInQueue(r []*Specification, queueIds []int64) []*Specification {
specs := make([]*Specification, 0, len(r))
for i := 0; i < len(r); i++ {
spec := r[i]
flag := false
for _, queueId := range queueIds {
if spec.QueueId == queueId {
flag = true
break
}
}
if !flag {
specs = append(specs, spec)
}
}
return specs
}

func HandleSpecialQueues(specs []*Specification, userId int64, opts FindSpecsOptions) []*Specification {
if len(specs) == 0 {
return specs
}
isUserInSpecialPool := IsUserInSpecialPool(userId)
if isUserInSpecialPool {
specs = handleSpecialUserSpecs(specs, userId, opts)
} else {
specs = handleNormalUserSpecs(specs, opts)
}
return specs
}

func handleSpecialUserSpecs(specs []*Specification, userId int64, opts FindSpecsOptions) []*Specification {
specialQueues := GetSpecialQueueIds(opts)
userOrgs, err := GetOrgsByUserID(userId, true)
if err != nil {
log.Error("handleSpecialUserSpecs GetOrgsByUserID error.%v", err)
return []*Specification{}
}
userQueueIds := make([]int64, 0)
for _, org := range userOrgs {
for _, queue := range specialQueues {
if strings.ToLower(org.Name) == strings.ToLower(queue.OrgName) {
userQueueIds = append(userQueueIds, queue.QueueId)
}
}
}
specs = GetSpecsInQueue(specs, userQueueIds)
return specs
}

func handleNormalUserSpecs(specs []*Specification, opts FindSpecsOptions) []*Specification {
queues := GetSpecialQueueIds(opts)
queueIds := make([]int64, 0)
for _, queue := range queues {
queueIds = append(queueIds, queue.QueueId)
}
specs = GetSpecsNotInQueue(specs, queueIds)
return specs
}

func DistinctSpecs(r []*Specification) []*Specification {
specs := make([]*Specification, 0, len(r))
sourceSpecIdMap := make(map[string]string, 0)
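
The net effect of HandleSpecialQueues is that members of a special-pool organization are restricted to the specs bound to their special queues, while all other users get every spec except those. A minimal standalone sketch of that split, using made-up queue ids and a trimmed-down stand-in for models.Specification (not the real type):

package main

import "fmt"

// spec keeps only the field the filtering above cares about.
type spec struct {
    ID      int64
    QueueId int64
}

// inQueue mirrors GetSpecsInQueue above; notInQueue mirrors GetSpecsNotInQueue.
func inQueue(specs []spec, queueIds []int64) []spec {
    out := make([]spec, 0, len(specs))
    for _, s := range specs {
        for _, id := range queueIds {
            if s.QueueId == id {
                out = append(out, s)
                break
            }
        }
    }
    return out
}

func notInQueue(specs []spec, queueIds []int64) []spec {
    out := make([]spec, 0, len(specs))
    for _, s := range specs {
        found := false
        for _, id := range queueIds {
            if s.QueueId == id {
                found = true
                break
            }
        }
        if !found {
            out = append(out, s)
        }
    }
    return out
}

func main() {
    all := []spec{{ID: 1, QueueId: 10}, {ID: 2, QueueId: 20}, {ID: 3, QueueId: 30}}
    specialQueueIds := []int64{20} // pretend queue 20 is reserved for a special org

    fmt.Println("special-pool user sees:", inQueue(all, specialQueueIds)) // [{2 20}]
    fmt.Println("ordinary user sees:", notInQueue(all, specialQueueIds))  // [{1 10} {3 30}]
}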


modules/setting/setting.go (+2 -0)

@@ -728,6 +728,7 @@ var (
PREPARING_MAX_WAIT_DURATION time.Duration
OUTPUT_SHOW_MAX_KEY int
OUTPUT_DOWNLOAD_MAX_KEY int
SPECIFICATION_SPECIAL_QUEUE string

//wenxin url
BaiduWenXin = struct {
@@ -1579,6 +1580,7 @@ func NewContext() {
PREPARING_MAX_WAIT_DURATION = sec.Key("ENABLED").MustDuration(15 * time.Minute)
OUTPUT_SHOW_MAX_KEY = sec.Key("OUTPUT_SHOW_MAX_KEY").MustInt(100)
OUTPUT_DOWNLOAD_MAX_KEY = sec.Key("OUTPUT_DOWNLOAD_MAX_KEY").MustInt(1000)
SPECIFICATION_SPECIAL_QUEUE = sec.Key("SPECIFICATION_SPECIAL_QUEUE").MustString("{}")

sec = Cfg.Section("benchmark")
IsBenchmarkEnabled = sec.Key("ENABLED").MustBool(false)


routers/api/v1/api.go (+3 -0)

@@ -63,6 +63,8 @@ import (
"net/http"
"strings"

"code.gitea.io/gitea/services/memory"

"code.gitea.io/gitea/routers/response"

"code.gitea.io/gitea/entity"
@@ -682,6 +684,7 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/markdown/raw", misc.MarkdownRaw)

m.Group("/monitor", func() {
m.Get("/pprof", HasRole(models.MonitorAdmin), memory.PprofMemory)
m.Group("/:username/:reponame", func() {
m.Group("/ai_task", func() {
m.Get("", HasRole(models.MonitorAdmin), ai_task.GetAITaskInfo)


routers/api/v1/repo/attachments.go (+2 -2)

@@ -40,12 +40,12 @@ func checkDatasetPermission(ctx *context.APIContext) string {
if err != nil {
log.Warn("can not find repo permission for user", err)
return "dataset.query_dataset_fail"
} else {
log.Info("permission.AccessMode=" + string(permission.AccessMode))
}

if permission.AccessMode >= models.AccessModeAdmin {
return ""
}

if !permission.CanWrite(models.UnitTypeDatasets) {
return "error.no_right"
}


services/ai_task_service/schedule/model_schedule.go (+7 -0)

@@ -123,6 +123,13 @@ func HandleUnfinishedMigrateRecords() {
}

func HandleUnfinishedMigrateRecord(r *models.ModelMigrateRecord) error {
defer func() {
if err := recover(); err != nil {
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
log.Error("PANIC:", combinedErr)
}
}()

cloudbrain, err := models.GetCloudbrainByID(fmt.Sprint(r.CloudbrainID))
if err != nil {
log.Error("GetCloudbrainByID err. %v", err)


services/ai_task_service/task/task_extend.go (+9 -1)

@@ -174,7 +174,15 @@ func correctAITaskSpec(task *models.Cloudbrain) {
log.Error("correctAITaskSpec FindSpecs 0.taskId=%d ", task.ID)
return
}
- n, err := models.UpdateCloudbrainSpec(task.ID, r[0])
+ var newestQueue = r[0]
+ if len(r) > 1 {
+ for _, s := range r {
+ if s.QueueId > newestQueue.QueueId {
+ newestQueue = s
+ }
+ }
+ }
+ n, err := models.UpdateCloudbrainSpec(task.ID, newestQueue)
if err == nil && n > 0 {
log.Info("correctAITaskSpec success,taskId=%d oldCenter=%s realCenter=%s", task.ID, oldSpec.AiCenterCode, realCenterCode)
}


services/cloudbrain/resource/resource_specification.go (+4 -0)

@@ -263,8 +263,12 @@ func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.S
//filter exclusive specs
specs := models.FilterExclusiveSpecs(r, userId)

//filter special queue
specs = models.HandleSpecialQueues(specs, userId, opts)

//distinct by sourceSpecId
specs = models.DistinctSpecs(specs)

return specs, err
}



services/memory/pprof.go (+31 -0)

@@ -0,0 +1,31 @@
package memory

import (
"net/http"
"os"
"runtime"
"runtime/pprof"
"time"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/log"

"code.gitea.io/gitea/modules/setting"
)

func PprofMemory(ctx *context.Context) {
t := time.Now()
file := setting.LogRootPath + "/" + "heap" + t.Format("20060102150405")
f, err := os.Create(file)
if err != nil {
log.Error("", err)
ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("fail to create pprof file."))
return
}
defer f.Close()
runtime.GC() // get up-to-date statistics
pprof.WriteHeapProfile(f)
ctx.JSON(http.StatusOK, models.BaseOKMessage)

}
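
For quick reference (not part of the diff): the route registered in routers/api/v1/api.go above should expose this handler to monitor admins at something like GET /api/v1/monitor/pprof, depending on how RegisterRoutes is mounted. The heap profile written under LogRootPath can then be copied off the server and inspected with the standard pprof tooling, for example:

go tool pprof /path/to/log/heap20230828150405
(pprof) top

The timestamp suffix follows the 20060102150405 layout used in PprofMemory above; the file name here is only an example.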
