enhance: Add max file num limit and max file size limit for import (#31497) (#31542)

The maximum number of import files per request must not exceed 1024 by
default (configurable).
The size of a single import file must not exceed 16 GB by default
(configurable).

issue: #28521

pr: #31497

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Mar 25, 2024
1 parent 8858f8d commit f1a108c
Showing 5 changed files with 38 additions and 0 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
@@ -438,6 +438,7 @@ dataCoord:
  import:
    filesPerPreImportTask: 2 # The maximum number of files allowed per pre-import task.
    taskRetention: 10800 # The retention period in seconds for tasks in the Completed or Failed state.
    maxImportFileNumPerReq: 1024 # The maximum number of files allowed per single import request.

  enableGarbageCollection: true
  gc:
@@ -498,6 +499,7 @@ dataNode:
  updateChannelCheckpointMaxParallel: 10
  import:
    maxConcurrentTaskNum: 16 # The maximum number of import/pre-import tasks allowed to run concurrently on a datanode.
    maxImportFileSizeInGB: 16 # The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.

# Configures the system log output.
log:
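For reference, a minimal Go sketch (assumed usage of Milvus's paramtable package, mirroring the accessors this commit adds) of how the two new YAML keys are read once loaded:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // load defaults plus milvus.yaml overrides
	params := paramtable.Get()
	// dataCoord.import.maxImportFileNumPerReq, default 1024
	fmt.Println(params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt())
	// datanode.import.maxImportFileSizeInGB, default 16
	fmt.Println(params.DataNodeCfg.MaxImportFileSizeInGB.GetAsInt64())
}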
6 changes: 6 additions & 0 deletions internal/datanode/importv2/util.go
@@ -232,6 +232,12 @@ func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task
		}
		totalSize += size
	}
	maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
	if totalSize > int64(maxSize) {
		return 0, merr.WrapErrImportFailed(fmt.Sprintf(
			"The import file size has reached the maximum limit allowed for importing, "+
				"fileSize=%d, maxSize=%d", totalSize, int64(maxSize)))
	}
	return totalSize, nil
}

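The guard above multiplies the configured GB limit into bytes before comparing. A self-contained sketch of the same check, with illustrative names rather than the actual Milvus helpers:

package main

import "fmt"

// checkImportFileSize rejects an import file whose total size exceeds the
// configured limit; maxGB mirrors datanode.import.maxImportFileSizeInGB.
func checkImportFileSize(totalSize int64, maxGB float64) error {
	maxSize := maxGB * 1024 * 1024 * 1024 // GB -> bytes
	if totalSize > int64(maxSize) {
		return fmt.Errorf("file size %d exceeds the maximum allowed %d", totalSize, int64(maxSize))
	}
	return nil
}

func main() {
	fmt.Println(checkImportFileSize(17<<30, 16)) // 17 GiB vs the 16 GB cap -> error
	fmt.Println(checkImportFileSize(1<<30, 16))  // 1 GiB -> <nil>
}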
5 changes: 5 additions & 0 deletions internal/proxy/impl.go
@@ -5639,6 +5639,11 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest)
		resp.Status = merr.Status(merr.WrapErrParameterInvalidMsg("import request is empty"))
		return resp, nil
	}
	if len(req.Files) > Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
		resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
			Params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(req.Files))))
		return resp, nil
	}
	isBackup := importutilv2.IsBackup(req.GetOptions())
	if !isBackup {
		// check file type
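And an illustrative stand-in for the request-level guard: the proxy fails fast when a request carries more files than the configured cap (the types here are simplified, not the Milvus API):

package main

import "fmt"

// validateImportFileCount mirrors the check added to Proxy.ImportV2;
// maxFiles corresponds to dataCoord.import.maxImportFileNumPerReq.
func validateImportFileCount(numFiles, maxFiles int) error {
	if numFiles > maxFiles {
		return fmt.Errorf("the max number of import files should not exceed %d, but got %d", maxFiles, numFiles)
	}
	return nil
}

func main() {
	fmt.Println(validateImportFileCount(2048, 1024)) // over the default cap -> error
	fmt.Println(validateImportFileCount(512, 1024))  // accepted -> <nil>
}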
23 changes: 23 additions & 0 deletions pkg/util/paramtable/component_param.go
@@ -2536,6 +2536,7 @@ type dataCoordConfig struct {
	ImportScheduleInterval  ParamItem `refreshable:"true"`
	ImportCheckIntervalHigh ParamItem `refreshable:"true"`
	ImportCheckIntervalLow  ParamItem `refreshable:"true"`
	MaxFilesPerImportReq    ParamItem `refreshable:"true"`

	GracefulStopTimeout ParamItem `refreshable:"true"`
}
@@ -3071,6 +3072,16 @@ During compaction, the size of segment # of rows is able to exceed segment max #
	}
	p.ImportCheckIntervalLow.Init(base.mgr)

	p.MaxFilesPerImportReq = ParamItem{
		Key:          "dataCoord.import.maxImportFileNumPerReq",
		Version:      "2.4.0",
		Doc:          "The maximum number of files allowed per single import request.",
		DefaultValue: "1024",
		PanicIfEmpty: false,
		Export:       true,
	}
	p.MaxFilesPerImportReq.Init(base.mgr)

	p.GracefulStopTimeout = ParamItem{
		Key:     "dataCoord.gracefulStopTimeout",
		Version: "2.3.7",
@@ -3131,7 +3142,9 @@ type dataNodeConfig struct {
	MaxChannelCheckpointsPerRPC          ParamItem `refreshable:"true"`
	ChannelCheckpointUpdateTickInSeconds ParamItem `refreshable:"true"`

	// import
	MaxConcurrentImportTaskNum ParamItem `refreshable:"true"`
	MaxImportFileSizeInGB      ParamItem `refreshable:"true"`

	// Compaction
	L0BatchMemoryRatio ParamItem `refreshable:"true"`
@@ -3390,6 +3403,16 @@ func (p *dataNodeConfig) init(base *BaseTable) {
	}
	p.MaxConcurrentImportTaskNum.Init(base.mgr)

	p.MaxImportFileSizeInGB = ParamItem{
		Key:          "datanode.import.maxImportFileSizeInGB",
		Version:      "2.4.0",
		Doc:          "The maximum file size (in GB) for an import file, where an import file refers to either a Row-Based file or a set of Column-Based files.",
		DefaultValue: "16",
		PanicIfEmpty: false,
		Export:       true,
	}
	p.MaxImportFileSizeInGB.Init(base.mgr)

	p.L0BatchMemoryRatio = ParamItem{
		Key:     "datanode.compaction.levelZeroBatchMemoryRatio",
		Version: "2.4.0",
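Both new items are declared refreshable:"true", so they can be changed at runtime without a restart. A minimal sketch of such an override, assuming the params.Save pattern used in the tests below:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()
	params := paramtable.Get()
	// Override the refreshable defaults at runtime.
	params.Save("dataCoord.import.maxImportFileNumPerReq", "2048")
	params.Save("datanode.import.maxImportFileSizeInGB", "32")
	fmt.Println(params.DataCoordCfg.MaxFilesPerImportReq.GetAsInt())   // 2048
	fmt.Println(params.DataNodeCfg.MaxImportFileSizeInGB.GetAsInt64()) // 32
}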
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
@@ -383,6 +383,7 @@ func TestComponentParam(t *testing.T) {
		assert.Equal(t, 2*time.Second, Params.ImportScheduleInterval.GetAsDuration(time.Second))
		assert.Equal(t, 2*time.Second, Params.ImportCheckIntervalHigh.GetAsDuration(time.Second))
		assert.Equal(t, 120*time.Second, Params.ImportCheckIntervalLow.GetAsDuration(time.Second))
		assert.Equal(t, 1024, Params.MaxFilesPerImportReq.GetAsInt())

		params.Save("datacoord.gracefulStopTimeout", "100")
		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
@@ -434,6 +435,7 @@ func TestComponentParam(t *testing.T) {
		maxConcurrentImportTaskNum := Params.MaxConcurrentImportTaskNum.GetAsInt()
		t.Logf("maxConcurrentImportTaskNum: %d", maxConcurrentImportTaskNum)
		assert.Equal(t, 16, maxConcurrentImportTaskNum)
		assert.Equal(t, int64(16), Params.MaxImportFileSizeInGB.GetAsInt64())
		params.Save("datanode.gracefulStopTimeout", "100")
		assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
	})
