Skip to content

Commit 8427353

Browse files
committed
[TNTP-2109] Remove atomic from single calls
1 parent 11364e4 commit 8427353

File tree

8 files changed

+144
-192
lines changed

8 files changed

+144
-192
lines changed

crud/common/call.lua

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,28 +137,23 @@ local function call_with_retry_and_recovery(vshard_router,
137137
-- This is a partial copy of error handling from vshard.router.router_call_impl()
138138
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
139139
if err.class_name == bucket_ref_unref.BucketRefError.name then
140-
local redirect_replicaset
141140
if is_single_call and #err.bucket_ref_errs == 1 then
142141
local single_err = err.bucket_ref_errs[1]
143142
local destination = single_err.vshard_err.destination
144143
if destination and vshard_router.replicasets[destination] then
145-
redirect_replicaset = vshard_router.replicasets[destination]
144+
replicaset = vshard_router.replicasets[destination]
146145
end
147146
end
148147

149148
for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
150149
local bucket_id = bucket_ref_err.bucket_id
151150
local vshard_err = bucket_ref_err.vshard_err
152151
if vshard_err.name == 'WRONG_BUCKET' or
153-
vshard_err.name == 'BUCKET_IS_LOCKED' or
154-
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
152+
vshard_err.name == 'BUCKET_IS_LOCKED' or
153+
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
155154
vshard_router:_bucket_reset(bucket_id)
156155
end
157156
end
158-
159-
if redirect_replicaset ~= nil then
160-
replicaset = redirect_replicaset
161-
end
162157
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
163158
replicaset:locate_master()
164159
end
Lines changed: 46 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
--- module to call vshard.storage.bucket_ref / vshard.storage.bucket_unref
1+
--- Module to call vshard.storage.bucket_ref / vshard.storage.bucket_unref
22
--- on write requests
33
--- there are two modes: safe and fast. on safe mode module
44
--- calls vshard.storage.bucket_ref / vshard.storage.bucket_unref
55
--- on fast mode it does nothing.
6-
--- default is fast mode.
6+
--- Default is fast mode.
77

88
--- bucket_ref/bucket_unref must be called in one transaction in order to prevent
99
--- safe_mode change during execution.
@@ -15,12 +15,12 @@ local rebalance = require('crud.common.rebalance')
1515
local safe_methods
1616
local fast_methods
1717

18-
local M = {
18+
local bucket_ref_unref = {
1919
BucketRefError = errors.new_class('bucket_ref_error', {capture_stack = false})
2020
}
2121

2222
local function make_bucket_ref_err(bucket_id, vshard_ref_err)
23-
local err = M.BucketRefError:new(
23+
local err = bucket_ref_unref.BucketRefError:new(
2424
"failed bucket_ref: %s, bucket_id: %s",
2525
vshard_ref_err.name,
2626
bucket_id
@@ -34,83 +34,74 @@ local function make_bucket_ref_err(bucket_id, vshard_ref_err)
3434
return err
3535
end
3636

37-
--- on module initialization safe_mode_status func must be set
38-
--- it's rebalance.safe_mode_status
39-
function M.safe_mode_status()
40-
error('safe_mode_status not set')
41-
end
42-
43-
--- Slow bucket_refrw implementation that calls vshard.storage.bucket_refrw.
37+
--- Safe bucket_refrw implementation that calls vshard.storage.bucket_refrw.
4438
--- must be called with bucket_unrefrw in transaction
45-
function M._bucket_refrw(bucket_id)
39+
function bucket_ref_unref._bucket_refrw(bucket_id)
4640
local ref_ok, vshard_ref_err = vshard.storage.bucket_refrw(bucket_id)
4741
if not ref_ok then
48-
return false, make_bucket_ref_err(bucket_id, vshard_ref_err)
42+
return nil, make_bucket_ref_err(bucket_id, vshard_ref_err)
4943
end
5044

5145
return true
5246
end
5347

54-
--- Slow bucket_unrefrw implementation that calls vshard.storage.bucket_unrefrw.
48+
--- Safe bucket_unrefrw implementation that calls vshard.storage.bucket_unrefrw.
5549
--- must be called with bucket_refrw in transaction
56-
function M._bucket_unrefrw(bucket_id)
50+
function bucket_ref_unref._bucket_unrefrw(bucket_id)
5751
return vshard.storage.bucket_unrefrw(bucket_id)
5852
end
5953

60-
--- Slow bucket_refro implementation that calls vshard.storage.bucket_refro.
61-
function M._bucket_refro(bucket_id)
54+
--- Safe bucket_refro implementation that calls vshard.storage.bucket_refro.
55+
function bucket_ref_unref._bucket_refro(bucket_id)
6256
local ref_ok, vshard_ref_err = vshard.storage.bucket_refro(bucket_id)
6357
if not ref_ok then
64-
return false, make_bucket_ref_err(bucket_id, vshard_ref_err)
58+
return nil, make_bucket_ref_err(bucket_id, vshard_ref_err)
6559
end
6660

6761
return true
6862
end
6963

70-
--- Slow bucket_unrefro implementation that calls vshard.storage.bucket_unrefro.
64+
--- Safe bucket_unrefro implementation that calls vshard.storage.bucket_unrefro.
7165
--- must be called in one transaction with bucket_refrw_many
72-
function M._bucket_unrefro(bucket_id)
66+
function bucket_ref_unref._bucket_unrefro(bucket_id)
7367
return vshard.storage.bucket_unrefro(bucket_id)
7468
end
7569

76-
--- Slow bucket_refrw_many that calls bucket_refrw for every bucket and aggregates errors
70+
--- Safe bucket_refrw_many that calls bucket_refrw for every bucket and aggregates errors
7771
--- @param bucket_ids table<number, boolean>
78-
function M._bucket_refrw_many(bucket_ids)
72+
function bucket_ref_unref._bucket_refrw_many(bucket_ids)
7973
local bucket_ref_errs = {}
8074
local reffed_bucket_ids = {}
8175
for bucket_id in pairs(bucket_ids) do
8276
local ref_ok, bucket_refrw_err = safe_methods.bucket_refrw(bucket_id)
8377
if not ref_ok then
84-
8578
table.insert(bucket_ref_errs, bucket_refrw_err.bucket_ref_errs[1])
86-
goto continue
79+
break
8780
end
88-
8981
reffed_bucket_ids[bucket_id] = true
90-
::continue::
9182
end
9283

9384
if next(bucket_ref_errs) ~= nil then
94-
local err = M.BucketRefError:new(M.BucketRefError:new("failed bucket_ref"))
85+
local err = bucket_ref_unref.BucketRefError:new(bucket_ref_unref.BucketRefError:new("failed bucket_ref"))
9586
err.bucket_ref_errs = bucket_ref_errs
96-
safe_methods.bucket_unrefrw_many(reffed_bucket_ids)
87+
bucket_ref_unref._bucket_unrefrw_many(reffed_bucket_ids)
9788
return nil, err
9889
end
9990

10091
return true
10192
end
10293

103-
--- Slow bucket_unrefrw_many that calls vshard.storage.bucket_unrefrw for every bucket.
94+
--- Safe bucket_unrefrw_many that calls vshard.storage.bucket_unrefrw for every bucket.
10495
--- must be called in one transaction with bucket_refrw_many
10596
--- will never happen in called in one transaction with bucket_refrw_many
106-
--- @param bucket_ids table<number, true>
107-
function M._bucket_unrefrw_many(bucket_ids)
97+
--- @param bucket_ids table<number, boolean>
98+
function bucket_ref_unref._bucket_unrefrw_many(bucket_ids)
10899
local unref_all_ok = true
109100
local unref_last_err
110101
for reffed_bucket_id in pairs(bucket_ids) do
111102
local unref_ok, unref_err = safe_methods.bucket_unrefrw(reffed_bucket_id)
112103
if not unref_ok then
113-
unref_all_ok = nil
104+
unref_all_ok = false
114105
unref_last_err = unref_err
115106
end
116107
end
@@ -122,31 +113,31 @@ function M._bucket_unrefrw_many(bucket_ids)
122113
end
123114

124115
--- _fast implements module logic for fast mode
125-
function M._fast()
116+
function bucket_ref_unref._fast()
126117
return true
127118
end
128119

129120
safe_methods = {
130-
bucket_refrw = M._bucket_refrw,
131-
bucket_unrefrw = M._bucket_unrefrw,
132-
bucket_refro = M._bucket_refro,
133-
bucket_unrefro = M._bucket_unrefro,
134-
bucket_refrw_many = M._bucket_refrw_many,
135-
bucket_unrefrw_many = M._bucket_unrefrw_many,
121+
bucket_refrw = bucket_ref_unref._bucket_refrw,
122+
bucket_unrefrw = bucket_ref_unref._bucket_unrefrw,
123+
bucket_refro = bucket_ref_unref._bucket_refro,
124+
bucket_unrefro = bucket_ref_unref._bucket_unrefro,
125+
bucket_refrw_many = bucket_ref_unref._bucket_refrw_many,
126+
bucket_unrefrw_many = bucket_ref_unref._bucket_unrefrw_many,
136127
}
137128

138129
fast_methods = {
139-
bucket_refrw = M._fast,
140-
bucket_unrefrw = M._fast,
141-
bucket_refro = M._fast,
142-
bucket_unrefro = M._fast,
143-
bucket_refrw_many = M._fast,
144-
bucket_unrefrw_many = M._fast,
130+
bucket_refrw = bucket_ref_unref._fast,
131+
bucket_unrefrw = bucket_ref_unref._fast,
132+
bucket_refro = bucket_ref_unref._fast,
133+
bucket_unrefro = bucket_ref_unref._fast,
134+
bucket_refrw_many = bucket_ref_unref._fast,
135+
bucket_unrefrw_many = bucket_ref_unref._fast,
145136
}
146137

147138
local function set_methods(methods)
148139
for method_name, func in pairs(methods) do
149-
M[method_name] = func
140+
bucket_ref_unref[method_name] = func
150141
end
151142
end
152143

@@ -158,32 +149,18 @@ local function set_fast_mode()
158149
set_methods(fast_methods)
159150
end
160151

161-
local hooks_registered = false
162-
163-
--- set safe mode func
164-
--- from rebalance.safe_mode_status
165-
function M.set_safe_mode_status(safe_mode_status)
166-
M.safe_mode_status = safe_mode_status
152+
if rebalance.safe_mode then
153+
set_safe_mode()
154+
else
155+
set_fast_mode()
156+
end
167157

168-
if safe_mode_status() then
158+
rebalance.on_safe_mode_toggle(function(is_enabled)
159+
if is_enabled then
169160
set_safe_mode()
170161
else
171162
set_fast_mode()
172163
end
164+
end)
173165

174-
if not hooks_registered then
175-
rebalance.on_safe_mode_toggle(function(is_enabled)
176-
if is_enabled then
177-
set_safe_mode()
178-
else
179-
set_fast_mode()
180-
end
181-
end)
182-
183-
hooks_registered = true
184-
end
185-
end
186-
187-
set_fast_mode()
188-
189-
return M
166+
return bucket_ref_unref

crud/delete.lua

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,29 +44,25 @@ local function delete_on_storage(space_name, key, field_names, opts)
4444
return nil, err
4545
end
4646

47-
local function make_delete()
48-
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id)
49-
if not ref_ok then
50-
return nil, bucket_ref_err
51-
end
52-
53-
-- add_space_schema_hash is false because
54-
-- reloading space format on router can't avoid delete error on storage
55-
local result = schema.wrap_box_space_func_result(space, 'delete', {key}, {
56-
add_space_schema_hash = false,
57-
field_names = field_names,
58-
noreturn = opts.noreturn,
59-
fetch_latest_metadata = opts.fetch_latest_metadata,
60-
})
61-
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id)
62-
if not unref_ok then
63-
return nil, err_unref
64-
end
47+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(opts.bucket_id)
48+
if not ref_ok then
49+
return nil, bucket_ref_err
50+
end
6551

66-
return result
52+
-- add_space_schema_hash is false because
53+
-- reloading space format on router can't avoid delete error on storage
54+
local result = schema.wrap_box_space_func_result(space, 'delete', {key}, {
55+
add_space_schema_hash = false,
56+
field_names = field_names,
57+
noreturn = opts.noreturn,
58+
fetch_latest_metadata = opts.fetch_latest_metadata,
59+
})
60+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(opts.bucket_id)
61+
if not unref_ok then
62+
return nil, err_unref
6763
end
6864

69-
return box.atomic(make_delete)
65+
return result
7066
end
7167

7268
delete.storage_api = {[DELETE_FUNC_NAME] = delete_on_storage}

crud/insert.lua

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,33 +43,29 @@ local function insert_on_storage(space_name, tuple, opts)
4343
return nil, err
4444
end
4545

46-
local function make_insert()
47-
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48-
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
46+
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
47+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
4948

50-
if not ref_ok then
51-
return nil, bucket_ref_err
52-
end
49+
if not ref_ok then
50+
return nil, bucket_ref_err
51+
end
5352

54-
-- add_space_schema_hash is true only in case of insert_object
55-
-- the only one case when reloading schema can avoid insert error
56-
-- is flattening object on router
57-
local result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
58-
add_space_schema_hash = opts.add_space_schema_hash,
59-
field_names = opts.fields,
60-
noreturn = opts.noreturn,
61-
fetch_latest_metadata = opts.fetch_latest_metadata,
62-
})
63-
64-
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
65-
if not unref_ok then
66-
return nil, err_unref
67-
end
53+
-- add_space_schema_hash is true only in case of insert_object
54+
-- the only one case when reloading schema can avoid insert error
55+
-- is flattening object on router
56+
local result = schema.wrap_box_space_func_result(space, 'insert', {tuple}, {
57+
add_space_schema_hash = opts.add_space_schema_hash,
58+
field_names = opts.fields,
59+
noreturn = opts.noreturn,
60+
fetch_latest_metadata = opts.fetch_latest_metadata,
61+
})
6862

