Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ipc) avoid new workers attempting to replay evicted events #97

Merged
merged 1 commit into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions lib/resty/mlcache/ipc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local setmetatable = setmetatable


local INDEX_KEY = "lua-resty-ipc:index"
local FORCIBLE_KEY = "lua-resty-ipc:forcible"
local POLL_SLEEP_RATIO = 2


Expand Down Expand Up @@ -59,19 +60,10 @@ function _M.new(shm, debug)
return nil, "no such lua_shared_dict: " .. shm
end

local idx, err = dict:get(INDEX_KEY)
if err then
return nil, "failed to get index: " .. err
end

if idx ~= nil and type(idx) ~= "number" then
return nil, "index is not a number, shm tampered with"
end

local self = {
dict = dict,
pid = debug and 0 or worker_pid(),
idx = idx or 0,
idx = 0,
callbacks = {},
}

Expand Down Expand Up @@ -113,11 +105,21 @@ function _M:broadcast(channel, data)
return nil, "failed to increment index: " .. err
end

local ok, err = self.dict:set(idx, marshalled_event)
local ok, err, forcible = self.dict:set(idx, marshalled_event)
if not ok then
return nil, "failed to insert event in shm: " .. err
end

if forcible then
-- take note that eviction has started
-- we repeat this flagging to avoid this key from ever being
-- evicted itself
local ok, err = self.dict:set(FORCIBLE_KEY, true)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better approach here would be if shm:incr() accepted flags, or maybe even an shm:set_flags() API. In the absence of a solution to rely on flags, we rely on an extraneous key, and on it never reaching the LRU tail.

if not ok then
return nil, "failed to set forcible flag in shm: " .. err
end
end

return true
end

Expand Down Expand Up @@ -152,8 +154,23 @@ function _M:poll(timeout)
timeout = 0.3
end

-- guard: self.idx <= shm_idx
self.idx = min(self.idx, shm_idx)
if self.idx == 0 then
local forcible, err = self.dict:get(FORCIBLE_KEY)
if err then
return nil, "failed to get forcible flag from shm: " .. err
end

if forcible then
-- shm lru eviction occurred, we are likely a new worker
-- skip indexes that may have been evicted and resume current
-- polling idx
self.idx = shm_idx - 1
end

else
-- guard: self.idx <= shm_idx
self.idx = min(self.idx, shm_idx)
end

local elapsed = 0

Expand Down
158 changes: 71 additions & 87 deletions t/00-ipc.t
Original file line number Diff line number Diff line change
Expand Up @@ -57,78 +57,7 @@ no such lua_shared_dict: foo



=== TEST 2: new() picks up current idx if already set
This ensures new workers spawned during a master process' lifecycle do not
attempt to replay all events from index 0.
https://github.com/thibaultcha/lua-resty-mlcache/issues/87
--- http_config eval
qq{
lua_package_path "$::pwd/lib/?.lua;;";
lua_shared_dict ipc 1m;

init_by_lua_block {
require "resty.core"

assert(ngx.shared.ipc:set("lua-resty-ipc:index", 42))
}
}
--- config
location = /t {
content_by_lua_block {
local mlcache_ipc = require "resty.mlcache.ipc"

local ipc, err = mlcache_ipc.new("ipc")
if not ipc then
error(err)
end

ngx.say(ipc.idx)
}
}
--- request
GET /t
--- response_body
42
--- no_error_log
[warn]
[error]
[crit]



=== TEST 3: new() checks type of current idx if already set
--- http_config eval
qq{
lua_package_path "$::pwd/lib/?.lua;;";
lua_shared_dict ipc 1m;

init_by_lua_block {
require "resty.core"

assert(ngx.shared.ipc:set("lua-resty-ipc:index", "42"))
}
}
--- config
location = /t {
content_by_lua_block {
local mlcache_ipc = require "resty.mlcache.ipc"

local ipc, err = mlcache_ipc.new("ipc")
ngx.say(err)
}
}
--- request
GET /t
--- response_body
index is not a number, shm tampered with
--- no_error_log
[warn]
[error]
[crit]



=== TEST 4: broadcast() sends an event through shm
=== TEST 2: broadcast() sends an event through shm
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -162,7 +91,7 @@ received event from my_channel: hello world



