Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prepared) correct check for consensus timeout #24

Merged
merged 4 commits into from
Jan 16, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix(prepared) descriptive errors and nil query id catch
  • Loading branch information
thibaultcha committed Jan 16, 2016
commit 411e90670ffe1b500fef7a289f36f6c00862f290
2 changes: 1 addition & 1 deletion spec/integration/cassandra_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ describe("spawn_session()", function()
assert.falsy(err)
assert.is_table(res)

local res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
res, err = session:execute("DROP KEYSPACE resty_cassandra_spec")
assert.falsy(err)
assert.is_table(res)
assert.equal(0, #res)
Expand Down
18 changes: 13 additions & 5 deletions src/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,11 @@ function RequestHandler:wait_for_schema_consensus()
match, err = check_schema_consensus(self)
until match or err ~= nil or (time_utils.get_time() - start) < self.options.protocol_options.max_schema_consensus_wait

return err
if err ~= nil then
return err
elseif not match then
log.err("Waiting for schema consensus timed out")
end
end

function RequestHandler:send_on_next_coordinator(request)
Expand Down Expand Up @@ -757,7 +761,7 @@ local function prepare_query(request_handler, query)
local prepared_key_lock = prepared_key.."_lock"
local lock, lock_err, elapsed = lock_mutex(request_handler.options.prepared_shm, prepared_key_lock)
if lock_err then
return nil, lock_err
return nil, "Could not create lock for prepare request: "..lock_err
end

if elapsed and elapsed == 0 then
Expand All @@ -767,26 +771,30 @@ local function prepare_query(request_handler, query)
local res, err = request_handler:send(prepare_request)
if err then
return nil, err
elseif res.query_id == nil then
return nil, "Could not retrieve query id from prepare request"
end
query_id = res.query_id
local ok, cache_err = cache.set_prepared_query_id(request_handler.options, query, query_id)
if not ok then
return nil, cache_err
return nil, "Could not insert query id in cache for prepared query: "..cache_err
end
log.info("Query prepared for host "..request_handler.coordinator.address)
else
-- once the lock is resolved, all other workers can retry to get the query, and should
-- instantly succeed. We then skip the preparation part.
query_id, cache_err = cache.get_prepared_query_id(request_handler.options, query)
if cache_err then
return nil, cache_err
return nil, "Could not get query id from cache for prepared query: "..cache_err
elseif query_id == nil then
return nil, "No query id found in cache for prepared query"
end
end

-- UNLOCK MUTEX
lock_err = unlock_mutex(lock)
if lock_err then
return nil, "Error unlocking mutex for query preparation: "..lock_err
return nil, "Error unlocking mutex for query for prepare request: "..lock_err
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/cassandra/options.lua
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ local DEFAULTS = {
},
protocol_options = {
default_port = 9042,
max_schema_consensus_wait = 5000
max_schema_consensus_wait = 10000
},
socket_options = {
connect_timeout = 1000, -- ms
Expand Down