Skip to content

Commit

Permalink
fix(mlcache) correctly propagate invalidation events
Browse files Browse the repository at this point in the history
Sibling instances on different workers would not receive invalidation
events since those are sent under the namespace of the publishing
worker, which has a different address for its mlcache instances than
others (potentially).

Instead of relying on the memory address, we now require users of
mlcache to simply name their cache. Several mlcache instances bearing
the same name are simply sharing the same data (same LRU and same
namespace in the shared dict).

In order to properly GC our instances, we must now keep track of how
many instances are using the same name and thus, only GC an LRU
instance if no mlcache instance is using it (0 references left).
  • Loading branch information
thibaultcha committed Aug 27, 2017
1 parent d9fb7ad commit 7a9d030
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 155 deletions.
29 changes: 18 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ http {
# use LuaRocks or opm.
lua_package_path "/path/to/lua-resty-mlcache/lib/?.lua;;";
lua_shared_dict cache 1m;
lua_shared_dict cache_dict 1m;
init_by_lua_block {
local mlcache = require "resty.mlcache"
local cache, err = mlcache.new("cache", {
local cache, err = mlcache.new("my_cache", "cache_dict", {
lru_size = 500, -- size of the L1 (Lua-land LRU) cache
ttl = 3600, -- 1h ttl for hits
neg_ttl = 30, -- 30s ttl for misses
Expand Down Expand Up @@ -177,16 +177,21 @@ Once you have a local copy of this module's `lib/` directory, add it to your

new
---
**syntax:** `cache, err = mlcache.new(shm, opts?)`
**syntax:** `cache, err = mlcache.new(name, shm, opts?)`

Creates a new mlcache instance. On failure, it returns `nil` and a string
describing the error.

The first argument `shm` is the name of the `lua_shared_dict` shared memory
The first argument `name` is an arbitrary name of your choosing for this cache,
and must be a string. Each mlcache instance namespaces the values it holds
according to its name, so several instances with the same name would
share the same data.

The second argument `shm` is the name of the `lua_shared_dict` shared memory
zone. Several instances of mlcache can use the same shm (values will be
namespaced).

The second argument `opts` is optional. If provided, it must be a table
The third argument `opts` is optional. If provided, it must be a table
holding the desired options for this instance. The possible options are:

- `lru_size`: a number defining the size of the underlying L1 cache
Expand Down Expand Up @@ -219,7 +224,7 @@ Example:
```lua
local mlcache = require "resty.mlcache"

local cache, err = mlcache.new("mlcache_shm", {
local cache, err = mlcache.new("my_cache", "cache_shared_dict", {
lru_size = 1000, -- hold up to 1000 items in the L1 cache (Lua VM)
ttl = 3600, -- caches scalar types and tables for 1h
neg_ttl = 60 -- caches nil values for 60s,
Expand All @@ -235,13 +240,15 @@ You can create several mlcache instances relying on the same underlying
```lua
local mlcache = require "mlcache"

local cache_1 = mlcache.new("mlcache_shm", { lru_size = 100 })
local cache_2 = mlcache.new("mlcache_shm", { lru_size = 1e5 })
local cache_1 = mlcache.new("cache_1", "cache_shared_dict", { lru_size = 100 })
local cache_2 = mlcache.new("cache_2", "cache_shared_dict", { lru_size = 1e5 })
```

In the above example, `cache_1` is ideal for holding a few, very large values.
`cache_2` can be used to hold a large number of small values. Both instances
will rely on the same shm: `lua_shared_dict mlcache_shm 2048m;`.
will rely on the same shm: `lua_shared_dict cache_shared_dict 2048m;`. Even if
you use identical keys in both caches, they will not conflict with each other
since they each bear a different name.

[Back to TOC](#table-of-contents)

Expand Down Expand Up @@ -330,7 +337,7 @@ Example:
```lua
local mlcache = require "mlcache"

local cache, err = mlcache.new("mlcache_shm", {
local cache, err = mlcache.new("my_cache", "cache_shared_dict", {
lru_size = 1000
})
if not cache then
Expand Down Expand Up @@ -399,7 +406,7 @@ Example:
```lua
local mlcache = require "mlcache"

local cache = mlcache.new("mlcache_shm")
local cache = mlcache.new("my_cache", "cache_shared_dict")

local ttl, err, value = cache:peek("key")
if err then
Expand Down
86 changes: 71 additions & 15 deletions lib/resty/mlcache.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
-- vim: st=4 sts=4 sw=4 et:

local ffi = require "ffi"
local cjson = require "cjson.safe"
local lrucache = require "resty.lrucache"
local resty_lock = require "resty.lock"
Expand All @@ -20,6 +21,28 @@ local setmetatable = setmetatable

local LOCK_KEY_PREFIX = "lua-resty-mlcache:lock:"
local CACHE_MISS_SENTINEL_LRU = {}
local LRU_INSTANCES = {}


local c_str_type = ffi.typeof("char *")
local c_lru_gc_type = ffi.metatype([[
struct {
char *lru_name;
int len;
}
]], {
__gc = function(c_gc_type)
local lru_name = ffi.string(c_gc_type.lru_name, c_gc_type.len)

local lru_gc = LRU_INSTANCES[lru_name]
if lru_gc then
lru_gc.count = lru_gc.count - 1
if lru_gc.count <= 0 then
LRU_INSTANCES[lru_name] = nil
end
end
end
})


local TYPES_LOOKUP = {
Expand Down Expand Up @@ -109,7 +132,11 @@ local _M = {
local mt = { __index = _M }


function _M.new(shm, opts)
function _M.new(name, shm, opts)
if type(name) ~= "string" then
error("name must be a string", 2)
end

if type(shm) ~= "string" then
error("shm must be a string", 2)
end
Expand Down Expand Up @@ -163,35 +190,64 @@ function _M.new(shm, opts)
end

local self = {
lru = opts.lru or lrucache.new(opts.lru_size or 100),
name = name,
dict = dict,
shm = shm,
ttl = opts.ttl or 30,
neg_ttl = opts.neg_ttl or 5,
resty_lock_opts = opts.resty_lock_opts,
}

self.namespace = fmt("%p", self)

if opts.ipc_shm then
local mlcache_ipc = require "resty.mlcache.ipc"

local ipc, err = mlcache_ipc.new(opts.ipc_shm, opts.debug)
if not ipc then
local err
self.ipc, err = mlcache_ipc.new(opts.ipc_shm, opts.debug)
if not self.ipc then
return nil, "could not instanciate mlcache.ipc: " .. err
end

local channel = fmt("lua-resty-mlcache:invalidations:%s",
self.namespace)

self.ipc = ipc
self.ipc_invalidation_channel = channel
self.ipc_invalidation_channel = fmt("mlcache:invalidations:%s", name)

self.ipc:subscribe(self.ipc_invalidation_channel, function(key)
self.lru:delete(key)
end)
end

if opts.lru then
self.lru = opts.lru

else
        -- Several mlcache instances can have the same name and hence, the same
-- lru instance. We need to GC such LRU instances when all mlcache
-- instances using them are GC'ed.
-- We do this by using a C struct with a __gc metamethod.

local c_lru_gc = ffi.new(c_lru_gc_type)
c_lru_gc.lru_name = ffi.cast(c_str_type, name)
c_lru_gc.len = #name

-- Keep track of our LRU instance and a counter of how many mlcache
        -- instances are referring to it

local lru_gc = LRU_INSTANCES[name]
if not lru_gc then
lru_gc = { count = 0, lru = nil }
LRU_INSTANCES[name] = lru_gc
end

local lru = lru_gc.lru
if not lru then
lru = lrucache.new(opts.lru_size or 100)
lru_gc.lru = lru
end

self.lru = lru
self.c_lru_gc = c_lru_gc

lru_gc.count = lru_gc.count + 1
end

return setmetatable(self, mt)
end

Expand Down Expand Up @@ -377,7 +433,7 @@ function _M:get(key, opts, cb, ...)
-- restrict this key to the current namespace, so we isolate this
-- mlcache instance from potential other instances using the same
-- shm
local namespaced_key = self.namespace .. key
local namespaced_key = self.name .. key

local err
data, err = get_shm_set_lru(self, key, namespaced_key)
Expand Down Expand Up @@ -473,7 +529,7 @@ function _M:peek(key)
-- restrict this key to the current namespace, so we isolate this
-- mlcache instance from potential other instances using the same
-- shm
local namespaced_key = self.namespace .. key
local namespaced_key = self.name .. key

local v, err = self.dict:get(namespaced_key)
if err then
Expand Down Expand Up @@ -515,7 +571,7 @@ function _M:set(key, opts, value)
-- mlcache instance from potential other instances using the same
-- shm
local ttl, neg_ttl = check_opts(self, opts)
local namespaced_key = self.namespace .. key
local namespaced_key = self.name .. key

set_lru(self, key, value, ttl, neg_ttl)

Expand Down Expand Up @@ -548,7 +604,7 @@ function _M:delete(key)
-- restrict this key to the current namespace, so we isolate this
-- mlcache instance from potential other instances using the same
-- shm
local namespaced_key = self.namespace .. key
local namespaced_key = self.name .. key

local ok, err = self.dict:delete(namespaced_key)
if not ok then
Expand Down
Loading

0 comments on commit 7a9d030

Please sign in to comment.