=== TEST 5: broadcast() runs event callback in protected mode
=== TEST 3: broadcast() runs event callback in protected mode
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -196,7 +125,7 @@ lua entry thread aborted: runtime error



=== TEST 6: poll() catches invalid timeout arg
=== TEST 4: poll() catches invalid timeout arg
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -225,7 +154,7 @@ timeout must be a number



=== TEST 7: poll() catches up with all events
=== TEST 5: poll() catches up with all events
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -263,7 +192,62 @@ received event from my_channel: msg 3



=== TEST 8: poll() does not execute events from self (same pid)
=== TEST 6: poll() resumes to current idx if events were previously evicted
This ensures new workers spawned during a master process' lifecycle do not
attempt to replay all events from index 0.
https://github.com/thibaultcha/lua-resty-mlcache/issues/87
https://github.com/thibaultcha/lua-resty-mlcache/issues/93
--- http_config eval
qq{
lua_package_path "$::pwd/lib/?.lua;;";
lua_shared_dict ipc 32k;

init_by_lua_block {
require "resty.core"
local mlcache_ipc = require "resty.mlcache.ipc"

ipc = assert(mlcache_ipc.new("ipc", true))

ipc:subscribe("my_channel", function(data)
ngx.log(ngx.NOTICE, "my_channel event: ", data)
end)

for i = 1, 32 do
-- fill shm, simulating busy workers
-- this must trigger eviction for this test to succeed
assert(ipc:broadcast("my_channel", string.rep(".", 2^10)))
end
}
}
--- config
location = /t {
content_by_lua_block {
ngx.say("ipc.idx: ", ipc.idx)

assert(ipc:broadcast("my_channel", "first broadcast"))
assert(ipc:broadcast("my_channel", "second broadcast"))

-- first poll without new() to simulate new worker
assert(ipc:poll())

-- ipc.idx set to shm_idx-1 ("second broadcast")
ngx.say("ipc.idx: ", ipc.idx)
}
}
--- request
GET /t
--- response_body
ipc.idx: 0
ipc.idx: 34
--- error_log
my_channel event: second broadcast
--- no_error_log
my_channel event: first broadcast
[error]



=== TEST 7: poll() does not execute events from self (same pid)
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -296,7 +280,7 @@ received event from my_channel: hello world



=== TEST 9: poll() runs all registered callbacks for a channel
=== TEST 8: poll() runs all registered callbacks for a channel
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -340,7 +324,7 @@ callback 3 from my_channel: hello world



=== TEST 10: poll() exits when no event to poll
=== TEST 9: poll() exits when no event to poll
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -371,7 +355,7 @@ callback from my_channel: hello world



=== TEST 11: poll() runs all callbacks from all channels
=== TEST 10: poll() runs all callbacks from all channels
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -424,7 +408,7 @@ callback 2 from other_channel: hello ipc 2



=== TEST 12: poll() catches tampered shm (by third-party users)
=== TEST 11: poll() catches tampered shm (by third-party users)
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -457,7 +441,7 @@ index is not a number, shm tampered with



=== TEST 13: poll() retries getting an event until timeout
=== TEST 12: poll() retries getting an event until timeout
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -502,7 +486,7 @@ GET /t



=== TEST 14: poll() reaches custom timeout
=== TEST 13: poll() reaches custom timeout
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -542,7 +526,7 @@ GET /t



=== TEST 15: poll() logs errors and continue if event has been tampered with
=== TEST 14: poll() logs errors and continue if event has been tampered with
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -580,7 +564,7 @@ GET /t



=== TEST 16: poll() is safe to be called in contexts that don't support ngx.sleep()
=== TEST 15: poll() is safe to be called in contexts that don't support ngx.sleep()
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -624,7 +608,7 @@ GET /t



=== TEST 17: poll() guards self.idx from growing beyond the current shm idx
=== TEST 16: poll() guards self.idx from growing beyond the current shm idx
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -670,7 +654,7 @@ callback from my_channel: second broadcast



=== TEST 18: poll() JITs
=== TEST 17: poll() JITs
--- http_config eval
qq{
$::HttpConfig
Expand Down Expand Up @@ -702,7 +686,7 @@ qr/\[TRACE\s+\d+ content_by_lua\(nginx\.conf:\d+\):2 loop\]/



=== TEST 19: broadcast() JITs
=== TEST 18: broadcast() JITs
--- http_config eval
qq{
$::HttpConfig
Expand Down