Closed
Description
Our aim is to get data from the Tarantool cluster using crud.pairs() with filtering by the second field of the PK index and batching.
We have a significant amount of data in the space we need to process (10Gb per replica set and 4 replica sets), and most of the tuples we need are located at the beginning of the index.
In this situation, we faced 100% CPU load when CRUD tries to return the last batch of data from each storage instance.
In this case, the storage instance is locked until the index scan has finished.
This is a bad situation, and it would be great to have an option in crud.pairs()
that allows us to enable fiber.yield
for the storage instance if we can sacrifice data consistency.
Space definition:
-- Migration module: creates the 'example' space, its indexes, and registers
-- its sharding key.
return {
    -- Applies the migration.
    -- Every create call passes if_not_exists = true.
    -- Returns true once all steps have been executed.
    up = function()
        local migrator_utils = require('migrator.utils')

        local space = box.schema.space.create('example', { if_not_exists = true })

        -- Tuple layout of the space.
        local fields = {
            { name = "id", type = "integer" },
            { name = "code", type = "string" },
            { name = "channel", type = "string" },
            { name = "content", type = "string" },
            { name = "bucket_id", type = "unsigned" },
        }
        space:format(fields)

        -- Composite primary key: id first, then code, then channel.
        local primary_parts = {
            { field = "id" },
            { field = "code" },
            { field = "channel" },
        }
        space:create_index("primary", {
            parts = primary_parts,
            if_not_exists = true,
        })

        -- Non-unique secondary index over bucket_id.
        space:create_index("bucket_id", {
            parts = { { field = "bucket_id" } },
            unique = false,
            if_not_exists = true,
        })

        -- 'id' is registered as the sharding key of the space.
        migrator_utils.register_sharding_key('example', { 'id' })

        return true
    end,
}
Script:
local crud = require('crud')
local json = require('json')
local log = require('log')
local vshard = require('vshard')
--- Scan every replica set for tuples whose `code` field equals 'TEST'.
--
-- First builds a map from replica set UUID to one bucket id reported for it
-- by vshard.router.buckets_info(), then runs one crud.pairs() scan per
-- replica set, passing that bucket id so the request is addressed to that
-- replica set.
--
-- Takes no arguments and returns nothing; progress is written to the log.
local function exapmle()
    -- uuid -> bucket id. Entries without a uuid are skipped; when several
    -- buckets report the same uuid, the last one visited is kept.
    local replica_sets = {}
    for bucket_id, info in ipairs(vshard.router.buckets_info()) do
        if info.uuid ~= nil then
            replica_sets[info.uuid] = bucket_id
        end
    end
    log.info('REPLICA SETS: %s', json.encode(replica_sets))

    local pairs_opts_base = { batch_size = 1000, prefer_replica = true, timeout = 60 }

    for rs_uuid, bucket_id in pairs(replica_sets) do
        log.info('Begin replica set processing: %s, %s', rs_uuid, bucket_id)

        -- Fresh options table per call so crud never sees a table that was
        -- already handed to a previous request.
        local opts = {
            batch_size = pairs_opts_base.batch_size,
            bucket_id = bucket_id,
            prefer_replica = pairs_opts_base.prefer_replica,
            timeout = pairs_opts_base.timeout,
        }

        -- The space name matches the one created by the migration
        -- ('example'); the original queried 'settings_object', which the
        -- shown schema never defines.
        for _, _object in crud.pairs('example', { { '==', 'code', 'TEST' } }, opts) do
            -- do some work
        end

        log.info('End replica set processing: %s, %s', rs_uuid, bucket_id)
    end
end
-- Public interface of the script module.
return {
    -- Original export; the name is misspelled but kept so that existing
    -- callers continue to work.
    exapmle = exapmle,
    -- Correctly spelled alias for the same function.
    example = exapmle,
}