Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ip transformer修改 #730

Merged
merged 3 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions mgr/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,19 +380,16 @@ func checkSampleData(sampleData []string, logParser parser.Parser) ([]string, er
func getTransformerCreator(transformerConfig map[string]interface{}) (transforms.Creator, error) {
transformKeyType, ok := transformerConfig[transforms.KeyType]
if !ok {
err := fmt.Errorf("missing param %s", transforms.KeyType)
return nil, err
return nil, fmt.Errorf("missing param %s", transforms.KeyType)
}
transformKeyTypeStr, ok := transformKeyType.(string)
if !ok {
err := fmt.Errorf("param %s must be of type string", transforms.KeyType)
return nil, err
return nil, fmt.Errorf("param %s must be of type string", transforms.KeyType)
}

create, ok := transforms.Transformers[transformKeyTypeStr]
if !ok {
err := fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
return nil, err
return nil, fmt.Errorf("transformer of type %v not exist", transformKeyTypeStr)
}
return create, nil
}
Expand Down
69 changes: 69 additions & 0 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/qiniu/logkit/sender"
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/transforms"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -274,6 +275,12 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
if err != nil {
return nil, err
}
var serverConfigs = make([]map[string]interface{}, 0, len(transformers))
for _, transform := range transformers {
if serverTransformer, ok := transform.(transforms.ServerTansformer); ok {
serverConfigs = append(serverConfigs, serverTransformer.ServerConfig())
}
}
senders := make([]sender.Sender, 0)
for i, senderConfig := range rc.SendersConfig {
if senderConfig[sender.KeySenderType] == sender.TypePandora {
Expand All @@ -285,12 +292,17 @@ func NewLogExportRunner(rc RunnerConfig, cleanChan chan<- cleaner.CleanSignal, r
senderConfig[sender.KeyPandoraDescription] = LogkitAutoCreateDescription
}
}
senderConfig, err := setPandoraServerConfig(senderConfig, serverConfigs)
if err != nil {
return nil, err
}
s, err := sr.NewSender(senderConfig, meta.FtSaveLogPath())
if err != nil {
return nil, err
}
senders = append(senders, s)
delete(rc.SendersConfig[i], sender.InnerUserAgent)
delete(rc.SendersConfig[i], sender.KeyPandoraDescription)
}

senderCnt := len(senders)
Expand Down Expand Up @@ -1286,3 +1298,60 @@ func MergeExtraInfoTags(meta *reader.Meta, tags map[string]interface{}) map[stri
}
return tags
}

func setPandoraServerConfig(senderConfig conf.MapConf, serverConfigs []map[string]interface{}) (conf.MapConf, error) {
if senderConfig[sender.KeySenderType] != sender.TypePandora {
return senderConfig, nil
}

var err error
for _, serverConfig := range serverConfigs {
keyType, ok := serverConfig[transforms.KeyType].(string)
if !ok {
continue
}
switch keyType {
case ip.Name:
if senderConfig, err = setIPConfig(senderConfig, serverConfig); err != nil {
return senderConfig, err
}
}

}

return senderConfig, nil
}

func setIPConfig(senderConfig conf.MapConf, serverConfig map[string]interface{}) (conf.MapConf, error) {
key, keyOk := serverConfig["key"].(string)
if !keyOk {
return senderConfig, nil
}

if len(GetKeys(key)) > 1 {
return senderConfig, fmt.Errorf("key: %v ip transform key in server doesn't support dot(.)", key)
}
autoCreate := senderConfig[sender.KeyPandoraAutoCreate]
transformAt, transformAtOk := serverConfig[transforms.TransformAt].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

先pandora TransformAtOk为false的情况,直接return

if !transformAtOk {
return senderConfig, nil
}
if transformAt == ip.Local {
schema := fmt.Sprintf(",%v ip", key)
if autoCreate == fmt.Sprintf("%v ip", key) {
autoCreate = ""
} else if index := strings.Index(autoCreate, schema); index != -1 {
autoCreate = autoCreate[:index] + autoCreate[index+len(schema):]
}
senderConfig[sender.KeyPandoraAutoCreate] = autoCreate
return senderConfig, nil
}

if autoCreate == "" {
senderConfig[sender.KeyPandoraAutoCreate] = fmt.Sprintf("%s %s", key, TypeIP)
return senderConfig, nil
}

senderConfig[sender.KeyPandoraAutoCreate] += fmt.Sprintf(",%s %s", key, TypeIP)
return senderConfig, nil
}
61 changes: 61 additions & 0 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
_ "github.com/qiniu/logkit/sender/builtin"
"github.com/qiniu/logkit/sender/mock"
"github.com/qiniu/logkit/sender/pandora"
"github.com/qiniu/logkit/transforms"
_ "github.com/qiniu/logkit/transforms/builtin"
"github.com/qiniu/logkit/transforms/ip"
. "github.com/qiniu/logkit/utils/models"
)

Expand Down Expand Up @@ -1896,12 +1898,71 @@ DONE:
break DONE
default:
dft++

}
time.Sleep(50 * time.Millisecond)
if dft > 60 {
break
}
}
assert.Equal(t, 1, ret)
}

func Test_setSenderConfig(t *testing.T) {
senderConfig := conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}

