diff --git a/conf/map_conf.go b/conf/map_conf.go
index 477dd2ed0..a3ee1b473 100644
--- a/conf/map_conf.go
+++ b/conf/map_conf.go
@@ -11,6 +11,7 @@ const (
     StringType     = "string"
     IntType        = "int"
     Int64Type      = "int64"
+    Int32Type      = "int32"
     BoolType       = "bool"
     StringListType = "[]string"
     AliasMapType   = "[string string, string]"
@@ -80,6 +81,26 @@ func (conf MapConf) GetInt(key string) (int, error) {
     return v, nil
 }
 
+func (conf MapConf) GetInt32Or(key string, deft int32) (int32, error) {
+    ret, err := conf.GetInt32(key)
+    if err != nil {
+        return deft, err
+    }
+    return ret, nil
+}
+
+func (conf MapConf) GetInt32(key string) (int32, error) {
+    value, exist := conf[key]
+    if !exist {
+        return 0, ErrConfMissingKey(key, Int32Type)
+    }
+    v, err := strconv.ParseInt(value, 10, 32)
+    if err != nil {
+        return 0, ErrConfKeyType(key, Int32Type)
+    }
+    return int32(v), nil
+}
+
 func (conf MapConf) GetInt64Or(key string, deft int64) (int64, error) {
     ret, err := conf.GetInt64(key)
     if err != nil {
diff --git a/reader/rest_reader_models.go b/reader/rest_reader_models.go
index ab5e6d9d4..61358d864 100644
--- a/reader/rest_reader_models.go
+++ b/reader/rest_reader_models.go
@@ -163,9 +163,9 @@ const (
 var (
     ModeUsages = KeyValueSlice{
         {ModeFileAuto, "从文件读取( fileauto 模式)", ""},
-        {ModeDir, "从文件读取( dir 模式)", ""},
         {ModeFile, "从文件读取( file 模式)", ""},
         {ModeTailx, "从文件读取( tailx 模式)", ""},
+        {ModeDir, "从文件读取( dir 模式)", ""},
         {ModeDirx, "从文件读取( dirx 模式)", ""},
         {ModeMySQL, "从 MySQL 读取", ""},
         {ModeMSSQL, "从 MSSQL 读取", ""},
diff --git a/sender/fault_tolerant.go b/sender/fault_tolerant.go
index 64ccd52b0..063b66ead 100644
--- a/sender/fault_tolerant.go
+++ b/sender/fault_tolerant.go
@@ -25,6 +25,7 @@ const (
     mb                = 1024 * 1024 // 1MB
     defaultWriteLimit = 10          // 默认写速限制为10MB
     maxBytesPerFile   = 100 * mb
+    maxDiskUsedBytes  = 32 * GB
     qNameSuffix       = "_local_save"
     directSuffix      = "_direct"
     defaultMaxProcs   = 1 // 默认没有并发
@@ -64,6 +65,8 @@ type FtOption struct {
     longDataDiscard   bool
     innerSenderType   string
     pandoraSenderType string
+    maxDiskUsedBytes  int64
+    maxSizePerFile    int32
 }
 
 type datasContext struct {
@@ -88,6 +91,8 @@ func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*
     }
     procs, _ := conf.GetIntOr(KeyFtProcs, defaultMaxProcs)
     runnerName, _ := conf.GetStringOr(KeyRunnerName, UnderfinedRunnerName)
+    maxDiskUsedBytes, _ := conf.GetInt64Or(KeyMaxDiskUsedBytes, maxDiskUsedBytes)
+    maxSizePerFile, _ := conf.GetInt32Or(KeyMaxSizePerFile, maxBytesPerFile)
 
     opt := &FtOption{
         saveLogPath:       logPath,
@@ -100,6 +105,8 @@ func NewFtSender(innerSender Sender, conf conf.MapConf, ftSaveLogPath string) (*
         longDataDiscard:   longDataDiscard,
         innerSenderType:   senderType,
         pandoraSenderType: pandoraSendType,
+        maxDiskUsedBytes:  maxDiskUsedBytes,
+        maxSizePerFile:    maxSizePerFile,
     }
 
     return newFtSender(innerSender, runnerName, opt)
@@ -115,38 +122,41 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
         lq = queue.NewDirectQueue("stream" + directSuffix)
     } else if !opt.memoryChannel {
         lq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
-            Name:            "stream" + qNameSuffix,
-            DataPath:        opt.saveLogPath,
-            MaxBytesPerFile: maxBytesPerFile,
-            MaxMsgSize:      maxBytesPerFile,
-            SyncEveryWrite:  opt.syncEvery,
-            SyncEveryRead:   opt.syncEvery,
-            SyncTimeout:     2 * time.Second,
-            WriteRateLimit:  opt.writeLimit * mb,
+            Name:             "stream" + qNameSuffix,
+            DataPath:         opt.saveLogPath,
+            MaxBytesPerFile:  int64(opt.maxSizePerFile),
+            MaxMsgSize:       opt.maxSizePerFile,
+            SyncEveryWrite:   opt.syncEvery,
+            SyncEveryRead:    opt.syncEvery,
+            SyncTimeout:      2 * time.Second,
+            WriteRateLimit:   opt.writeLimit * mb,
+            MaxDiskUsedBytes: opt.maxDiskUsedBytes,
         })
     } else {
         lq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
             Name:              "stream" + qNameSuffix,
             DataPath:          opt.saveLogPath,
-            MaxBytesPerFile:   maxBytesPerFile,
-            MaxMsgSize:        maxBytesPerFile,
+            MaxBytesPerFile:   int64(opt.maxSizePerFile),
+            MaxMsgSize:        opt.maxSizePerFile,
             SyncEveryWrite:    opt.syncEvery,
             SyncEveryRead:     opt.syncEvery,
             SyncTimeout:       2 * time.Second,
             WriteRateLimit:    opt.writeLimit * mb,
             EnableMemoryQueue: true,
             MemoryQueueSize:   int64(opt.memoryChannelSize),
+            MaxDiskUsedBytes:  opt.maxDiskUsedBytes,
         })
     }
     bq = queue.NewDiskQueue(queue.NewDiskQueueOptions{
-        Name:            "backup" + qNameSuffix,
-        DataPath:        opt.saveLogPath,
-        MaxBytesPerFile: maxBytesPerFile,
-        MaxMsgSize:      maxBytesPerFile,
-        SyncEveryWrite:  opt.syncEvery,
-        SyncEveryRead:   opt.syncEvery,
-        SyncTimeout:     2 * time.Second,
-        WriteRateLimit:  opt.writeLimit * mb,
+        Name:             "backup" + qNameSuffix,
+        DataPath:         opt.saveLogPath,
+        MaxBytesPerFile:  int64(opt.maxSizePerFile),
+        MaxMsgSize:       opt.maxSizePerFile,
+        SyncEveryWrite:   opt.syncEvery,
+        SyncEveryRead:    opt.syncEvery,
+        SyncTimeout:      2 * time.Second,
+        WriteRateLimit:   opt.writeLimit * mb,
+        MaxDiskUsedBytes: opt.maxDiskUsedBytes,
     })
     ftSender := FtSender{
         exitChan:   make(chan struct{}),
diff --git a/sender/rest_senders_models.go b/sender/rest_senders_models.go
index 32aee7a21..744f60276 100644
--- a/sender/rest_senders_models.go
+++ b/sender/rest_senders_models.go
@@ -1,6 +1,8 @@
 package sender
 
 import (
+    "strconv"
+
     . "github.com/qiniu/logkit/utils/models"
     "github.com/qiniu/pandora-go-sdk/base/config"
 )
@@ -90,6 +92,24 @@ var (
         Advance:      true,
         ToolTip:      `丢弃大于2M的数据`,
     }
+    OptionMaxDiskUsedBytes = Option{
+        KeyName:      KeyMaxDiskUsedBytes,
+        ChooseOnly:   false,
+        Default:      strconv.Itoa(maxDiskUsedBytes),
+        DefaultNoUse: false,
+        Description:  "磁盘使用总大小限制(max_disk_used_bytes)",
+        Advance:      true,
+        ToolTip:      `磁盘使用总大小限制`,
+    }
+    OptionMaxSizePerSize = Option{
+        KeyName:      KeyMaxSizePerFile,
+        ChooseOnly:   false,
+        Default:      strconv.Itoa(maxBytesPerFile),
+        DefaultNoUse: false,
+        Description:  "磁盘队列单个文件最大字节(max_size_per_file)",
+        Advance:      true,
+        ToolTip:      `磁盘队列单个文件最大字节, 也是单个消息的最大字节。最大不超过2GB`,
+    }
     OptionLogkitSendTime = Option{
         KeyName: KeyLogkitSendTime,
         Element: Radio,
@@ -473,6 +493,8 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
         {
             KeyName:       KeyForceMicrosecond,
             Element:       Radio,
@@ -604,6 +626,8 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
     },
     TypeInfluxdb: {
         {
@@ -708,6 +732,8 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
     },
     TypeDiscard: {},
     TypeElastic: {
@@ -771,6 +797,8 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
     },
     TypeKafka: {
         {
@@ -839,6 +867,8 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
     },
     TypeHttp: {
         {
@@ -882,5 +912,7 @@ var ModeKeyOptions = map[string][]Option{
         OptionFtMemoryChannel,
         OptionFtMemoryChannelSize,
         OptionKeyFtLongDataDiscard,
+        OptionMaxDiskUsedBytes,
+        OptionMaxSizePerSize,
     },
 }
diff --git a/sender/sender.go b/sender/sender.go
index f625dc070..cc5a3f3d5 100644
--- a/sender/sender.go
+++ b/sender/sender.go
@@ -130,6 +130,10 @@ const (
     KeyFtMemoryChannelSize = "ft_memory_channel_size"
     KeyFtLongDataDiscard   = "ft_long_data_discard"
 
+    // queue
+    KeyMaxDiskUsedBytes = "max_disk_used_bytes"
+    KeyMaxSizePerFile   = "max_size_per_file"
+
     // ft 策略
     // KeyFtStrategyBackupOnly 只在失败的时候进行容错
     KeyFtStrategyBackupOnly = "backup_only"
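
To make the behavior of the new conf helpers concrete, here is a minimal usage sketch. It assumes the conf package lives at github.com/qiniu/logkit/conf and that MapConf maps string keys to string values, as the strconv.ParseInt call in GetInt32 implies; the key names and literal values are illustrative only. Note that, like the existing GetInt64Or, GetInt32Or returns both the supplied default and a non-nil error when the key is absent, which is why NewFtSender discards the error with `_`.

// Sketch: exercising the new MapConf getters (hypothetical standalone program).
package main

import (
    "fmt"

    "github.com/qiniu/logkit/conf"
)

func main() {
    c := conf.MapConf{
        "max_size_per_file": "52428800", // 50 MB, stored as a string like every MapConf value
    }

    // Key present and parseable as int32: the parsed value comes back.
    size, err := c.GetInt32("max_size_per_file")
    fmt.Println(size, err) // 52428800 <nil>

    // Key missing: GetInt32Or hands back the default, but - matching the
    // existing GetInt64Or pattern - it also returns the "missing key" error.
    perFile, err := c.GetInt32Or("absent_key", 100*1024*1024)
    fmt.Println(perFile, err) // 104857600 plus a non-nil error
}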
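
To show how the two new sender keys are consumed, here is a sketch that mirrors the GetInt64Or/GetInt32Or calls NewFtSender now makes before sizing the disk queues. The key strings mirror KeyMaxDiskUsedBytes and KeyMaxSizePerFile from sender/sender.go, and the default constants restate maxDiskUsedBytes (32 GB) and maxBytesPerFile (100 MB) from sender/fault_tolerant.go so the snippet stands alone; the configured values are purely illustrative.

// Sketch: how the new fault-tolerant sender limits are resolved (illustrative values).
package main

import (
    "fmt"

    "github.com/qiniu/logkit/conf"
)

const (
    defaultMaxDiskUsedBytes = 32 * 1024 * 1024 * 1024 // mirrors maxDiskUsedBytes (32 GB)
    defaultMaxSizePerFile   = 100 * 1024 * 1024       // mirrors maxBytesPerFile (100 MB)
)

func main() {
    senderConf := conf.MapConf{
        "max_disk_used_bytes": "1073741824", // cap the whole on-disk queue at 1 GB
        "max_size_per_file":   "33554432",   // rotate queue files (and cap one message) at 32 MB
    }

    // This mirrors what NewFtSender does before building the disk queues:
    maxDiskUsedBytes, _ := senderConf.GetInt64Or("max_disk_used_bytes", defaultMaxDiskUsedBytes)
    maxSizePerFile, _ := senderConf.GetInt32Or("max_size_per_file", defaultMaxSizePerFile)

    // maxSizePerFile feeds both MaxBytesPerFile and MaxMsgSize of the disk
    // queue options; maxDiskUsedBytes feeds the new MaxDiskUsedBytes field.
    fmt.Printf("MaxDiskUsedBytes=%d MaxBytesPerFile=%d MaxMsgSize=%d\n",
        maxDiskUsedBytes, int64(maxSizePerFile), maxSizePerFile)
}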