Skip to content

Commit

Permalink
Merge pull request #730 from redHJ/pdr-7242
Browse files Browse the repository at this point in the history
ip transformer修改
  • Loading branch information
wonderflow authored Aug 29, 2018
2 parents b7385c3 + c31f34d commit e9972a3
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 74 deletions.
9 changes: 3 additions & 6 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,19 +380,16 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
transformKeyType, ok := transformerConfig[transforms.KeyType]
if !ok {
err := fmt.Errorf("missing param %s", transforms.KeyType)
return nil, err
return nil, fmt.Errorf("missing param %s", transforms.KeyType)
}
transformKeyTypeStr, ok := transformKeyType.(string)
if !ok {
err := fmt.Errorf("param %s must be of type string", transforms.KeyType)
return nil, err
return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType)
}

create, ok := transforms.Transformers[transformKeyTypeStr]
if !ok {
err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
return nil, err
return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
}
return create, nil
}
Expand Down
69 changes: 69 additions & 0 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/qiniu/logkit/sender"
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/transforms"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -274,6 +275,12 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
if err != nil {
return nil, err
}
var serverConfigs = make([]map[string]interface{}, 0, len(transformers))
for _, transform := range transformers {
if serverTransformer, ok := transform.(transforms.ServerTansformer); ok {
serverConfigs = append(serverConfigs, serverTransformer.ServerConfig())
}
}
senders := make([]sender.Sender, 0)
for i, senderConfig := range rc.SendersConfig {
if senderConfig[sender.KeySenderType] == sender.TypePandora {
Expand All @@ -285,12 +292,17 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
}
}
senderConfig, err := setPandoraServerConfig(senderConfig, serverConfigs)
if err != nil {
return nil, err
}
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
if err != nil {
return nil, err
}
senders = append(senders, s)
delete(rc.SendersConfig[i], sender.InnerUserAgent)
delete(rc.SendersConfig[i], sender.KeyPandoraDescription)
}

senderCnt := len(senders)
Expand Down Expand Up @@ -1286,3 +1298,60 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
}
return tags
}

func setPandoraServerConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) (conf.MapConf, error) {
if senderConfig[sender.KeySenderType] != sender.TypePandora {
return senderConfig, nil
}

var err error
for _, serverConfig := range serverConfigs {
keyType, ok := serverConfig[transforms.KeyType].(string)
if !ok {
continue
}
switch keyType {
case ip.Name:
if senderConfig, err = setIPConfig(senderConfig, serverConfig); err != nil {
return senderConfig, err
}
}

}

return senderConfig, nil
}

func setIPConfig(senderConfig conf.MapConf, serverConfig map[string]interface{}) (conf.MapConf, error) {
key, keyOk := serverConfig["key"].(string)
if !keyOk {
return senderConfig, nil
}

if len(GetKeys(key)) > 1 {
return senderConfig, fmt.Errorf("key: %v ip transform key in server doesn't support dot(.)", key)
}
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
transformAt, transformAtOk := serverConfig[transforms.TransformAt].(string)
if !transformAtOk {
return senderConfig, nil
}
if transformAt == ip.Local {
schema := fmt.Sprintf(",%v ip", key)
if autoCreate == fmt.Sprintf("%v ip", key) {
autoCreate = ""
} else if index := strings.Index(autoCreate, schema); index != -1 {
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
}
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
return senderConfig, nil
}

if autoCreate == "" {
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%s %s", key, TypeIP)
return senderConfig, nil
}

senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%s %s", key, TypeIP)
return senderConfig, nil
}
61 changes: 61 additions & 0 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/sender/mock"
"github.com/qiniu/logkit/sender/pandora"
"github.com/qiniu/logkit/transforms"
_ "github.com/qiniu/logkit/transforms/builtin"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -1896,12 +1898,71 @@ DONE:
break DONE
default:
dft++

}
time.Sleep(50 * time.Millisecond)
if dft > 60 {
break
}
}
assert.Equal(t, 1, ret)
}

func Test_setSenderConfig(t *testing.T) {
senderConfig := conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}

serverConfigs := []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
},
}
actualConfig, err := setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate])