serverConfigs := []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
},
}
actualConfig, err := setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "ip ip", actualConfig[sender.KeyPandoraAutoCreate])

senderConfig = conf.MapConf{
sender.KeySenderType: sender.TypePandora,
}
serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Local,
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: "other",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.NoError(t, err)
assert.Equal(t, "", actualConfig[sender.KeyPandoraAutoCreate])

serverConfigs = []map[string]interface{}{
{
transforms.KeyType: ip.Name,
transforms.TransformAt: ip.Server,
"key": "ip.ip",
},
}
actualConfig, err = setPandoraServerConfig(senderConfig, serverConfigs)
assert.Error(t, err)
}
1 change: 0 additions & 1 deletion sender/rest_senders_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ var ModeKeyOptions = map[string][]Option{
Default: "nb",
DefaultNoUse: false,
Description: "创建的资源所在区域(pandora_region)",
Advance: true,
ToolTip: "工作流资源创建所在区域",
},
{
Expand Down
97 changes: 75 additions & 22 deletions transforms/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,23 @@ const (
Latitude = "Latitude"
Longitude = "Longitude"
DistrictCode = "DistrictCode"

Local = "本地"
Server = "服务端"
)

var (
_ transforms.StatsTransformer = &Transformer{}
_ transforms.Transformer = &Transformer{}
_ transforms.Initializer = &Transformer{}
_ transforms.ServerTansformer = &Transformer{}
)

type Transformer struct {
StageTime string `json:"stage"`
Key string `json:"key"`
DataPath string `json:"data_path"`
TransformAt string `json:"transform_at"`
KeyAsPrefix bool `json:"key_as_prefix"`
Language string `json:"language"`

Expand All @@ -52,6 +57,13 @@ type Transformer struct {
}

func (t *Transformer) Init() error {
if t.TransformAt == "" {
t.TransformAt = Local
}
if t.TransformAt != Local {
return nil
}

if t.Language == "" {
t.Language = "zh-CN"
}
Expand Down Expand Up @@ -91,6 +103,10 @@ func (_ *Transformer) RawTransform(datas []string) ([]string, error) {
}

func (t *Transformer) Transform(datas []Data) ([]Data, error) {
if t.TransformAt != Local {
return datas, nil
}

var err, fmtErr error
errNum := 0
if t.loc == nil {
Expand Down Expand Up @@ -219,39 +235,67 @@ func (_ *Transformer) SampleConfig() string {

func (_ *Transformer) ConfigOptions() []Option {
return []Option{
transforms.KeyFieldName,
{
KeyName: "data_path",
ChooseOnly: false,
Default: "",
Required: true,
Placeholder: "your/path/to/ip.dat(x)",
DefaultNoUse: true,
Description: "IP数据库路径(data_path)",
Type: transforms.TransformTypeString,
},
{
KeyName: "key_as_prefix",
KeyName: transforms.TransformAt,
Element: Radio,
ChooseOnly: true,
ChooseOptions: []interface{}{false, true},
Required: false,
Default: true,
ChooseOptions: []interface{}{Local, Server},
Default: Local,
Required: true,
DefaultNoUse: false,
Element: Checkbox,
Description: "字段名称作为前缀(key_as_prefix)",
Description: "运行方式",
Type: transforms.TransformTypeString,
ToolTip: "本地运行使用客户自己的IP库,更为灵活。服务端运行固定使用七牛IP库,用户无需提供IP库",
},
{
KeyName: "language",
KeyName: "key",
ChooseOnly: false,
Default: "zh-CN",
Default: "",
Required: true,
Placeholder: "zh-CN",
Placeholder: "my_field_keyname",
DefaultNoUse: true,
Description: "mmdb格式库使用的语种",
Advance: true,
Description: "要进行Transform变化的键(key)",
ToolTip: "对该字段的值进行transform变换, 服务端运行不支持嵌套(.),请先使用rename,本地运行支持",
Type: transforms.TransformTypeString,
},
{
KeyName: "data_path",
ChooseOnly: false,
Default: "",
Required: true,
Placeholder: "your/path/to/ip.dat(x)",
DefaultNoUse: true,
Description: "IP数据库路径(data_path)",
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
{
KeyName: "key_as_prefix",
ChooseOnly: true,
ChooseOptions: []interface{}{false, true},
Required: false,
Default: true,
DefaultNoUse: false,
Element: Checkbox,
Description: "字段名称作为前缀(key_as_prefix)",
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
{
KeyName: "language",
ChooseOnly: false,
Default: "zh-CN",
Required: true,
Placeholder: "zh-CN",
DefaultNoUse: true,
Description: "mmdb格式库使用的语种",
Advance: true,
Type: transforms.TransformTypeString,
AdvanceDepend: transforms.TransformAt,
AdvanceDependValue: Local,
},
}
}

Expand All @@ -275,6 +319,15 @@ func (t *Transformer) Close() error {
return nil
}

func (t *Transformer) ServerConfig() map[string]interface{} {
config := make(map[string]interface{})
config[transforms.KeyType] = Name
config[transforms.TransformAt] = t.TransformAt
config["key"] = t.Key

return config
}

func init() {
transforms.Add(Name, func() transforms.Transformer {
return &Transformer{}
Expand Down
Loading