Skip to content

Commit 44096c1

Browse files
committed
crud: add readview support
Added readview support for select and pairs. Closes #343
1 parent 2d3d479 commit 44096c1

File tree

7 files changed

+3539
-10
lines changed

7 files changed

+3539
-10
lines changed

crud.lua

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ local borders = require('crud.borders')
2020
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
2121
local utils = require('crud.common.utils')
2222
local stats = require('crud.stats')
23+
local readview = require('crud.readview')
2324

2425
local crud = {}
2526

@@ -147,6 +148,10 @@ crud.reset_stats = stats.reset
147148
-- @function storage_info
148149
crud.storage_info = utils.storage_info
149150

151+
-- @refer readview.new
152+
-- @function new
153+
crud.readview = readview.new
154+
150155
--- Initializes crud on node
151156
--
152157
-- Exports all functions that are used for calls
@@ -174,6 +179,7 @@ function crud.init_storage()
174179
count.init()
175180
borders.init()
176181
sharding_metadata.init()
182+
readview.init()
177183

178184
_G._crud.storage_info_on_storage = utils.storage_info_on_storage
179185
end

crud/readview.lua

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
local fiber = require('fiber')
2+
local checks = require('checks')
3+
local errors = require('errors')
4+
local tarantool = require('tarantool')
5+
6+
7+
local stash = require('crud.common.stash')
8+
local utils = require('crud.common.utils')
9+
local sharding = require('crud.common.sharding')
10+
local select_executor = require('crud.select.executor')
11+
local select_filters = require('crud.compare.filters')
12+
local dev_checks = require('crud.common.dev_checks')
13+
local schema = require('crud.common.schema')
14+
local select = require('crud.select.compat.select')
15+
local stats = require('crud.stats')
16+
17+
local ReadviewError = errors.new_class('ReadviewError', {capture_stack = false})
18+
19+
local readview = {}
20+
21+
22+
local function readview_open_on_storage(readview_name)
23+
local result = {}
24+
if not utils.tarantool_version_at_least(2, 11, 0) or
25+
tarantool.package ~= 'Tarantool Enterprise' then
26+
result.replica_info = nil
27+
result.err = ReadviewError:new("Tarantool does not support readview")
28+
return result
29+
end
30+
local read_view = box.read_view.open({name = readview_name})
31+
32+
if read_view == nil then
33+
result.replica_info = nil
34+
result.err = ReadviewError:new("Error creating readview")
35+
return result
36+
end
37+
38+
local replica_info = {}
39+
replica_info.uuid = box.info().uuid
40+
replica_info.id = read_view.id
41+
result.replica_info = replica_info
42+
43+
return result
44+
end
45+
46+
local function readview_close_on_storage(readview_uuid)
47+
dev_checks('table')
48+
49+
local list = box.read_view.list()
50+
local readview_id
51+
for _, replica_info in pairs(readview_uuid) do
52+
if replica_info.uuid == box.info().uuid then
53+
readview_id = replica_info.id
54+
end
55+
end
56+
57+
for k,v in pairs(list) do
58+
if v.id == readview_id then
59+
list[k]:close()
60+
end
61+
end
62+
63+
return nil
64+
end
65+
66+
local function select_readview_on_storage(space_name, index_id, conditions, opts, readview_id)
67+
dev_checks('string', 'number', '?table', {
68+
scan_value = 'table',
69+
after_tuple = '?table',
70+
tarantool_iter = 'number',
71+
limit = 'number',
72+
scan_condition_num = '?number',
73+
field_names = '?table',
74+
sharding_key_hash = '?number',
75+
sharding_func_hash = '?number',
76+
skip_sharding_hash_check = '?boolean',
77+
yield_every = '?number',
78+
fetch_latest_metadata = '?boolean',
79+
}, 'number')
80+
81+
local cursor = {}
82+
if opts.fetch_latest_metadata then
83+
local replica_schema_version
84+
if box.info.schema_version ~= nil then
85+
replica_schema_version = box.info.schema_version
86+
else
87+
replica_schema_version = box.internal.schema_version()
88+
end
89+
cursor.storage_info = {
90+
replica_uuid = box.info().uuid,
91+
replica_schema_version = replica_schema_version,
92+
}
93+
end
94+
95+
local list = box.read_view.list()
96+
local space
97+
98+
for k,v in pairs(list) do
99+
if v.id == readview_id then
100+
space = list[k].space[space_name]
101+
end
102+
end
103+
104+
if space == nil then
105+
return cursor, ReadviewError:new("Space %q doesn't exist", space_name)
106+
end
107+
108+
space.format = box.space[space_name]:format()
109+
local space_format = box.space[space_name]
110+
if space_format == nil then
111+
return cursor, ReadviewError:new("Space %q doesn't exist", space_name)
112+
end
113+
114+
local index = space.index[index_id]
115+
local index_format = space_format.index[index_id]
116+
if index == nil then
117+
return cursor, ReadviewError:new("Index with ID %s doesn't exist", index_id)
118+
end
119+
120+
local _, err = sharding.check_sharding_hash(space_name,
121+
opts.sharding_func_hash,
122+
opts.sharding_key_hash,
123+
opts.skip_sharding_hash_check)
124+
125+
if err ~= nil then
126+
return nil, err
127+
end
128+
129+
local filter_func, err = select_filters.gen_func(space_format, conditions, {
130+
tarantool_iter = opts.tarantool_iter,
131+
scan_condition_num = opts.scan_condition_num,
132+
})
133+
if err ~= nil then
134+
return cursor, ReadviewError:new("Failed to generate tuples filter: %s", err)
135+
end
136+
137+
-- execute select
138+
local resp, err = select_executor.execute(space_format, index_format, filter_func, {
139+
scan_value = opts.scan_value,
140+
after_tuple = opts.after_tuple,
141+
tarantool_iter = opts.tarantool_iter,
142+
limit = opts.limit,
143+
yield_every = opts.yield_every,
144+
readview = true,
145+
readview_index = index
146+
})
147+
if err ~= nil then
148+
return cursor, ReadviewError:new("Failed to execute select: %s", err)
149+
end
150+
151+
if resp.tuples_fetched < opts.limit or opts.limit == 0 then
152+
cursor.is_end = true
153+
else
154+
local last_tuple = resp.tuples[#resp.tuples]
155+
cursor.after_tuple = last_tuple
156+
end
157+
158+
cursor.stats = {
159+
tuples_lookup = resp.tuples_lookup,
160+
tuples_fetched = resp.tuples_fetched,
161+
}
162+
163+
-- getting tuples with user defined fields (if `fields` option is specified)
164+
-- and fields that are needed for comparison on router (primary key + scan key)
165+
local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names)
166+
167+
local result = {cursor, filtered_tuples}
168+
169+
local select_module_compat_info = stash.get(stash.name.select_module_compat_info)
170+
if not select_module_compat_info.has_merger then
171+
if opts.fetch_latest_metadata then
172+
result[3] = cursor.storage_info.replica_schema_version
173+
end
174+
end
175+
176+
return unpack(result)
177+
end
178+
179+
local Readview_obj = {}
180+
Readview_obj.__index = Readview_obj
181+
182+
local select_call = stats.wrap(select.call, stats.op.SELECT)
183+
184+
function Readview_obj:select(space_name, user_conditions, opts)
185+
opts = opts or {}
186+
opts.readview = true
187+
opts.readview_uuid = self._uuid
188+
189+
return select_call(space_name, user_conditions, opts)
190+
end
191+
192+
local pairs_call = stats.wrap(select.pairs, stats.op.SELECT, {pairs = true})
193+
function Readview_obj:pairs(space_name, user_conditions, opts)
194+
opts = opts or {}
195+
opts.readview = true
196+
opts.readview_uuid = self._uuid
197+
198+
return pairs_call(space_name, user_conditions, opts)
199+
end
200+
201+
function readview.init()
202+
_G._crud.readview_open_on_storage = readview_open_on_storage
203+
_G._crud.readview_close_on_storage = readview_close_on_storage
204+
_G._crud.select_readview_on_storage = select_readview_on_storage
205+
end
206+
207+
function Readview_obj:close(opts)
208+
checks('table', {
209+
timeout = '?number',
210+
})
211+
opts = opts or {}
212+
if self._opened == false then
213+
return
214+
end
215+
216+
local opts = {}
217+
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
218+
if err ~= nil then
219+
return ReadviewError:new(err)
220+
end
221+
222+
local replicasets, err = vshard_router:routeall()
223+
if err ~= nil then
224+
return ReadviewError:new(err)
225+
end
226+
if opts.timeout == nil then
227+
opts.timeout = 3
228+
end
229+
230+
local results = {}
231+
local errors = {}
232+
for _, replicaset in pairs(replicasets) do
233+
for replica_uuid, replica in pairs(replicaset.replicas) do
234+
for _, value in pairs(self._uuid) do
235+
if replica_uuid == value.uuid then
236+
local replica_result, replica_err = replica.conn:call('_crud.readview_close_on_storage',
237+
{self._uuid}, {timeout = opts.timeout})
238+
table.insert(results, replica_result)
239+
if replica_err ~= nil then
240+
table.insert(errors, ReadviewError:new("Failed to close Readview on storage: %s", replica_err))
241+
end
242+
end
243+
end
244+
end
245+
end
246+
247+
if next(errors) ~= nil then
248+
return errors
249+
end
250+
251+
self._opened = false
252+
return nil
253+
254+
end
255+
256+
function Readview_obj:__gc()
257+
fiber.new(self.close, self)
258+
end
259+
260+
function Readview_obj.create(name, vshard_router, opts)
261+
local readview = {}
262+
setmetatable(readview, Readview_obj)
263+
readview._name = name
264+
local results, err = vshard_router:map_callrw('_crud.readview_open_on_storage', {readview._name}, opts)
265+
266+
if err ~= nil then
267+
return nil, ReadviewError:new("Failed to call readview_open_on_storage on storage-side: %s", err)
268+
end
269+
270+
local uuid = {}
271+
local errors = {}
272+
for _, replicaset_results in pairs(results) do
273+
for _, replica_result in pairs(replicaset_results) do
274+
if replica_result.err ~= nil then
275+
table.insert(errors, replica_result.err)
276+
end
277+
table.insert(uuid, replica_result.replica_info)
278+
end
279+
end
280+
281+
readview._uuid = uuid
282+
readview._opened = true
283+
284+
if next(errors) ~= nil then
285+
return nil, errors
286+
end
287+
return readview, nil
288+
end
289+
290+
function readview.new(readview_name)
291+
checks('?string')
292+
local opts = {}
293+
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
294+
if err ~= nil then
295+
return nil, ReadviewError:new(err)
296+
end
297+
298+
local readview_obj, err = Readview_obj.create(readview_name, vshard_router, opts)
299+
300+
return readview_obj, err
301+
end
302+
303+
304+
return readview

0 commit comments

Comments
 (0)