senderConfig = conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}
serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Local,
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: "other",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip.ip",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.Error(t, err)
}
1 change: 0 additions & 1 deletion sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ var ModeKeyOptions = map[string][]Option{
Default: "nb",
DefaultNoUse: false,
Description: "创建的资源所在区域(pandora_region)",
Advance: true,
ToolTip: "工作流资源创建所在区域",
},
{
Expand Down
97 changes: 75 additions & 22 deletions transforms/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@ const (
Latitude = "Latitude"
Longitude = "Longitude"
DistrictCode = "DistrictCode"

Local = "本地"
Server = "服务端"
)

var (
_ transforms.StatsTransformer = &Transformer{}
_ transforms.Transformer = &Transformer{}
_ transforms.Initializer = &Transformer{}
_ transforms.ServerTansformer = &Transformer{}
)

type Transformer struct {
StageTime string `json:"stage"`
Key string `json:"key"`
DataPath string `json:"data_path"`
TransformAt string `json:"transform_at"`
KeyAsPrefix bool `json:"key_as_prefix"`
Language string `json:"language"`

Expand All @@ -52,6 +57,13 @@ type Transformer struct {
}

func (t *Transformer) Init() error {
if t.TransformAt == "" {
t.TransformAt = Local
}
if t.TransformAt != Local {
return nil
}

if t.Language == "" {
t.Language = "zh-CN"
}
Expand Down Expand Up @@ -91,6 +103,10 @@ func (_ *Transformer) RawTransform(datas []string) ([]string, error) {
}

func (t *Transformer) Transform(datas []Data) ([]Data, error) {
if t.TransformAt != Local {
return datas, nil
}

var err, fmtErr error
errNum := 0
if t.loc == nil {
Expand Down Expand Up @@ -219,39 +235,67 @@ func (_ *Transformer) SampleConfig() string {

func (_ *Transformer) ConfigOptions() []Option {
return []Option{
transforms.KeyFieldName,
{
KeyName: "data_path",
ChooseOnly: false,
Default: "",
Required: true,
Placeholder: "your/path/to/ip.dat(x)",
DefaultNoUse: true,
Description: "IP数据库路径(data_path)",
Type: transforms.TransformTypeString,
},
{
KeyName: "key_as_prefix",
KeyName: transforms.TransformAt,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{false, true},
Required: false,
Default: true,
ChooseOptions: []interface{}{Local, Server},
Default: Local,
Required: true,
DefaultNoUse: false,
Element: Checkbox,
Description: "字段名称作为前缀(key_as_prefix)",
Description: "运行方式",
Type: transforms.TransformTypeString,
ToolTip: "本地运行使用客户自己的IP库,更为灵活。服务端运行固定使用七牛IP库,用户无需提供IP库",
},
{
KeyName: "language",
KeyName: "key",
ChooseOnly: false,
Default: "zh-CN",
Default: "",
Required: true,
Placeholder: "zh-CN",
Placeholder: "my_field_keyname",
DefaultNoUse: true,
Description: "mmdb格式库使用的语种",
Advance: true,
Description: "要进行Transform变化的键(key)",
ToolTip: "对该字段的值进行transform变换, 服务端运行不支持嵌套(.),请先使用rename,本地运行支持",
Type: transforms.TransformTypeString,
},
{
KeyName: "data_path",
ChooseOnly: false,
Default: "",
Required: true,
Placeholder: "your/path/to/ip.dat(x)",
DefaultNoUse: true,
Description: "IP数据库路径(data_path)",
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
{
KeyName: "key_as_prefix",
ChooseOnly: true,
ChooseOptions: []interface{}{false, true},
Required: false,
Default: true,
DefaultNoUse: false,
Element: Checkbox,
Description: "字段名称作为前缀(key_as_prefix)",
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
{
KeyName: "language",
ChooseOnly: false,
Default: "zh-CN",
Required: true,
Placeholder: "zh-CN",
DefaultNoUse: true,
Description: "mmdb格式库使用的语种",
Advance: true,
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
}
}

Expand All @@ -275,6 +319,15 @@ func (t *Transformer) Close() error {
return nil
}

func (t *Transformer) ServerConfig() map[string]interface{} {
config := make(map[string]interface{})
config[transforms.KeyType] = Name
config[transforms.TransformAt] = t.TransformAt
config["key"] = t.Key

return config
}

func init() {
transforms.Add(Name, func() transforms.Transformer {
return &Transformer{}
Expand Down
Loading

0 comments on commit e9972a3

Please sign in to comment.