feat(kafka-logger): the key field should be optional. (apache#2807)
Firstsawyou authored Nov 24, 2020
1 parent 82628f0 commit dc5fee2
Showing 4 changed files with 140 additions and 3 deletions.
2 changes: 1 addition & 1 deletion apisix/plugins/kafka-logger.lua
@@ -51,7 +51,7 @@ local schema = {
        batch_max_size = {type = "integer", minimum = 1, default = 1000},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"broker_list", "kafka_topic", "key"}
    required = {"broker_list", "kafka_topic"}
}

local _M = {
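For context on why `key` can be dropped from `required`: the message key in Kafka is only used to pick a partition, and lua-resty-kafka's producer accepts a `nil` key, letting its partitioner choose the partition itself. A minimal Lua sketch under that assumption (illustrative only, not the plugin's actual send path; broker list and option shapes are assumed):

```lua
-- Sketch, not the plugin's code: shows that resty.kafka.producer
-- delivers a message even when the key argument is nil.
local producer = require("resty.kafka.producer")

local broker_list = {
    { host = "127.0.0.1", port = 9092 },
}

local prod = producer:new(broker_list)

-- send(topic, key, message): key may be nil, in which case the
-- partitioner assigns the partition on its own.
local ok, err = prod:send("test2", nil, "log entry as a JSON string")
if not ok then
    ngx.log(ngx.ERR, "failed to send data to kafka: ", err)
end
```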
2 changes: 1 addition & 1 deletion doc/plugins/kafka-logger.md
@@ -45,7 +45,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
| broker_list | object | required | | | An array of Kafka brokers. |
| kafka_topic | string | required | | | Target topic to push data. |
| key | string | required | | | Key for the message. |
| key | string | optional | | | Used for partition allocation of messages. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identify the batch processor. |
| meta_format | string | optional | "default" | enum: `default`, `origin` | `default`: collect the request information in the default JSON format. `origin`: collect the request information as the original HTTP request. [example](#examples-of-meta_format) |
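With `key` now optional, a kafka-logger configuration that omits it should pass schema validation. A minimal sketch, assuming the plugin exposes the conventional `check_schema(conf)` helper that APISIX plugins provide (the exact call is an assumption, not taken from this diff):

```lua
-- Sketch only: assumes apisix.plugins.kafka-logger exposes check_schema(conf),
-- as APISIX plugins conventionally do.
local plugin = require("apisix.plugins.kafka-logger")

local conf = {
    broker_list = { ["127.0.0.1"] = 9092 },
    kafka_topic = "test2",
    -- no "key" field: after this change the config is still valid
}

local ok, err = plugin.check_schema(conf)
assert(ok, err)
```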
2 changes: 1 addition & 1 deletion doc/zh-cn/plugins/kafka-logger.md
@@ -43,7 +43,7 @@
| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ |
| broker_list | object | required | | | The list of Kafka brokers to push to. |
| kafka_topic | string | required | | | The topic to push to. |
| key | string | required | | | Key used to encrypt the message. |
| key | string | optional | | | Used for partition allocation of messages. |
| timeout | integer | optional | 3 | [1,...] | Timeout for sending data. |
| name | string | optional | "kafka logger" | | Unique identifier of the batch processor. |
| meta_format | string | optional | "default" | enum: `default`, `origin` | `default`: collect the request information in the default JSON format. `origin`: collect the request information as the original HTTP request. [example](#meta_format-参考示例) |
137 changes: 137 additions & 0 deletions t/plugin/kafka-logger.t
@@ -439,3 +439,140 @@ hello world
--- error_log_like eval
qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
--- wait: 2



=== TEST 13: set route(id: 1), missing key field
--- config
    location /t {
        content_by_lua_block {
            local t = require("lib.test_admin").test
            local code, body = t('/apisix/admin/routes/1',
                ngx.HTTP_PUT,
                [[{
                    "plugins": {
                        "kafka-logger": {
                            "broker_list" :
                            {
                                "127.0.0.1":9092
                            },
                            "kafka_topic" : "test2",
                            "timeout" : 1,
                            "batch_max_size": 1
                        }
                    },
                    "upstream": {
                        "nodes": {
                            "127.0.0.1:1980": 1
                        },
                        "type": "roundrobin"
                    },
                    "uri": "/hello"
                }]],
                [[{
                    "node": {
                        "value": {
                            "plugins": {
                                "kafka-logger": {
                                    "broker_list" :
                                    {
                                        "127.0.0.1":9092
                                    },
                                    "kafka_topic" : "test2",
                                    "timeout" : 1,
                                    "batch_max_size": 1
                                }
                            },
                            "upstream": {
                                "nodes": {
                                    "127.0.0.1:1980": 1
                                },
                                "type": "roundrobin"
                            },
                            "uri": "/hello"
                        },
                        "key": "/apisix/routes/1"
                    },
                    "action": "set"
                }]]
                )
            if code >= 300 then
                ngx.status = code
            end
            ngx.say(body)
        }
    }
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 14: access, test key field is optional
--- request
GET /hello
--- response_body
hello world
--- no_error_log
[error]
--- wait: 2



=== TEST 15: set route(meta_format = default), missing key field
--- config
    location /t {
        content_by_lua_block {
            local t = require("lib.test_admin").test
            local code, body = t('/apisix/admin/routes/1',
                ngx.HTTP_PUT,
                [[{
                    "plugins": {
                        "kafka-logger": {
                            "broker_list" : {
                                "127.0.0.1":9092
                            },
                            "kafka_topic" : "test2",
                            "timeout" : 1,
                            "batch_max_size": 1,
                            "include_req_body": false
                        }
                    },
                    "upstream": {
                        "nodes": {
                            "127.0.0.1:1980": 1
                        },
                        "type": "roundrobin"
                    },
                    "uri": "/hello"
                }]]
                )
            if code >= 300 then
                ngx.status = code
            end
            ngx.say(body)
        }
    }
--- request
GET /t
--- response_body
passed
--- no_error_log
[error]



=== TEST 16: hit route, report log to kafka
--- request
GET /hello?ab=cd
abcdef
--- response_body
hello world
--- no_error_log
[error]
--- error_log_like eval
qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
--- wait: 2
