Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: let etcd v3 read_watch handle the case where a chunk contains partial event or multiple events #154

Merged
merged 14 commits into from
Mar 3, 2022
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ jobs:
include:
- version: 2.2.5
conf: Procfile-single
- version: 3.1.0
conf: Procfile-single
- version: 3.2.0
conf: Procfile-single
- version: 3.3.0
conf: Procfile-single-enable-v2
- version: 3.4.0
conf: Procfile-single-enable-v2
- version: 3.5.0
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Table of Contents
* [Install](#install)
* [API v2](api_v2.md)
* [API v3](api_v3.md)
* **NOTE**: Requires ETCD version >= v3.4.0

## Install

Expand Down
75 changes: 55 additions & 20 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- https://github.com/ledgetech/lua-resty-http
local split = require("ngx.re").split
local typeof = require("typeof")
local cjson = require("cjson.safe")
local setmetatable = setmetatable
Expand Down Expand Up @@ -688,43 +689,77 @@ local function request_chunk(self, method, path, opts, timeout)


local function read_watch()
body = nil

while(1) do
body, err = res.body_reader()
local chunk, read_err = res.body_reader()
if read_err then
return nil, read_err
end
if not chunk then
break
end

if not body then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look like this branch is not executed, as body has default value ""?

Copy link
Contributor Author

@nic-6443 nic-6443 Mar 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, my fault, body should initial with nil.

return nil, err
body = chunk
else
-- this branch will only be executed in rare cases, for example, a single event json
-- is larger than the proxy_buffer_size of nginx which proxies etcd, so it would be
-- ok to use a string concat directly without worry about the performance.
body = body .. chunk
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use table.concat to avoid creating too much temp str?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

end
if not utils.is_empty_str(body) then

if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if the response chunk doesn't contain the \n and we'll continue reading the next chunk? But if the connection is aborted / timed out this time, so the last chunk was missed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this case, the caller can rewatch based on the last revision it got, so it can start consuming from the correct revision again without miss any event.

break
end

end

body, err = decode_json(body)
if not body then
return nil, "failed to decode json body: " .. (err or " unkwon")
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status
return nil, nil
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
local chunks, split_err = split(body, [[\n]], "jo")
if split_err then
return nil, "failed to split chunks: " .. split_err
end

local all_events = {}
for _, chunk in ipairs(chunks) do
body, err = decode_json(chunk)
if not body then
return nil, "failed to decode json body: " .. (err or " unknown")
elseif body.error and body.error.http_code >= 500 then
-- health_check retry should do nothing here
-- and let connection closed to create a new one
health_check.report_failure(endpoint.http_host)
return nil, endpoint.http_host .. ": " .. body.error.http_status
end

if body.result and body.result.events then
for _, event in ipairs(body.result.events) do
if event.kv.value then -- DELETE not have value
event.kv.value = decode_base64(event.kv.value or "")
event.kv.value = self.serializer.deserialize(event.kv.value)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
end
tab_insert(all_events, event)
end
event.kv.key = decode_base64(event.kv.key)
if event.prev_kv then
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
event.prev_kv.key = decode_base64(event.prev_kv.key)
else
if #chunks == 1 then
return body
end
end
end

if #all_events > 1 then
body.result.events = all_events
end
return body
end

Expand Down
148 changes: 148 additions & 0 deletions t/v3/key.t
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,151 @@ passed
request chunk headers: {"foo":"bar"}
--- no_error_log
[error]



=== TEST 13: watch response which http chunk contains partial etcd event response
--- http_config eval: $::HttpConfig
--- config
location /version {
content_by_lua_block {
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
}
}

location /v3/watch {
content_by_lua_block {
-- payload get from tcpdump while running TEST 3 and split the event response into two chunks

ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"271","raft_term":"7"},"created":true}}')
ngx.flush()
ngx.sleep(0.1)

-- partial event without trailing new line
ngx.print('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437",')
ngx.flush()
ngx.print('"revision":"272","raft_term":"7"},"events"')
ngx.flush()

-- key = /test, value = bcd3
ngx.say(':[{"kv":{"key":"L3Rlc3Q=","create_revision":"156","mod_revision":"272","version":"44","value":"ImJjZDMi"}}]}}')
ngx.flush()

-- ensure client timeout
ngx.sleep(1)
}
}

location /t {
content_by_lua_block {
local etcd, err = require("resty.etcd").new({
protocol = "v3",
http_host = {
"http://127.0.0.1:" .. ngx.var.server_port,
},
})
check_res(etcd, err)

local cur_time = ngx.now()
local body_chunk_fun, err = etcd:watch("/test", {timeout = 0.5})
if not body_chunk_fun then
ngx.say("failed to watch: ", err)
end

local idx = 0
while true do
local chunk, err = body_chunk_fun()

if not chunk then
if err then
ngx.say(err)
end
break
end

idx = idx + 1
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
end
}
}
--- request
GET /t
--- no_error_log
[error]
--- response_body_like eval
qr/1:.*"created":true.*
2:.*"value":"bcd3".*
timeout/
--- timeout: 5



=== TEST 14: watch response which one http chunk contains multiple events chunk
--- http_config eval: $::HttpConfig
--- config
location /version {
content_by_lua_block {
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
}
}

location /v3/watch {
content_by_lua_block {
-- payload get from tcpdump while running TEST 5 and merge two event response into one http chunk

ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"290","raft_term":"8"},"created":true}}')
ngx.flush()
ngx.sleep(0.1)

-- one http chunk contains multiple event response, note the new line at the end of first event response
-- key1 = /wdir/, value1 = bcd4
-- key2 = /wdir/a, value2 = bcd4a
ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"292","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIv","create_revision":"31","mod_revision":"292","version":"22","value":"ImJjZDQi"}}]}}\n{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"293","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIvYQ==","create_revision":"293","mod_revision":"293","version":"1","value":"ImJjZDRhIg=="}}]}}')
ngx.flush()
tokers marked this conversation as resolved.
Show resolved Hide resolved

-- ensure client timeout
ngx.sleep(1)
}
}

location /t {
content_by_lua_block {
local etcd, err = require("resty.etcd").new({
protocol = "v3",
http_host = {
"http://127.0.0.1:" .. ngx.var.server_port,
},
})
check_res(etcd, err)

local cur_time = ngx.now()
local body_chunk_fun, err = etcd:watch("/", {timeout = 0.5})
if not body_chunk_fun then
ngx.say("failed to watch: ", err)
end

local idx = 0
while true do
local chunk, err = body_chunk_fun()

if not chunk then
if err then
ngx.say(err)
end
break
end

idx = idx + 1
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
end
}
}
--- request
GET /t
--- no_error_log
[error]
--- response_body_like eval
qr/1:.*"created":true.*
2:.*"value":"bcd4".*"value":"bcd4a".*
timeout/
--- timeout: 5