#4770 模型SDK流量控制功能合入。

Merged
ychao_1983 merged 6 commits from zouap_dev into V20231018 7 months ago
  1. +20
    -0
      models/file_chunk.go
  2. +32
    -1
      routers/api/v1/repo/attachments.go
  3. +30
    -22
      routers/repo/attachment_model.go
  4. +99
    -1
      routers/repo/flow_control.go

+ 20
- 0
models/file_chunk.go View File

@@ -145,6 +145,26 @@ func getModelFileChunkByUUID(e Engine, uuid string) (*ModelFileChunk, error) {
return fileChunk, nil
}

// GetModelFileChunksByUserId lists the model file chunks that belong to
// userId. A lastTime (unix seconds) > 0 restricts results to chunks created
// at or after that instant. When isUploadFinished is false, only chunks whose
// upload has not completed (is_uploaded = 0) are returned; when true, no
// upload-state filter is applied.
func GetModelFileChunksByUserId(userId int64, lastTime int64, isUploadFinished bool) ([]*ModelFileChunk, error) {
	return getModelFileChunksByUserId(x, userId, lastTime, isUploadFinished)
}

// getModelFileChunksByUserId queries model file chunk rows for a single user
// on the given engine. lastTime > 0 adds a created_unix >= lastTime filter;
// a false isUploadFinished restricts the result to unfinished uploads
// (is_uploaded = 0).
func getModelFileChunksByUserId(e Engine, userId int64, lastTime int64, isUploadFinished bool) ([]*ModelFileChunk, error) {
	cond := builder.NewCond().And(builder.Eq{"user_id": userId})
	if lastTime > 0 {
		cond = cond.And(builder.Gte{"created_unix": lastTime})
	}
	if !isUploadFinished {
		cond = cond.And(builder.Eq{"is_uploaded": 0})
	}

	chunks := make([]*ModelFileChunk, 0)
	if err := e.Where(cond).Find(&chunks); err != nil {
		return nil, err
	}
	return chunks, nil
}

