Skip to content

Commit 19a9546

Browse files
committed
优化ip transformer 性能
1 parent 3d29834 commit 19a9546

File tree

8 files changed

+383
-68
lines changed

8 files changed

+383
-68
lines changed

reader/utils_test.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,11 @@ func TestGetTags(t *testing.T) {
136136
assert.Equal(t, exp, tags)
137137
}
138138

139-
func TestSetMapValueWithPrefix(t *testing.T) {
139+
func TestSetMapValueExistWithPrefix(t *testing.T) {
140140
data1 := map[string]interface{}{
141141
"a": "b",
142142
}
143-
err1 := SetMapValueWithPrefix(data1, "newVal", "prefix", false, "a")
143+
err1 := SetMapValueExistWithPrefix(data1, "newVal", "prefix", "a")
144144
assert.NoError(t, err1)
145145
exp1 := map[string]interface{}{
146146
"a": "b",
@@ -154,7 +154,7 @@ func TestSetMapValueWithPrefix(t *testing.T) {
154154
"age": 45,
155155
},
156156
}
157-
err2 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "name"}...)
157+
err2 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "name"}...)
158158
assert.NoError(t, err2)
159159
exp2 := map[string]interface{}{
160160
"a": map[string]interface{}{
@@ -165,10 +165,10 @@ func TestSetMapValueWithPrefix(t *testing.T) {
165165
}
166166
assert.Equal(t, exp2, data2)
167167

168-
err3 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"xy", "name"}...)
168+
err3 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"xy", "name"}...)
169169
assert.Error(t, err3)
170170

