-
Notifications
You must be signed in to change notification settings - Fork 54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: let etcd v3 read_watch
handle the case where a chunk contains partial event or multiple events
#154
fix: let etcd v3 read_watch
handle the case where a chunk contains partial event or multiple events
#154
Changes from all commits
eb21766
de344ce
64ca04b
77e8ef7
7b456db
0faf78c
a85dab8
d60dea1
331449a
3a58c08
46b56dd
f428a26
aca8a02
bb20595
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
-- https://github.com/ledgetech/lua-resty-http | ||
local split = require("ngx.re").split | ||
local typeof = require("typeof") | ||
local cjson = require("cjson.safe") | ||
local setmetatable = setmetatable | ||
|
@@ -688,43 +689,77 @@ local function request_chunk(self, method, path, opts, timeout) | |
|
||
|
||
local function read_watch() | ||
body = nil | ||
|
||
while(1) do | ||
body, err = res.body_reader() | ||
local chunk, read_err = res.body_reader() | ||
if read_err then | ||
return nil, read_err | ||
end | ||
if not chunk then | ||
break | ||
end | ||
|
||
if not body then | ||
return nil, err | ||
body = chunk | ||
else | ||
-- this branch will only be executed in rare cases, for example, a single event json | ||
-- is larger than the proxy_buffer_size of nginx which proxies etcd, so it would be | ||
-- ok to use a string concat directly without worry about the performance. | ||
body = body .. chunk | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
end | ||
if not utils.is_empty_str(body) then | ||
|
||
if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if the response chunk doesn't contain the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For this case, the caller can rewatch based on the last revision it got, so it can start consuming from the correct revision again without miss any event. |
||
break | ||
end | ||
|
||
end | ||
|
||
body, err = decode_json(body) | ||
if not body then | ||
return nil, "failed to decode json body: " .. (err or " unkwon") | ||
elseif body.error and body.error.http_code >= 500 then | ||
-- health_check retry should do nothing here | ||
-- and let connection closed to create a new one | ||
health_check.report_failure(endpoint.http_host) | ||
return nil, endpoint.http_host .. ": " .. body.error.http_status | ||
return nil, nil | ||
end | ||
|
||
if body.result and body.result.events then | ||
for _, event in ipairs(body.result.events) do | ||
if event.kv.value then -- DELETE not have value | ||
event.kv.value = decode_base64(event.kv.value or "") | ||
event.kv.value = self.serializer.deserialize(event.kv.value) | ||
local chunks, split_err = split(body, [[\n]], "jo") | ||
if split_err then | ||
return nil, "failed to split chunks: " .. split_err | ||
end | ||
|
||
local all_events = {} | ||
for _, chunk in ipairs(chunks) do | ||
body, err = decode_json(chunk) | ||
if not body then | ||
return nil, "failed to decode json body: " .. (err or " unknown") | ||
elseif body.error and body.error.http_code >= 500 then | ||
-- health_check retry should do nothing here | ||
-- and let connection closed to create a new one | ||
health_check.report_failure(endpoint.http_host) | ||
return nil, endpoint.http_host .. ": " .. body.error.http_status | ||
end | ||
|
||
if body.result and body.result.events then | ||
for _, event in ipairs(body.result.events) do | ||
if event.kv.value then -- DELETE not have value | ||
event.kv.value = decode_base64(event.kv.value or "") | ||
event.kv.value = self.serializer.deserialize(event.kv.value) | ||
end | ||
event.kv.key = decode_base64(event.kv.key) | ||
if event.prev_kv then | ||
event.prev_kv.value = decode_base64(event.prev_kv.value or "") | ||
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value) | ||
event.prev_kv.key = decode_base64(event.prev_kv.key) | ||
end | ||
tab_insert(all_events, event) | ||
end | ||
event.kv.key = decode_base64(event.kv.key) | ||
if event.prev_kv then | ||
event.prev_kv.value = decode_base64(event.prev_kv.value or "") | ||
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value) | ||
event.prev_kv.key = decode_base64(event.prev_kv.key) | ||
else | ||
if #chunks == 1 then | ||
return body | ||
end | ||
end | ||
end | ||
|
||
if #all_events > 1 then | ||
body.result.events = all_events | ||
end | ||
return body | ||
end | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look like this branch is not executed, as
body
has default value""
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, my fault,
body
should initial withnil
.