Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ jobs:
tarantool:
- '1.10'
- '2.8'
metrics:
- ''
- '1.0.0'
coveralls: [false]
include:
- tarantool: '2.11'
metrics: '1.0.0'
coveralls: true

runs-on: ubuntu-20.04
Expand All @@ -38,6 +42,11 @@ jobs:
- name: Install requirements
run: make deps

- name: Install metrics
if: matrix.metrics != ''
run: |
tarantoolctl rocks install metrics ${{ matrix.metrics }}

- name: Run linter
run: make lint

Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ build:

.PHONY: deps
deps:
$(TTCTL) rocks install cartridge 2.8.0
$(TTCTL) rocks install luacheck 0.26.0
$(TTCTL) rocks install luacov 0.13.0
$(TTCTL) rocks install luacov-coveralls 0.2.3-1 --server=http://luarocks.org
$(TTCTL) rocks install luatest 0.5.7
$(TTCTL) rocks install cartridge 2.7.9

.PHONY: deps-full
deps-full: deps
tarantoolctl rocks install metrics 1.0.0

.PHONY: lint
lint:
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,17 @@ tubes:
ttl: 60
tube_2:
driver: my_app.my_driver
cfg:
metrics: false
```

Be careful, `cfg` field in a cluster-wide config acts as a configuration
value for `api.cfg()` call. These configuration options are currently
supported:

* `metrics` - enable or disable stats collection by metrics.
metrics >= 0.11.0 is required. It is enabled by default.

## Running locally (as an example)

Install dependencies:
Expand Down
178 changes: 173 additions & 5 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,38 @@ local vshard = require('vshard')
local fiber = require('fiber')
local log = require('log')

local state = require('sharded_queue.state')
local time = require('sharded_queue.time')
local utils = require('sharded_queue.utils')

local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')
local is_metrics_package, metrics = pcall(require, "metrics")
local is_hotreload_package, hotreload = pcall(require, "cartridge.hotreload")

local stash_names = {
cfg = '__sharded_queue_cfg',
metrics_stats = '__sharded_queue_metrics_stats',
}

if is_hotreload_package then
for _, name in pairs(stash_names) do
hotreload.whitelist_globals({ name })
end
end

-- get a stash instance, initialize if needed
local function stash_get(name)
local instance = rawget(_G, name) or {}
rawset(_G, name, instance)
return instance
end

local remote_call = function(method, instance_uri, args, timeout)
local conn = cartridge_pool.connect(instance_uri)
return conn:call(method, { args }, { timeout = timeout })
end


local function validate_options(options)
if not options then return true end

Expand Down Expand Up @@ -402,8 +422,117 @@ function sharded_tube.drop(self)
end

local sharded_queue = {
tube = {}
tube = {},
cfg = stash_get(stash_names.cfg),
metrics_stats = stash_get(stash_names.metrics_stats),
}
if sharded_queue.cfg.metrics == nil then
sharded_queue.cfg.metrics = true
end

local function is_metrics_v_0_11_installed()
if not is_metrics_package or metrics.unregister_callback == nil then
return false
end
local counter = require('metrics.collectors.counter')
return counter.remove and true or false
end

local function metrics_create_collectors()
return {
calls = {
collector = metrics.counter(
"sharded_queue_calls",
"sharded_queue's number of calls"
),
values = {},
},
tasks = {
collector = metrics.gauge(
"sharded_queue_tasks",
"sharded_queue's number of tasks"
)
},
}
end

local function metrics_disable()
if sharded_queue.metrics_stats.callback then
metrics.unregister_callback(sharded_queue.metrics_stats.callback)
end
sharded_queue.metrics_stats.callback = nil

if sharded_queue.metrics_stats.collectors then
for _, c in pairs(sharded_queue.metrics_stats.collectors) do
metrics.registry:unregister(c.collector)
end
end
sharded_queue.metrics_stats.collectors = nil
end

local function metrics_enable()
-- Drop all collectors and a callback.
metrics_disable()

-- Set all collectors and the callback.
sharded_queue.metrics_stats.collectors = metrics_create_collectors()
local callback = function()
local metrics_stats = sharded_queue.metrics_stats
for tube_name, _ in pairs(sharded_queue.tube) do
local stat = sharded_queue.statistics(tube_name)
local collectors = metrics_stats.collectors
if collectors.calls.values[tube_name] == nil then
collectors.calls.values[tube_name] = {}
end
for k, v in pairs(stat.calls) do
local prev = metrics_stats.collectors.calls.values[tube_name][k] or 0
local inc = v - prev
metrics_stats.collectors.calls.collector:inc(inc, {
name = tube_name,
status = k,
})
metrics_stats.collectors.calls.values[tube_name][k] = v
end
for k, v in pairs(stat.tasks) do
metrics_stats.collectors.tasks.collector:set(v, {
name = tube_name,
status = k,
})
end
end
end

metrics.register_callback(callback)
sharded_queue.metrics_stats.callback = callback
return true
end

if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = is_metrics_v_0_11_installed()
end

function sharded_queue.cfg_call(_, options)
options = options or {}
if options.metrics == nil then
return
end

if type(options.metrics) ~= 'boolean' then
error('"metrics" must be a boolean')
end

if sharded_queue.cfg.metrics ~= options.metrics then
local tubes = cartridge.config_get_deepcopy('tubes') or {}

if tubes['cfg'] ~= nil and tubes['cfg'].metrics == nil then
error('tube "cfg" exist, unable to update a default configuration')
end

tubes['cfg'] = {metrics = options.metrics}
local ok, err = cartridge.config_patch_clusterwide({ tubes = tubes })
if not ok then error(err) end
end
end

function sharded_queue.statistics(tube_name)
if not tube_name then
Expand Down Expand Up @@ -448,6 +577,10 @@ end
function sharded_queue.create_tube(tube_name, options)
local tubes = cartridge.config_get_deepcopy('tubes') or {}

if tube_name == 'cfg' then
error('a tube name "cfg" is reserved')
end

if tubes[tube_name] ~= nil then
-- already exist --
return nil
Expand All @@ -472,12 +605,34 @@ local function init(opts)
rawset(_G, 'queue', sharded_queue)
end

local function validate_config(cfg)
if cfg['cfg'] == nil then
return
end

cfg = cfg['cfg']
if type(cfg) ~= 'table' then
error('"cfg" must be a table')
end
if cfg.metrics and type(cfg.metrics) ~= 'boolean' then
error('"cfg.metrics" must be a boolean')
end
if cfg.metrics and cfg.metrics == true then
if not is_metrics_v_0_11_installed() then
error("metrics >= 0.11.0 is required")
end
end
end

local function apply_config(cfg, opts)
local cfg_tubes = cfg.tubes or {}

-- try init tubes --
for tube_name, options in pairs(cfg_tubes) do
if sharded_queue.tube[tube_name] == nil then
if tube_name == 'cfg' then
if options.metrics ~= nil then
sharded_queue.cfg.metrics = options.metrics and true or false
end
elseif sharded_queue.tube[tube_name] == nil then
local self = setmetatable({
tube_name = tube_name,
wait_max = options.wait_max,
Expand All @@ -492,11 +647,17 @@ local function apply_config(cfg, opts)

-- try drop tubes --
for tube_name, _ in pairs(sharded_queue.tube) do
if cfg_tubes[tube_name] == nil then
if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then
setmetatable(sharded_queue.tube[tube_name], nil)
sharded_queue.tube[tube_name] = nil
end
end

if sharded_queue.cfg.metrics then
metrics_enable()
else
metrics_disable()
end
end

-- FIXME: Remove when https://github.com/tarantool/cartridge/issues/308 resolved
Expand All @@ -523,6 +684,13 @@ return {
kick = queue_action_wrapper('kick'),
peek = queue_action_wrapper('peek'),
drop = queue_action_wrapper('drop'),

cfg = setmetatable({}, {
__index = sharded_queue.cfg,
__newindex = function() error("Use api.cfg() instead", 2) end,
__call = sharded_queue.cfg_call,
__serialize = function() return sharded_queue.cfg end,
}),
statistics = sharded_queue.statistics,
_VERSION = require('sharded_queue.version'),

Expand Down
7 changes: 5 additions & 2 deletions sharded_queue/storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ local tubes = {}
local function map_tubes(cfg_tubes)
local result = {}
for tube_name, tube_opts in pairs(cfg_tubes) do
local driver_name = tube_opts.driver or DEFAULT_DRIVER
result[tube_name] = get_driver(driver_name)
if tube_name['cfg'] ~= nil or tube_opts.enable == nil then
-- do not add 'cfg' as a tube
local driver_name = tube_opts.driver or DEFAULT_DRIVER
result[tube_name] = get_driver(driver_name)
end
end
return result
end
Expand Down
40 changes: 40 additions & 0 deletions test/api_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,16 @@ local g = t.group('api')
local api = require('sharded_queue.api')
local config = require('test.helper.config')
local utils = require('test.helper.utils')
local is_metrics_supported = utils.is_metrics_supported()

g.before_all(function()
g.queue_conn = config.cluster:server('queue-router').net_box
g.queue_conn_ro = config.cluster:server('queue-router-1').net_box
g.cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg")
end)

g.after_each(function()
g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg})
end)

g.test_exported_api = function()
Expand Down Expand Up @@ -51,6 +57,40 @@ g.test_role_statistics = function()
t.assert_type(result, 'table')
end

g.test_create_cfg = function()
t.assert_error_msg_contains('a tube name "cfg" is reserved', function()
g.queue_conn:call('queue.create_tube', { 'cfg' })
end)
end

g.test_role_cfg_default = function()
local cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg")
t.assert_equals({metrics = is_metrics_supported}, cfg)
end

g.test_role_cfg_metrics_switch = function()
t.skip_if(not is_metrics_supported, "metrics >= 0.11.0 is not installed")

local cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg")
t.assert_equals({metrics = true}, cfg)

g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {{metrics = false}})
cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg")
t.assert_equals({metrics = false}, cfg)

g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {{metrics = true}})
cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg")
t.assert_equals({metrics = true}, cfg)
end

g.test_role_cfg_assignment = function()
t.skip_if(not is_metrics_supported, "metrics >= 0.11.0 is not installed")

t.assert_error_msg_contains('Use api.cfg() instead', function()
g.queue_conn:eval("require('sharded_queue.api').cfg['metrics'] = false")
end)
end

g.test_role_statistics_read_only_router = function()
--make sure queue_conn_ro is read_only
local ro = g.queue_conn_ro:eval("return box.cfg.read_only")
Expand Down
10 changes: 10 additions & 0 deletions test/helper/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ function utils.shape_cmd(tube_name, cmd)
return string.format('queue.tube.%s:%s', tube_name, cmd)
end

function utils.is_metrics_supported()
local is_package, metrics = pcall(require, "metrics")
if not is_package then
return false
end
-- metrics >= 0.11.0 is required
local counter = require('metrics.collectors.counter')
return metrics.unregister_callback and counter.remove and true or false
end

return utils
Loading