// InsertFileChunk insert a record into file_chunk.
func InsertFileChunk(fileChunk *FileChunk) (_ *FileChunk, err error) {
if _, err := x.Insert(fileChunk); err != nil {


+ 32
- 1
routers/api/v1/repo/attachments.go View File

@@ -2,6 +2,7 @@ package repo

import (
"net/http"
"strings"
"sync"

"code.gitea.io/gitea/modules/log"
@@ -12,6 +13,7 @@ import (
)

var mutex *sync.Mutex = new(sync.Mutex)
var modelMutex *sync.Mutex = new(sync.Mutex)

func GetSuccessChunks(ctx *context.APIContext) {
if errStr := checkDatasetPermission(ctx); errStr != "" {
@@ -146,7 +148,27 @@ func NewModelMultipart(ctx *context.APIContext) {
return
}

routeRepo.NewModelMultipart(ctx.Context)
if err := routeRepo.CheckFlowForModelSDK(); err != nil {
ctx.JSON(200, map[string]string{
"result_code": "-1",
"msg": err.Error(),
})
return
}
modelMutex.Lock()
defer modelMutex.Unlock()
fileName := ctx.Query("file_name")
re, err := routeRepo.NewModelMultipartForApi(ctx.Context, true)
if err != nil {
ctx.JSON(200, map[string]string{
"result_code": "-1",
"msg": err.Error(),
})
} else {
routeRepo.AddModelFileNameToCache(modeluuid, fileName, ctx.User.ID)
re["result_code"] = "0"
ctx.JSON(200, re)
}
}

func checkModelPermission(ctx *context.APIContext, model *models.AiModelManage) string {
@@ -178,5 +200,14 @@ func GetModelMultipartUploadUrl(ctx *context.APIContext) {

// CompleteModelMultipart finishes a multipart model upload started through
// the API. Before delegating to the web handler, it best-effort removes the
// file's flow-control cache entry so the SDK concurrency counter is released;
// a lookup failure is deliberately ignored (completion must still proceed).
func CompleteModelMultipart(ctx *context.APIContext) {
	log.Info("CompleteModelMultipart by api.")
	modeluuid := ctx.Query("modeluuid")
	uuid := ctx.Query("uuid")

	if fileChunk, err := models.GetModelFileChunkByUUID(uuid); err == nil {
		log.Info("fileChunk.ObjectName=" + fileChunk.ObjectName)
		// The cache key uses only the bare file name, i.e. the last path
		// segment of the stored object name.
		segments := strings.Split(fileChunk.ObjectName, "/")
		routeRepo.RemoveModelFileFromCache(modeluuid, segments[len(segments)-1], ctx.User.ID)
	}

	routeRepo.CompleteModelMultipart(ctx.Context)
}

+ 30
- 22
routers/repo/attachment_model.go View File

@@ -1,6 +1,7 @@
package repo

import (
"errors"
"fmt"
"path"
"strconv"
@@ -142,37 +143,48 @@ func getObjectName(filename string, modeluuid string) string {
}

func NewModelMultipart(ctx *context.Context) {
if !setting.Attachment.Enabled {
ctx.Error(404, "attachment is not enabled")
re, err := NewModelMultipartForApi(ctx, false)
if err != nil {
ctx.ServerError("NewMultipart failed", err)
return
}
ctx.JSON(200, re)
}

func NewModelMultipartForApi(ctx *context.Context, isFlowControl bool) (map[string]string, error) {
if !setting.Attachment.Enabled {
return nil, errors.New("attachment is not enabled")
}
fileName := ctx.Query("file_name")
modeluuid := ctx.Query("modeluuid")

err := upload.VerifyFileType(ctx.Query("fileType"), strings.Split(setting.Attachment.AllowedTypes, ","))
if err != nil {
ctx.Error(400, err.Error())
return
return nil, err
}
if isFlowControl {
err = CheckFlowForModel(ctx)
if err != nil {
log.Info("check error," + err.Error())
return nil, err
}
}

typeCloudBrain := ctx.QueryInt("type")
err = checkTypeCloudBrain(typeCloudBrain)
if err != nil {
ctx.ServerError("checkTypeCloudBrain failed", err)
return
return nil, err
}

if setting.Attachment.StoreType == storage.MinioStorageType {
totalChunkCounts := ctx.QueryInt("totalChunkCounts")
if totalChunkCounts > minio_ext.MaxPartsCount {
ctx.Error(400, fmt.Sprintf("chunk counts(%d) is too much", totalChunkCounts))
return
return nil, errors.New(fmt.Sprintf("chunk counts(%d) is too much", totalChunkCounts))
}

fileSize := ctx.QueryInt64("size")
if fileSize > minio_ext.MaxMultipartPutObjectSize {
ctx.Error(400, fmt.Sprintf("file size(%d) is too big", fileSize))
return
return nil, errors.New(fmt.Sprintf("file size(%d) is too big", fileSize))
}

uuid := gouuid.NewV4().String()
@@ -182,16 +194,14 @@ func NewModelMultipart(ctx *context.Context) {
objectName = strings.TrimPrefix(path.Join(Model_prefix, path.Join(modeluuid[0:1], modeluuid[1:2], modeluuid, fileName)), "/")
uploadID, err = storage.NewMultiPartUpload(objectName)
if err != nil {
ctx.ServerError("NewMultipart", err)
return
return nil, err
}
} else {

objectName = strings.TrimPrefix(path.Join(Model_prefix, path.Join(modeluuid[0:1], modeluuid[1:2], modeluuid, fileName)), "/")
uploadID, err = storage.NewObsMultiPartUpload(objectName)
if err != nil {
ctx.ServerError("NewObsMultiPartUpload", err)
return
return nil, err
}
}

@@ -208,17 +218,15 @@ func NewModelMultipart(ctx *context.Context) {
})

if err != nil {
ctx.Error(500, fmt.Sprintf("InsertFileChunk: %v", err))
return
}
return nil, err

ctx.JSON(200, map[string]string{
}
return map[string]string{
"uuid": uuid,
"uploadID": uploadID,
})
}, nil
} else {
ctx.Error(404, "storage type is not enabled")
return
return nil, errors.New("storage type is not enabled")
}
}



+ 99
- 1
routers/repo/flow_control.go View File

@@ -16,10 +16,12 @@ import (
)

const (
REDIS_FLOW_ATTACHMENT_KEY = "flow_attachment_key"
REDIS_FLOW_ATTACHMENT_KEY = "flow_attachment_key"
REDIS_FLOW_MODEL_ATTACHMENT_KEY = "flow_model_attachment_key"
)

var mutex *sync.RWMutex = new(sync.RWMutex)
var modelMutex *sync.Mutex = new(sync.Mutex)

func CheckFlowForDataset(ctx *context.Context) error {
if ctx.User == nil {
@@ -86,6 +88,31 @@ func AddFileNameToCache(datasetId int64, fileName string, userId int64) {
setSDKUploadFileCache(REDIS_FLOW_ATTACHMENT_KEY, cacheMap)
}

// AddModelFileNameToCache records an in-progress SDK model-file upload in the
// shared redis-backed cache so CheckFlowForModelSDK can count concurrent
// uploads. The cache key is "<modelId>_<fileName>_<userId>" and the value is
// the insert time in unix seconds. Entries older than 24 hours are pruned on
// every insert so abandoned uploads do not block the flow-control budget
// forever.
func AddModelFileNameToCache(modelId string, fileName string, userId int64) {
	modelMutex.Lock()
	defer modelMutex.Unlock()

	cacheMap := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
	currentTime := time.Now().Unix()

	// Prune expired entries. `ts` (not `time`) avoids shadowing the time
	// package; deleting during range is safe in Go. Unparseable values are
	// kept, matching CheckFlowForModelSDK which still counts them.
	for tmpKey, tmpValue := range cacheMap {
		ts, err := strconv.ParseInt(tmpValue, 10, 64)
		if err == nil && currentTime-ts > 24*3600 {
			delete(cacheMap, tmpKey)
		}
	}

	key := modelId + "_" + fileName + "_" + fmt.Sprint(userId)
	// Reuse currentTime so the stored value matches the timestamp used for
	// pruning in this same call.
	value := fmt.Sprint(currentTime)
	cacheMap[key] = value
	log.Info("set key=" + key + " value=" + value + " to cache.")
	setSDKUploadFileCache(REDIS_FLOW_MODEL_ATTACHMENT_KEY, cacheMap)
}

func RemoveFileFromCache(datasetId int64, fileName string, userId int64) {
mutex.Lock()
defer mutex.Unlock()
@@ -96,6 +123,16 @@ func RemoveFileFromCache(datasetId int64, fileName string, userId int64) {
setSDKUploadFileCache(REDIS_FLOW_ATTACHMENT_KEY, cacheMap)
}

// RemoveModelFileFromCache deletes the flow-control cache entry for one
// model-file upload, releasing its slot in the SDK concurrency budget. The
// key layout mirrors AddModelFileNameToCache:
// "<modelId>_<fileName>_<userId>".
func RemoveModelFileFromCache(modelId string, fileName string, userId int64) {
	modelMutex.Lock()
	defer modelMutex.Unlock()

	cacheKey := modelId + "_" + fileName + "_" + fmt.Sprint(userId)
	entries := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
	delete(entries, cacheKey)
	log.Info("remove key=" + cacheKey + " from cache.")
	setSDKUploadFileCache(REDIS_FLOW_MODEL_ATTACHMENT_KEY, entries)
}

func getSDKUploadFileMap(msgKey string) map[string]string {
valueStr, err := redis_client.Get(msgKey)
msgMap := make(map[string]string, 0)
@@ -144,3 +181,64 @@ func CheckFlowForDatasetSDK() error {
}
return nil
}

// CheckFlowForModelSDK enforces the global cap on concurrent model-file
// uploads performed through the SDK. It counts cache entries younger than 24
// hours and returns an error once the count reaches
// setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK; nil means the upload may
// proceed.
func CheckFlowForModelSDK() error {
	cacheMap := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
	currentTime := time.Now().Unix()
	count := 0
	for _, tmpValue := range cacheMap {
		// `ts` avoids shadowing the time package. Entries whose timestamp
		// cannot be parsed are still counted — conservative, matching the
		// original behavior.
		ts, err := strconv.ParseInt(tmpValue, 10, 64)
		if err == nil && currentTime-ts > 24*3600 {
			// Started more than 24h ago: treat as abandoned, do not count.
			continue
		}
		count++
	}
	log.Info("total find " + fmt.Sprint(count) + " uploading files.")
	if count >= setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK {
		// Build the message once; log and return the same text.
		msg := "The number of model files uploaded using the SDK simultaneously cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK)
		log.Info(msg)
		return errors.New(msg)
	}
	return nil
}

// CheckFlowForModel applies per-user flow control before a model-file upload
// is allowed to start. Three limits are checked against the user's file
// chunks from the last 24 hours, in order:
//  1. total files in 24h  >= FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR
//  2. files started in the last 10 minutes
//     >= FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M
//  3. total size (existing chunks + this request's "size" query param)
//     >= FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER gigabytes
//
// Returns nil when the upload may proceed, or an error describing the limit
// that was hit. Requires a logged-in user.
func CheckFlowForModel(ctx *context.Context) error {
	if ctx.User == nil {
		return errors.New("User not login.")
	}
	log.Info("start to check flow for upload model file.")
	fileName := ctx.Query("file_name")
	currentTimeNow := time.Now()
	currentLongTime := currentTimeNow.Unix()
	last24Hour := currentTimeNow.AddDate(0, 0, -1).Unix()
	// isUploadFinished=true: count every chunk created in the window,
	// regardless of upload state.
	filechunks, err := models.GetModelFileChunksByUserId(ctx.User.ID, last24Hour, true)
	// NOTE(review): a DB error silently skips all flow-control checks
	// (best-effort policy) — the upload is then allowed to proceed.
	if err == nil {
		if len(filechunks) >= setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR {
			log.Info("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR) + " files within the last 24 hours. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
			return errors.New("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR) + " files within the last 24 hours.")
		}
		var totalSize int64
		// Include the size of the file being requested now.
		totalSize += ctx.QueryInt64("size")
		concurrentUpload := 0
		for _, file := range filechunks {
			totalSize += file.Size
			// Chunks created within the past 10 minutes count as concurrent
			// uploads.
			if (currentLongTime - int64(file.CreatedUnix)) < 10*60 {
				log.Info("the file " + file.Md5 + " in 10min upload." + file.CreatedUnix.Format("2006-01-02 15:04:05"))
				concurrentUpload += 1
			} else {
				log.Info("the file " + file.Md5 + " not in 10min upload." + file.CreatedUnix.Format("2006-01-02 15:04:05"))
			}
		}
		log.Info("The concurrentUpload is " + fmt.Sprint(concurrentUpload) + " to checked " + fileName + ". user id=" + fmt.Sprint(ctx.User.ID))
		if concurrentUpload >= setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M {
			log.Info("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M) + " files within the past 10 minutes. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
			return errors.New("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M) + " files within the past 10 minutes.")
		}
		// ATTACHEMENT_SIZE_A_USER is configured in gigabytes.
		if totalSize >= setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER*1024*1024*1024 {
			log.Info("The total file size uploaded by a single user within the past 24 hours cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER) + "G. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
			return errors.New("The total file size uploaded by a single user within the past 24 hours cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER) + "G.")
		}
	}
	return nil
}

Loading…
Cancel
Save