69-
return result
63+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
64+
if not unref_ok then
65+
return nil, err_unref
7066
end
7167

72-
return box.atomic(make_insert)
68+
return result
7369
end
7470

7571
insert.storage_api = {[INSERT_FUNC_NAME] = insert_on_storage}

crud/replace.lua

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,29 +43,25 @@ local function replace_on_storage(space_name, tuple, opts)
4343
return nil, err
4444
end
4545

46-
local function make_replace()
47-
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
48-
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
49-
if not ref_ok then
50-
return nil, bucket_ref_err
51-
end
52-
-- add_space_schema_hash is true only in case of replace_object
53-
-- the only one case when reloading schema can avoid insert error
54-
-- is flattening object on router
55-
local result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, {
56-
add_space_schema_hash = opts.add_space_schema_hash,
57-
field_names = opts.fields,
58-
noreturn = opts.noreturn,
59-
fetch_latest_metadata = opts.fetch_latest_metadata,
60-
})
61-
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
62-
if not unref_ok then
63-
return nil, err_unref
64-
end
65-
return result
46+
local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
47+
local ref_ok, bucket_ref_err = bucket_ref_unref.bucket_refrw(bucket_id)
48+
if not ref_ok then
49+
return nil, bucket_ref_err
6650
end
67-
68-
return box.atomic(make_replace)
51+
-- add_space_schema_hash is true only in case of replace_object
52+
-- the only one case when reloading schema can avoid insert error
53+
-- is flattening object on router
54+
local result = schema.wrap_box_space_func_result(space, 'replace', {tuple}, {
55+
add_space_schema_hash = opts.add_space_schema_hash,
56+
field_names = opts.fields,
57+
noreturn = opts.noreturn,
58+
fetch_latest_metadata = opts.fetch_latest_metadata,
59+
})
60+
local unref_ok, err_unref = bucket_ref_unref.bucket_unrefrw(bucket_id)
61+
if not unref_ok then
62+
return nil, err_unref
63+
end
64+
return result
6965
end
7066

7167
replace.storage_api = {[REPLACE_FUNC_NAME] = replace_on_storage}

0 commit comments

Comments
 (0)