171-
err4 := SetMapValueWithPrefix(data2, "newVal", "prefix", false, []string{"a", "hello"}...)
171+
err4 := SetMapValueExistWithPrefix(data2, "newVal", "prefix", []string{"a", "hello"}...)
172172
assert.NoError(t, err4)
173173
exp4 := map[string]interface{}{
174174
"a": map[string]interface{}{
@@ -180,11 +180,4 @@ func TestSetMapValueWithPrefix(t *testing.T) {
180180
}
181181
assert.Equal(t, exp4, data2)
182182

183-
data5 := map[string]interface{}{}
184-
err5 := SetMapValueWithPrefix(data5, "newVal", "prefix", true, "a")
185-
assert.NoError(t, err5)
186-
exp5 := map[string]interface{}{
187-
"prefix_a": "newVal",
188-
}
189-
assert.Equal(t, exp5, data5)
190183
}

sender/pandora/pandora.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type Sender struct {
4848
microsecondCounter uint64
4949
extraInfo map[string]string
5050
sendType string
51+
keyCache map[string]KeyInfo
5152
}
5253

5354
// UserSchema was parsed pandora schema from user's raw schema
@@ -431,6 +432,7 @@ func newPandoraSender(opt *PandoraOption) (s *Sender, err error) {
431432
schemas: make(map[string]pipeline.RepoSchemaEntry),
432433
extraInfo: utilsos.GetExtraInfo(),
433434
sendType: opt.sendType,
435+
keyCache: make(map[string]KeyInfo),
434436
}
435437

436438
expandAttr := make([]string, 0)
@@ -851,7 +853,7 @@ func (s *Sender) Send(datas []Data) (se error) {
851853
return s.rawSend(datas)
852854
default:
853855
for i, v := range datas {
854-
datas[i] = DeepConvertKey(v)
856+
datas[i] = DeepConvertKeyWithCache(v, s.keyCache)
855857
}
856858
return s.schemaFreeSend(datas)
857859
}

transforms/ip/ip.go

Lines changed: 102 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,18 @@ type Transformer struct {
3333
DataPath string `json:"data_path"`
3434
KeyAsPrefix bool `json:"key_as_prefix"`
3535

36-
loc Locator
37-
stats StatsInfo
36+
loc Locator
37+
keys []string
38+
lastEleKey string
39+
keysRegion []string
40+
keysCity []string
41+
keysCountry []string
42+
keysIsp []string
43+
keysCountryCode []string
44+
keysLatitude []string
45+
keysLongitude []string
46+
keysDistrictCode []string
47+
stats StatsInfo
3848
}
3949

4050
func (t *Transformer) Init() error {
@@ -43,9 +53,32 @@ func (t *Transformer) Init() error {
4353
return fmt.Errorf("new locator: %v", err)
4454
}
4555
t.loc = loc
56+
t.keys = GetKeys(t.Key)
57+
58+
newKeys := make([]string, len(t.keys))
59+
copy(newKeys, t.keys)
60+
t.lastEleKey = t.keys[len(t.keys)-1]
61+
t.keysRegion = generateKeys(t.keys, Region, t.KeyAsPrefix)
62+
t.keysCity = generateKeys(t.keys, City, t.KeyAsPrefix)
63+
t.keysCountry = generateKeys(t.keys, Country, t.KeyAsPrefix)
64+
t.keysIsp = generateKeys(t.keys, Isp, t.KeyAsPrefix)
65+
t.keysCountryCode = generateKeys(t.keys, CountryCode, t.KeyAsPrefix)
66+
t.keysLatitude = generateKeys(t.keys, Latitude, t.KeyAsPrefix)
67+
t.keysLongitude = generateKeys(t.keys, Longitude, t.KeyAsPrefix)
68+
t.keysDistrictCode = generateKeys(t.keys, DistrictCode, t.KeyAsPrefix)
4669
return nil
4770
}
4871

72+
func generateKeys(keys []string, lastEle string, keyAsPrefix bool) []string {
73+
newKeys := make([]string, len(keys))
74+
copy(newKeys, keys)
75+
if keyAsPrefix {
76+
lastEle = keys[len(keys)-1] + "_" + lastEle
77+
}
78+
newKeys[len(keys)-1] = lastEle
79+
return newKeys
80+
}
81+
4982
func (_ *Transformer) RawTransform(datas []string) ([]string, error) {
5083
return datas, errors.New("IP transformer not support rawTransform")
5184
}
@@ -54,18 +87,15 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) {
5487
var err, fmtErr error
5588
errNum := 0
5689
if t.loc == nil {
57-
loc, err := NewLocator(t.DataPath)
90+
err := t.Init()
5891
if err != nil {
59-
t.stats, _ = transforms.SetStatsInfo(err, t.stats, int64(errNum), int64(len(datas)), t.Type())
6092
return datas, err
6193
}
62-
t.loc = loc
6394
}
64-
keys := GetKeys(t.Key)
65-
newKeys := make([]string, len(keys))
95+
newKeys := make([]string, len(t.keys))
6696
for i := range datas {
67-
copy(newKeys, keys)
68-
val, getErr := GetMapValue(datas[i], keys...)
97+
copy(newKeys, t.keys)
98+
val, getErr := GetMapValue(datas[i], t.keys...)
6999
if getErr != nil {
70100
errNum, err = transforms.SetError(errNum, getErr, transforms.GetErr, t.Key)
71101
continue
@@ -81,36 +111,83 @@ func (t *Transformer) Transform(datas []Data) ([]Data, error) {
81111
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
82112
continue
83113
}
84-
newKeys[len(newKeys)-1] = Region
85-
SetMapValueWithPrefix(datas[i], info.Region, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
86-
newKeys[len(newKeys)-1] = City
87-
SetMapValueWithPrefix(datas[i], info.City, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
88-
newKeys[len(newKeys)-1] = Country
89-
SetMapValueWithPrefix(datas[i], info.Country, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
90-
newKeys[len(newKeys)-1] = Isp
91-
SetMapValueWithPrefix(datas[i], info.Isp, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
114+
findErr = t.SetMapValue(datas[i], info.Region, t.keysRegion...)
115+
if findErr != nil {
116+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
117+
}
118+
findErr = t.SetMapValue(datas[i], info.City, t.keysCity...)
119+
if findErr != nil {
120+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
121+
}
122+
findErr = t.SetMapValue(datas[i], info.Country, t.keysCountry...)
123+
if findErr != nil {
124+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
125+
}
126+
findErr = t.SetMapValue(datas[i], info.Isp, t.keysIsp...)
127+
if findErr != nil {
128+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
129+
}
92130
if info.CountryCode != "" {
93-
newKeys[len(newKeys)-1] = CountryCode
94-
SetMapValueWithPrefix(datas[i], info.CountryCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
131+
findErr = t.SetMapValue(datas[i], info.CountryCode, t.keysCountryCode...)
132+
if findErr != nil {
133+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
134+
}
95135
}
96136
if info.Latitude != "" {
97-
newKeys[len(newKeys)-1] = Latitude
98-
SetMapValueWithPrefix(datas[i], info.Latitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
137+
findErr = t.SetMapValue(datas[i], info.Latitude, t.keysLatitude...)
138+
if findErr != nil {
139+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
140+
}
99141
}
100142
if info.Longitude != "" {
101-
newKeys[len(newKeys)-1] = Longitude
102-
SetMapValueWithPrefix(datas[i], info.Longitude, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
143+
findErr = t.SetMapValue(datas[i], info.Longitude, t.keysLongitude...)
144+
if findErr != nil {
145+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
146+
}
103147
}
104148
if info.DistrictCode != "" {
105-
newKeys[len(newKeys)-1] = DistrictCode
106-
SetMapValueWithPrefix(datas[i], info.DistrictCode, keys[len(keys)-1], t.KeyAsPrefix, newKeys...)
149+
findErr = t.SetMapValue(datas[i], info.DistrictCode, t.keysDistrictCode...)
150+
if findErr != nil {
151+
errNum, err = transforms.SetError(errNum, findErr, transforms.General, "")
152+
}
107153
}
108154
}
109155

110156
t.stats, fmtErr = transforms.SetStatsInfo(err, t.stats, int64(errNum), int64(len(datas)), t.Type())
111157
return datas, fmtErr
112158
}
113159

160+
//通过层级key设置value值, 如果keys不存在则不加前缀,否则加前缀
161+
func (t *Transformer) SetMapValue(m map[string]interface{}, val interface{}, keys ...string) error {
162+
if len(keys) == 0 {
163+
return nil
164+
}
165+
var curr map[string]interface{}
166+
curr = m
167+
for _, k := range keys[0 : len(keys)-1] {
168+
finalVal, ok := curr[k]
169+
if !ok {
170+
n := make(map[string]interface{})
171+
curr[k] = n
172+
curr = n
173+
continue
174+
}
175+
//判断val是否为map[string]interface{}类型
176+
if curr, ok = finalVal.(map[string]interface{}); ok {
177+
continue
178+
}
179+
return fmt.Errorf("SetMapValueWithPrefix failed, %v is not the type of map[string]interface{}", keys)
180+
}
181+
//判断val(k)是否存在
182+
_, exist := curr[keys[len(keys)-1]]
183+
if exist {
184+
curr[t.lastEleKey+"_"+keys[len(keys)-1]] = val
185+
} else {
186+
curr[keys[len(keys)-1]] = val
187+
}
188+
return nil
189+
}
190+
114191
func (_ *Transformer) Description() string {
115192
//return "transform ip to country region and isp"
116193
return "获取IP的区域、国家、城市和运营商信息"

transforms/ip/ip_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func TestTransformer(t *testing.T) {
1616
Key: "ip",
1717
DataPath: "./test_data/17monipdb.dat",
1818
}
19+
assert.Nil(t, ipt.Init())
1920
data, err := ipt.Transform([]Data{{"ip": "111.2.3.4"}, {"ip": "x.x.x.x"}})
2021
assert.Error(t, err)
2122
exp := []Data{{
@@ -65,6 +66,7 @@ func TestTransformer(t *testing.T) {
6566
Key: "multi.ip",
6667
DataPath: "./test_data/17monipdb.dat",
6768
}
69+
assert.Nil(t, ipt.Init())
6870
data2, err2 := ipt2.Transform([]Data{{"multi": map[string]interface{}{"ip": "111.2.3.4"}}, {"multi": map[string]interface{}{"ip": "x.x.x.x"}}})
6971
assert.Error(t, err2)
7072
exp2 := []Data{{
@@ -173,6 +175,35 @@ func TestTransformer(t *testing.T) {
173175
assert.Len(t, locatorStore.locators, 2)
174176
}
175177

178+
var dttest []Data
179+
180+
//old: 1000000 1152 ns/op 432 B/op 16 allocs/op
181+
//new: 2000000 621 ns/op 232 B/op 7 allocs/op
182+
func BenchmarkIpTrans(b *testing.B) {
183+
b.ReportAllocs()
184+
ipt4 := &Transformer{
185+
Key: "multi.ip2",
186+
DataPath: "./test_data/17monipdb.dat",
187+
KeyAsPrefix: true,
188+
}
189+
ipt4.Init()
190+
data := []Data{
191+
{
192+
"multi": map[string]interface{}{
193+
"ip": "111.2.3.4",
194+
"Region": "浙江",
195+
"City": "宁波",
196+
"Country": "中国",
197+
"Isp": "N/A",
198+
"ip2": "183.251.28.250",
199+
},
200+
},
201+
}
202+
for i := 0; i < b.N; i++ {
203+
dttest, _ = ipt4.Transform(data)
204+
}
205+
}
206+
176207
func Test_badData(t *testing.T) {
177208
ipt := &Transformer{
178209
Key: "ip",

transforms/mutate/pandorakey_convert.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,21 @@ var (
1414

1515
type PandoraKeyConvert struct {
1616
stats StatsInfo
17+
cache map[string]KeyInfo
1718
}
1819

20+
func (g *PandoraKeyConvert) Init() error {
21+
g.cache = make(map[string]KeyInfo)
22+
return nil
23+
}
1924
func (g *PandoraKeyConvert) RawTransform(datas []string) ([]string, error) {
2025
return datas, errors.New("pandora_key_convert transformer not support rawTransform")
2126
}
2227

2328
func (g *PandoraKeyConvert) Transform(datas []Data) ([]Data, error) {
2429
for i, v := range datas {
25-
datas[i] = DeepConvertKey(v)
30+
datas[i] = DeepConvertKeyWithCache(v, g.cache)
31+
//datas[i] = DeepConvertKey(v)
2632
}
2733

2834
g.stats, _ = transforms.SetStatsInfo(nil, g.stats, 0, int64(len(datas)), g.Type())
@@ -63,6 +69,6 @@ func (g *PandoraKeyConvert) SetStats(err string) StatsInfo {
6369

6470
func init() {
6571
transforms.Add("pandora_key_convert", func() transforms.Transformer {
66-
return &PandoraKeyConvert{}
72+
return &PandoraKeyConvert{cache: make(map[string]KeyInfo)}
6773
})
6874
}

transforms/mutate/pandorakey_convert_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,23 @@ func TestDeepconvertkey(t *testing.T) {
2626
exp = []Data{{"ts_ts2": map[string]interface{}{"K200": 1, "a_xs_1": 2}}}
2727
assert.Equal(t, exp, got)
2828
}
29+
30+
var got []Data
31+
32+
//old(没有cache):500000 2846 ns/op 2536 B/op 33 allocs/op
33+
//new(cache): 500000 2249 ns/op 2392 B/op 17 allocs/op
34+
func BenchmarkCache(b *testing.B) {
35+
pandoraConvert := &PandoraKeyConvert{cache: make(map[string]KeyInfo)}
36+
b.ReportAllocs()
37+
38+
for i := 0; i < b.N; i++ {
39+
data := []Data{{"ts。ts2": "stamp1"}, {"ts-tes2/1.2": "stamp2"}}
40+
got, _ = pandoraConvert.Transform(data)
41+
42+
data = []Data{{"ts。ts2": map[string]interface{}{"_xs1_2s.xs.1": 1, "a.xs.1": 2}}, {"ts- ": "stamp2"}}
43+
got, _ = pandoraConvert.Transform(data)
44+
45+
data = []Data{{"ts。ts2": map[string]interface{}{"200": 1, "a.xs.1": 2}}}
46+
got, _ = pandoraConvert.Transform(data)
47+
}
48+
}

0 commit comments

Comments
 (0)