@@ -145,6 +145,26 @@ func getModelFileChunkByUUID(e Engine, uuid string) (*ModelFileChunk, error) {
 	return fileChunk, nil
 }
 
+func GetModelFileChunksByUserId(userId int64, lastTime int64, isUploadFinished bool) ([]*ModelFileChunk, error) {
+	return getModelFileChunksByUserId(x, userId, lastTime, isUploadFinished)
+}
+
+func getModelFileChunksByUserId(e Engine, userId int64, lastTime int64, isUploadFinished bool) ([]*ModelFileChunk, error) {
+	fileChunks := make([]*ModelFileChunk, 0)
+	cond := builder.NewCond()
+	cond = cond.And(builder.Eq{"user_id": userId})
+	if lastTime > 0 {
+		cond = cond.And(builder.Gte{"created_unix": lastTime})
+	}
+	if !isUploadFinished {
+		cond = cond.And(builder.Eq{"is_uploaded": 0})
+	}
+	if err := e.Where(cond).Find(&fileChunks); err != nil {
+		return nil, err
+	}
+	return fileChunks, nil
+}
+
 // InsertFileChunk insert a record into file_chunk.
 func InsertFileChunk(fileChunk *FileChunk) (_ *FileChunk, err error) {
 	if _, err := x.Insert(fileChunk); err != nil {
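For context, a minimal sketch of how this query helper is meant to be driven (illustrative only, not part of the patch; userID is a stand-in, and the 24-hour window mirrors the CheckFlowForModel caller added further down):

// countRecentModelChunks counts one user's model file chunks created in
// the last 24 hours. Passing isUploadFinished=true skips the is_uploaded
// filter, so both in-flight and completed uploads are counted.
func countRecentModelChunks(userID int64) (int, error) {
	last24Hour := time.Now().AddDate(0, 0, -1).Unix()
	chunks, err := models.GetModelFileChunksByUserId(userID, last24Hour, true)
	if err != nil {
		return 0, err
	}
	return len(chunks), nil
}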
@@ -2,6 +2,7 @@ package repo
 
 import (
 	"net/http"
 	"strings"
+	"sync"
 
 	"code.gitea.io/gitea/modules/log"
@@ -12,6 +13,7 @@ import (
 )
 
 var mutex *sync.Mutex = new(sync.Mutex)
+var modelMutex *sync.Mutex = new(sync.Mutex)
 
 func GetSuccessChunks(ctx *context.APIContext) {
 	if errStr := checkDatasetPermission(ctx); errStr != "" {
@@ -146,7 +148,27 @@ func NewModelMultipart(ctx *context.APIContext) {
 		return
 	}
 
-	routeRepo.NewModelMultipart(ctx.Context)
+	if err := routeRepo.CheckFlowForModelSDK(); err != nil {
+		ctx.JSON(200, map[string]string{
+			"result_code": "-1",
+			"msg":         err.Error(),
+		})
+		return
+	}
+	modelMutex.Lock()
+	defer modelMutex.Unlock()
+	fileName := ctx.Query("file_name")
+	re, err := routeRepo.NewModelMultipartForApi(ctx.Context, true)
+	if err != nil {
+		ctx.JSON(200, map[string]string{
+			"result_code": "-1",
+			"msg":         err.Error(),
+		})
+	} else {
+		routeRepo.AddModelFileNameToCache(modeluuid, fileName, ctx.User.ID)
+		re["result_code"] = "0"
+		ctx.JSON(200, re)
+	}
 }
 
 func checkModelPermission(ctx *context.APIContext, model *models.AiModelManage) string {
@@ -178,5 +200,14 @@ func GetModelMultipartUploadUrl(ctx *context.APIContext) {
 func CompleteModelMultipart(ctx *context.APIContext) {
 	log.Info("CompleteModelMultipart by api.")
 	modeluuid := ctx.Query("modeluuid")
+	//fileName := ctx.Query("file_name")
+	uuid := ctx.Query("uuid")
+	fileChunk, err := models.GetModelFileChunkByUUID(uuid)
+	if err == nil {
+		log.Info("fileChunk.ObjectName=" + fileChunk.ObjectName)
+		objectNames := strings.Split(fileChunk.ObjectName, "/")
+		routeRepo.RemoveModelFileFromCache(modeluuid, objectNames[len(objectNames)-1], ctx.User.ID)
+	}
+
 	routeRepo.CompleteModelMultipart(ctx.Context)
 }
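The cache key is built from the bare file name, which is why the handler takes the last path segment of ObjectName. A hedged equivalent using the standard library, for object names of the form "model/<c0>/<c1>/<modeluuid>/<fileName>":

// path.Base yields the same value as objectNames[len(objectNames)-1]
// as long as ObjectName does not end with a trailing slash.
fileName := path.Base(fileChunk.ObjectName)
routeRepo.RemoveModelFileFromCache(modeluuid, fileName, ctx.User.ID)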
@@ -1,6 +1,7 @@
 package repo
 
 import (
+	"errors"
 	"fmt"
 	"path"
 	"strconv"
@@ -142,37 +143,48 @@ func getObjectName(filename string, modeluuid string) string {
 }
 
 func NewModelMultipart(ctx *context.Context) {
-	if !setting.Attachment.Enabled {
-		ctx.Error(404, "attachment is not enabled")
+	re, err := NewModelMultipartForApi(ctx, false)
+	if err != nil {
+		ctx.ServerError("NewMultipart failed", err)
 		return
 	}
+	ctx.JSON(200, re)
+}
+
+func NewModelMultipartForApi(ctx *context.Context, isFlowControl bool) (map[string]string, error) {
+	if !setting.Attachment.Enabled {
+		return nil, errors.New("attachment is not enabled")
+	}
 	fileName := ctx.Query("file_name")
 	modeluuid := ctx.Query("modeluuid")
 
 	err := upload.VerifyFileType(ctx.Query("fileType"), strings.Split(setting.Attachment.AllowedTypes, ","))
 	if err != nil {
-		ctx.Error(400, err.Error())
-		return
+		return nil, err
 	}
 
+	if isFlowControl {
+		err = CheckFlowForModel(ctx)
+		if err != nil {
+			log.Info("check error," + err.Error())
+			return nil, err
+		}
+	}
 	typeCloudBrain := ctx.QueryInt("type")
 	err = checkTypeCloudBrain(typeCloudBrain)
 	if err != nil {
-		ctx.ServerError("checkTypeCloudBrain failed", err)
-		return
+		return nil, err
 	}
 
 	if setting.Attachment.StoreType == storage.MinioStorageType {
 		totalChunkCounts := ctx.QueryInt("totalChunkCounts")
 		if totalChunkCounts > minio_ext.MaxPartsCount {
-			ctx.Error(400, fmt.Sprintf("chunk counts(%d) is too much", totalChunkCounts))
-			return
+			return nil, errors.New(fmt.Sprintf("chunk counts(%d) is too much", totalChunkCounts))
 		}
 
 		fileSize := ctx.QueryInt64("size")
 		if fileSize > minio_ext.MaxMultipartPutObjectSize {
-			ctx.Error(400, fmt.Sprintf("file size(%d) is too big", fileSize))
-			return
+			return nil, errors.New(fmt.Sprintf("file size(%d) is too big", fileSize))
 		}
 
 		uuid := gouuid.NewV4().String()
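Both storage branches in the next hunk build the same object name. A worked example of the layout, assuming Model_prefix is "model" (an assumption; the constant's actual value is defined elsewhere in this file):

modeluuid := "ab12cd34ef56"
fileName := "weights.onnx"
objectName := strings.TrimPrefix(path.Join("model",
	path.Join(modeluuid[0:1], modeluuid[1:2], modeluuid, fileName)), "/")
// objectName == "model/a/b/ab12cd34ef56/weights.onnx"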
@@ -182,16 +194,14 @@ func NewModelMultipart(ctx *context.Context) {
 		objectName = strings.TrimPrefix(path.Join(Model_prefix, path.Join(modeluuid[0:1], modeluuid[1:2], modeluuid, fileName)), "/")
 		uploadID, err = storage.NewMultiPartUpload(objectName)
 		if err != nil {
-			ctx.ServerError("NewMultipart", err)
-			return
+			return nil, err
 		}
 	} else {
 		objectName = strings.TrimPrefix(path.Join(Model_prefix, path.Join(modeluuid[0:1], modeluuid[1:2], modeluuid, fileName)), "/")
 		uploadID, err = storage.NewObsMultiPartUpload(objectName)
 		if err != nil {
-			ctx.ServerError("NewObsMultiPartUpload", err)
-			return
+			return nil, err
 		}
 	}
@@ -208,17 +218,15 @@ func NewModelMultipart(ctx *context.Context) {
 		})
 		if err != nil {
-			ctx.Error(500, fmt.Sprintf("InsertFileChunk: %v", err))
-			return
+			return nil, err
 		}
 
-		ctx.JSON(200, map[string]string{
+		return map[string]string{
 			"uuid":     uuid,
 			"uploadID": uploadID,
-		})
+		}, nil
 	} else {
-		ctx.Error(404, "storage type is not enabled")
-		return
+		return nil, errors.New("storage type is not enabled")
 	}
 }
@@ -16,10 +16,12 @@ import (
 )
 
 const (
-	REDIS_FLOW_ATTACHMENT_KEY = "flow_attachment_key"
+	REDIS_FLOW_ATTACHMENT_KEY       = "flow_attachment_key"
+	REDIS_FLOW_MODEL_ATTACHMENT_KEY = "flow_model_attachment_key"
 )
 
 var mutex *sync.RWMutex = new(sync.RWMutex)
+var modelMutex *sync.Mutex = new(sync.Mutex)
 
 func CheckFlowForDataset(ctx *context.Context) error {
 	if ctx.User == nil {
@@ -86,6 +88,31 @@ func AddFileNameToCache(datasetId int64, fileName string, userId int64) {
 	setSDKUploadFileCache(REDIS_FLOW_ATTACHMENT_KEY, cacheMap)
 }
 
+func AddModelFileNameToCache(modelId string, fileName string, userId int64) {
+	modelMutex.Lock()
+	defer modelMutex.Unlock()
+	cacheMap := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
+	expireTimeKeys := make([]string, 0)
+	currentTime := time.Now().Unix()
+	for tmpKey, tmpValue := range cacheMap {
+		time, err := strconv.ParseInt(tmpValue, 10, 64)
+		if err == nil {
+			if currentTime-time > 24*3600 {
+				expireTimeKeys = append(expireTimeKeys, tmpKey)
+				continue
+			}
+		}
+	}
+	for _, delKey := range expireTimeKeys {
+		delete(cacheMap, delKey)
+	}
+	key := modelId + "_" + fileName + "_" + fmt.Sprint(userId)
+	value := fmt.Sprint(time.Now().Unix())
+	cacheMap[key] = value
+	log.Info("set key=" + key + " value=" + value + " to cache.")
+	setSDKUploadFileCache(REDIS_FLOW_MODEL_ATTACHMENT_KEY, cacheMap)
+}
+
 func RemoveFileFromCache(datasetId int64, fileName string, userId int64) {
 	mutex.Lock()
 	defer mutex.Unlock()
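The cache added above is a flat map[string]string persisted in Redis: key = "<modelId>_<fileName>_<userId>", value = the upload start time in unix seconds, with entries older than 24 hours swept on every insert. A standalone sketch of that sweep (the function name is an assumption): Go permits deleting from a map while ranging over it, so the two-pass collect-then-delete above could also be written as a single pass.

// sweepExpired drops entries whose timestamp is older than 24 hours.
// Entries with an unparseable value are kept, matching the patch.
func sweepExpired(cacheMap map[string]string, now int64) {
	for key, value := range cacheMap {
		startedAt, err := strconv.ParseInt(value, 10, 64)
		if err == nil && now-startedAt > 24*3600 {
			delete(cacheMap, key)
		}
	}
}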
@@ -96,6 +123,16 @@ func RemoveFileFromCache(datasetId int64, fileName string, userId int64) {
 	setSDKUploadFileCache(REDIS_FLOW_ATTACHMENT_KEY, cacheMap)
 }
 
+func RemoveModelFileFromCache(modelId string, fileName string, userId int64) {
+	modelMutex.Lock()
+	defer modelMutex.Unlock()
+	key := modelId + "_" + fileName + "_" + fmt.Sprint(userId)
+	cacheMap := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
+	delete(cacheMap, key)
+	log.Info("remove key=" + key + " from cache.")
+	setSDKUploadFileCache(REDIS_FLOW_MODEL_ATTACHMENT_KEY, cacheMap)
+}
+
 func getSDKUploadFileMap(msgKey string) map[string]string {
 	valueStr, err := redis_client.Get(msgKey)
 	msgMap := make(map[string]string, 0)
@@ -144,3 +181,64 @@ func CheckFlowForDatasetSDK() error {
 	}
 	return nil
 }
+
+func CheckFlowForModelSDK() error {
+	cacheMap := getSDKUploadFileMap(REDIS_FLOW_MODEL_ATTACHMENT_KEY)
+	currentTime := time.Now().Unix()
+	count := 0
+	for _, tmpValue := range cacheMap {
+		time, err := strconv.ParseInt(tmpValue, 10, 64)
+		if err == nil {
+			if currentTime-time > 24*3600 {
+				continue
+			}
+		}
+		count += 1
+	}
+	log.Info("total find " + fmt.Sprint(count) + " uploading files.")
+	if count >= setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK {
+		log.Info("The number of model files uploaded using the SDK simultaneously cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK))
+		return errors.New("The number of model files uploaded using the SDK simultaneously cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK))
+	}
+	return nil
+}
+
+func CheckFlowForModel(ctx *context.Context) error {
+	if ctx.User == nil {
+		return errors.New("User not login.")
+	}
+	log.Info("start to check flow for upload model file.")
+	fileName := ctx.Query("file_name")
+	currentTimeNow := time.Now()
+	currentLongTime := currentTimeNow.Unix()
+	last24Hour := currentTimeNow.AddDate(0, 0, -1).Unix()
+	filechunks, err := models.GetModelFileChunksByUserId(ctx.User.ID, last24Hour, true)
+	if err == nil {
+		if len(filechunks) >= setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR {
+			log.Info("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR) + " files within the last 24 hours. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
+			return errors.New("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR) + " files within the last 24 hours.")
+		}
+		var totalSize int64
+		totalSize += ctx.QueryInt64("size")
+		concurrentUpload := 0
+		for _, file := range filechunks {
+			totalSize += file.Size
+			if (currentLongTime - int64(file.CreatedUnix)) < 10*60 {
+				log.Info("the file " + file.Md5 + " in 10min upload." + file.CreatedUnix.Format("2006-01-02 15:04:05"))
+				concurrentUpload += 1
+			} else {
+				log.Info("the file " + file.Md5 + " not in 10min upload." + file.CreatedUnix.Format("2006-01-02 15:04:05"))
+			}
+		}
+		log.Info("The concurrentUpload is " + fmt.Sprint(concurrentUpload) + " to checked " + fileName + ". user id=" + fmt.Sprint(ctx.User.ID))
+		if concurrentUpload >= setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M {
+			log.Info("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M) + " files within the past 10 minutes. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
+			return errors.New("A single user cannot upload more than " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M) + " files within the past 10 minutes.")
+		}
+		if totalSize >= setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER*1024*1024*1024 {
+			log.Info("The total file size uploaded by a single user within the past 24 hours cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER) + "G. so " + fileName + " is rejected. user id=" + fmt.Sprint(ctx.User.ID))
+			return errors.New("The total file size uploaded by a single user within the past 24 hours cannot exceed " + fmt.Sprint(setting.FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER) + "G.")
+		}
+	}
+	return nil
+}
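Taken together, CheckFlowForModelSDK and CheckFlowForModel enforce one global and three per-user limits. The setting names below are the ones this patch reads; the example value is an assumption used only for the arithmetic:

// FLOW_CONTROL.ALL_ATTACHEMENT_NUM_SDK             global cap on concurrent SDK model uploads
// FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST24HOUR   per-user file count over the last 24 hours
// FLOW_CONTROL.ATTACHEMENT_NUM_A_USER_LAST10M      per-user file count over the last 10 minutes
// FLOW_CONTROL.ATTACHEMENT_SIZE_A_USER             per-user total size over 24 hours, in GB
//
// The size check compares bytes against GB * 1024^3; e.g. with
// ATTACHEMENT_SIZE_A_USER = 20 the budget is 20*1024*1024*1024 =
// 21474836480 bytes (20 GiB).
func withinSizeBudget(totalSizeBytes int64, limitGB int64) bool {
	return totalSizeBytes < limitGB*1024*1024*1024
}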