Skip to content

Commit

Permalink
Merge pull request #896 from wonderflow/op1
Browse files Browse the repository at this point in the history
es type变为高级选项 && 去除tsdb选项
  • Loading branch information
wonderflow authored Dec 27, 2018
2 parents 994b66b + a603734 commit 5d643e4
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 77 deletions.
3 changes: 2 additions & 1 deletion reader/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,9 @@ var ModeKeyOptions = map[string][]Option{
ChooseOnly: false,
Placeholder: "type_app",
Default: "",
Required: true,
Required: false,
DefaultNoUse: true,
Advance: true,
Description: "app名称(es_type)",
},
OptionAuthUsername,
Expand Down
36 changes: 26 additions & 10 deletions reader/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@ func init() {

func NewReader(meta *reader.Meta, conf conf.MapConf) (reader.Reader, error) {
readBatch, _ := conf.GetIntOr(KeyESReadBatch, 100)
estype, err := conf.GetString(KeyESType)
if err != nil {
return nil, err
}
estype, _ := conf.GetStringOr(KeyESType, "")

esindex, err := conf.GetString(KeyESIndex)
if err != nil {
return nil, err
Expand Down Expand Up @@ -329,7 +327,10 @@ func (r *Reader) execWithLoop() error {
// Create a client
switch r.esVersion {
case ElasticVersion6:
scroll := r.elasticV6Client.Scroll(index).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV6Client.Scroll(index).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
Expand All @@ -351,7 +352,10 @@ func (r *Reader) execWithLoop() error {
}
}
case ElasticVersion3:
scroll := r.elasticV3Client.Scroll(index).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV3Client.Scroll(index).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do()
if err == io.EOF {
Expand All @@ -373,7 +377,10 @@ func (r *Reader) execWithLoop() error {
}
}
default:
scroll := r.elasticV5Client.Scroll(index).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV5Client.Scroll(index).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
Expand Down Expand Up @@ -412,7 +419,10 @@ func (r *Reader) execWithCron() error {
} else {
rangeQuery = elasticV6.NewRangeQuery(r.cronOffsetKey)
}
scroll := r.elasticV6Client.Scroll(index).Query(rangeQuery).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV6Client.Scroll(index).Query(rangeQuery).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
Expand Down Expand Up @@ -443,7 +453,10 @@ func (r *Reader) execWithCron() error {
} else {
rangeQuery = elasticV3.NewRangeQuery(r.cronOffsetKey)
}
scroll := r.elasticV3Client.Scroll(index).Query(rangeQuery).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV3Client.Scroll(index).Query(rangeQuery).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do()
if err == io.EOF {
Expand Down Expand Up @@ -474,7 +487,10 @@ func (r *Reader) execWithCron() error {
} else {
rangeQuery = elasticV5.NewRangeQuery(r.cronOffsetKey)
}
scroll := r.elasticV5Client.Scroll(index).Query(rangeQuery).Type(r.estype).Size(r.readBatch).KeepAlive(r.keepAlive)
scroll := r.elasticV5Client.Scroll(index).Query(rangeQuery).Size(r.readBatch).KeepAlive(r.keepAlive)
if r.estype != "" {
scroll = scroll.Type(r.estype)
}
for {
results, err := scroll.ScrollId(r.offset).Do(context.Background())
if err == io.EOF {
Expand Down
133 changes: 67 additions & 66 deletions sender/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,62 +359,63 @@ var ModeKeyOptions = map[string][]Option{
AdvanceDepend: KeyPandoraEnableLogDB,
ToolTip: `指定字段的分词方式,逗号分隔多个,如 "f1 keyword, f2 full_text"。仅在新建时生效,更改时不生效,请在日志仓库更改。`,
},
{
KeyName: KeyPandoraEnableTSDB,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{"false", "true"},
Default: "false",
DefaultNoUse: false,
Description: "自动创建并导出到时序数据库(pandora_enable_tsdb)",
AdvanceDepend: KeyPandoraRegion,
AdvanceDependValue: NBRegion,
},
{
KeyName: KeyPandoraTSDBName,
ChooseOnly: false,
Default: "",
DefaultNoUse: false,
Description: "指定时序数据库仓库名称(pandora_tsdb_name)",
AdvanceDepend: KeyPandoraEnableTSDB,
ToolTip: "若不指定使用实时仓库(pandora_repo_name)名称",
},
{
KeyName: KeyPandoraTSDBSeriesName,
ChooseOnly: false,
Default: "",
DefaultNoUse: false,
Description: "指定时序数据库序列名称(pandora_tsdb_series_name)",
AdvanceDepend: KeyPandoraEnableTSDB,
ToolTip: "若不指定使用仓库(pandora_tsdb_name)名称",
},
{
KeyName: KeyPandoraTSDBSeriesTags,
ChooseOnly: false,
Default: "",
DefaultNoUse: false,
Description: "指定时序数据库标签(pandora_tsdb_series_tags)",
AdvanceDepend: KeyPandoraEnableTSDB,
},
{
KeyName: KeyPandoraTSDBHost,
ChooseOnly: false,
Default: config.DefaultTSDBEndpoint,
DefaultNoUse: false,
Description: "时序数据库域名[私有部署才修改](pandora_tsdb_host)",
Advance: true,
AdvanceDepend: KeyPandoraEnableTSDB,
ToolTip: "时序数据库域名,私有部署请对应修改",
},
{
KeyName: KeyPandoraTSDBTimeStamp,
ChooseOnly: false,
Default: "",
DefaultNoUse: false,
Description: "指定时序数据库时间戳(pandora_tsdb_timestamp)",
Advance: true,
AdvanceDepend: KeyPandoraEnableTSDB,
},
//暂时下线时序数据库
//{
// KeyName: KeyPandoraEnableTSDB,
// Element: Radio,
// ChooseOnly: true,
// ChooseOptions: []interface{}{"false", "true"},
// Default: "false",
// DefaultNoUse: false,
// Description: "自动创建并导出到时序数据库(pandora_enable_tsdb)",
// AdvanceDepend: KeyPandoraRegion,
// AdvanceDependValue: NBRegion,
//},
//{
// KeyName: KeyPandoraTSDBName,
// ChooseOnly: false,
// Default: "",
// DefaultNoUse: false,
// Description: "指定时序数据库仓库名称(pandora_tsdb_name)",
// AdvanceDepend: KeyPandoraEnableTSDB,
// ToolTip: "若不指定使用实时仓库(pandora_repo_name)名称",
//},
//{
// KeyName: KeyPandoraTSDBSeriesName,
// ChooseOnly: false,
// Default: "",
// DefaultNoUse: false,
// Description: "指定时序数据库序列名称(pandora_tsdb_series_name)",
// AdvanceDepend: KeyPandoraEnableTSDB,
// ToolTip: "若不指定使用仓库(pandora_tsdb_name)名称",
//},
//{
// KeyName: KeyPandoraTSDBSeriesTags,
// ChooseOnly: false,
// Default: "",
// DefaultNoUse: false,
// Description: "指定时序数据库标签(pandora_tsdb_series_tags)",
// AdvanceDepend: KeyPandoraEnableTSDB,
//},
//{
// KeyName: KeyPandoraTSDBHost,
// ChooseOnly: false,
// Default: config.DefaultTSDBEndpoint,
// DefaultNoUse: false,
// Description: "时序数据库域名[私有部署才修改](pandora_tsdb_host)",
// Advance: true,
// AdvanceDepend: KeyPandoraEnableTSDB,
// ToolTip: "时序数据库域名,私有部署请对应修改",
//},
//{
// KeyName: KeyPandoraTSDBTimeStamp,
// ChooseOnly: false,
// Default: "",
// DefaultNoUse: false,
// Description: "指定时序数据库时间戳(pandora_tsdb_timestamp)",
// Advance: true,
// AdvanceDepend: KeyPandoraEnableTSDB,
//},
{
KeyName: KeyPandoraEnableKodo,
Element: Radio,
Expand Down Expand Up @@ -582,16 +583,16 @@ var ModeKeyOptions = map[string][]Option{
OptionKeyFtLongDataDiscard,
OptionMaxDiskUsedBytes,
OptionMaxSizePerSize,
{
KeyName: KeyForceMicrosecond,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{"false", "true"},
Default: "false",
DefaultNoUse: false,
Description: "扰动时间字段增加精度(force_microsecond)",
Advance: true,
},
//{
// KeyName: KeyForceMicrosecond,
// Element: Radio,
// ChooseOnly: true,
// ChooseOptions: []interface{}{"false", "true"},
// Default: "false",
// DefaultNoUse: false,
// Description: "扰动时间字段增加精度(force_microsecond)",
// Advance: true,
//},
{
KeyName: KeyForceDataConvert,
Element: Radio,
Expand Down

0 comments on commit 5d643e4

Please sign in to comment.