Conversation
Walkthrough
This PR introduces a new NodeGroupTerm() accessor and specialized ValidTermCheck() overrides for the AcquireAllCc and PostWriteAllCc classes. The changes refactor term validation to use NodeGroupTerm instead of LeaderTerm across execution paths, add runtime validation checks, and simplify control flow in SplitFlushRangeOp::Forward to prevent deadlocks during transaction recovery.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/include/cc/cc_request.h (1)
7363-7370: Guard UnpinNodeGroupData() to only run when pin succeeded (unpin-without-pin bug).
KeyObjectStandbyForwardCc::Execute() sets updated_local_seq_id_ = true before calling TryPinStandbyNodeGroupData(), and returns early on pin failure. Finish() then unconditionally unpins when updated_local_seq_id_ && table_name_->IsMeta(), resulting in unpinning even when the pin never succeeded. Track pin success with a dedicated member and guard the unpin accordingly.
Proposed fix
 // in KeyObjectStandbyForwardCc members
 bool updated_local_seq_id_{false};
+int64_t pinned_ng_term_{-1};

 void Reset(std::unique_ptr<remote::CcMessage> msg)
 {
     ...
     updated_local_seq_id_ = false;
+    pinned_ng_term_ = -1;
     ...
 }

 bool Execute(CcShard &ccs) override
 {
     ...
     updated_local_seq_id_ = true;
     if (table_name_->IsMeta())
     {
         int64_t term = Sharder::Instance().TryPinStandbyNodeGroupData();
         if (term < 0)
         {
             return SetFinish(ccs);
         }
+        pinned_ng_term_ = term;
     }
     ...
 }

 void Finish(CcShard &shard)
 {
-    if (updated_local_seq_id_ && table_name_->IsMeta())
+    if (updated_local_seq_id_ && table_name_->IsMeta() && pinned_ng_term_ >= 0)
     {
         Sharder::Instance().UnpinNodeGroupData(node_group_id_);
     }
     ...
 }
Also applies to: 7471-7476
🤖 Fix all issues with AI agents
In @tx_service/include/cc/cc_request.h:
- Around line 534-567: ValidTermCheck in AcquireAllCc can race on the member
ng_term_ when parallel_req_ is true and also uses assert(ng_term > 0); fix by
making ng_term_ initialization thread-safe and replacing the assert with a
runtime check: change ng_term_ to an atomic<int64_t> (or protect accesses with a
mutex) and use an atomic compare-exchange to set ng_term_ only if it is still
<0, and replace assert(ng_term > 0) with a conditional that returns false when
ng_term <= 0; keep the existing comparisons against ng_term, tx_term_, and
node_group_id_ (Txn(), tx_term_, node_group_id_) but ensure all reads/writes to
ng_term_ use the chosen atomic/mutex protection.
In @tx_service/include/cc/range_cc_map.h:
- Around line 606-608: Replace the assertion on ng_term with a runtime
validation before calling InitKeyCache()/pinning (which triggers
range->pins_.fetch_add()); specifically, check that ng_term (from
req.NodeGroupTerm()) is > 0, and if not handle the error path (e.g., return
false or propagate an error) instead of asserting so pins are not incremented in
release builds.
In @tx_service/include/cc/template_cc_map.h:
- Around line 702-705: The code currently uses assert(ng_term > 0) after setting
int64_t ng_term = req.NodeGroupTerm(); which is unsafe in release builds;
replace the assert with a runtime guard that checks ng_term > 0 and handles the
invalid case before any lock metadata or lock operations use ng_term.
Specifically, inside the Execute override in template_cc_map.h, validate
req.NodeGroupTerm() into ng_term, and if ng_term <= 0 then log or return an
appropriate error/status (or abort the operation) rather than proceeding; ensure
this mirrors the behavior of ValidTermCheck where applicable and prevents
propagation of an invalid term into lock operations that reference
ng_id/ng_term.
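The compare-exchange idea suggested for AcquireAllCc above can be sketched in isolation. This is a minimal illustration, not the real class: TermHolder and CheckOrRecordTerm are hypothetical names standing in for the ng_term_ member and the tail of ValidTermCheck().

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch: ng_term_ becomes an atomic initialized at most once
// via compare-exchange, and the assert(ng_term > 0) becomes a runtime guard.
struct TermHolder
{
    std::atomic<int64_t> ng_term_{-1};

    // Returns false for an invalid term or a mismatch with the recorded term.
    bool CheckOrRecordTerm(int64_t ng_term)
    {
        if (ng_term <= 0)
        {
            // Runtime guard instead of assert(ng_term > 0).
            return false;
        }
        int64_t expected = -1;
        if (ng_term_.compare_exchange_strong(expected, ng_term))
        {
            return true;  // First core to observe a term; recorded it.
        }
        // Another core recorded a term first; it must match ours.
        return expected == ng_term;
    }
};
```

The compare-exchange makes the "initialize only if still < 0" step atomic, so parallel cores either agree on one term or fail the check, without a mutex.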
🧹 Nitpick comments (2)
tx_service/src/tx_operation.cpp (1)
4196-4200: Comment is helpful; consider making the "forward on candidate leader" behavior explicit in code structure.
The rationale is solid, but since this is subtle behavior, consider extracting this whole "Recovering + term mismatch" gate into a small helper (e.g., HandleInvalidRecoveredTermOrContinue(txm)) so future edits don't accidentally reintroduce the old stall condition.
tx_service/include/cc/cc_request.h (1)
961-994: Deduplicate and verify concurrency assumptions for PostWriteAllCc::ValidTermCheck() (same logic as AcquireAllCc).
This is now copy/pasted term logic; consider extracting a shared helper (to avoid drift). Also, if PostWriteAllCc is ever shared across cores concurrently (like other "All" requests), it may need the same synchronization treatment as AcquireAllCc.
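One shape the shared helper could take, as a standalone sketch. The function name and the explicit term parameters (standing in for the Sharder::Instance() lookups) are assumptions; the >> 42 shift mirrors the (Txn() >> 32L) >> 10 computation quoted elsewhere in this review.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical shared helper both AcquireAllCc and PostWriteAllCc could call,
// so the coordinator-term logic lives in one place.
inline bool CoordinatorTermMatches(uint64_t txn,
                                   uint32_t node_group_id,
                                   int64_t tx_term,
                                   int64_t leader_term,
                                   int64_t candidate_term)
{
    int64_t ng_term = std::max(leader_term, candidate_term);
    if (ng_term <= 0)
    {
        return false;  // No valid leader or candidate-leader term.
    }
    // Txn encodes the owning node group in the high bits.
    uint32_t tx_ng_id = static_cast<uint32_t>(txn >> 42);
    if (tx_ng_id == node_group_id && ng_term != tx_term)
    {
        // Processed on the coordinator (candidate) leader with a stale term.
        return false;
    }
    return true;
}
```

Each ValidTermCheck() override would then only keep its class-specific ng_term_ bookkeeping around this call.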
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
tx_service/include/cc/cc_request.h
tx_service/include/cc/range_cc_map.h
tx_service/include/cc/template_cc_map.h
tx_service/src/tx_operation.cpp
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
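The pin/unpin rule in this learning can be captured in a small RAII sketch. NodeGroupPinGuard is hypothetical; the callables stand in for the real Sharder::Instance().TryPinNodeGroupData / UnpinNodeGroupData calls.

```cpp
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical guard: unpin in the destructor only when the recorded pin
// term is >= 0, i.e., the pin actually succeeded.
class NodeGroupPinGuard
{
public:
    NodeGroupPinGuard(std::function<int64_t()> try_pin,
                      std::function<void()> unpin)
        : unpin_(std::move(unpin)), term_(try_pin())
    {
    }

    ~NodeGroupPinGuard()
    {
        if (term_ >= 0)  // Never unpin a pin that failed.
        {
            unpin_();
        }
    }

    int64_t Term() const { return term_; }

private:
    std::function<void()> unpin_;
    int64_t term_;
};
```

Pairing the pin and the guarded unpin in one object makes the "unpin only if term >= 0" invariant hard to forget on early-return paths.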
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
Applied to files:
tx_service/include/cc/range_cc_map.h
tx_service/include/cc/cc_request.h
tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.
Applied to files:
tx_service/include/cc/range_cc_map.h
tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.
Applied to files:
tx_service/include/cc/range_cc_map.h
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.
Applied to files:
tx_service/include/cc/template_cc_map.h
🧬 Code graph analysis (2)
tx_service/src/tx_operation.cpp (4)
tx_service/src/sk_generator.cpp (6)
lk (570-570), lk (585-585), lk (598-598), lk (627-627), lk (1040-1040), lk (1046-1046)
tx_service/src/cc/local_cc_shards.cpp (3)
lk (274-274), lk (402-402), lk (431-431)
tx_service/src/dead_lock_check.cpp (3)
lk (160-160), lk (513-513), lk (525-525)
tx_service/src/checkpointer.cpp (4)
lk (409-409), lk (473-473), lk (483-483), lk (489-489)
tx_service/include/cc/cc_request.h (2)
tx_service/src/cc/cc_req_misc.cpp (8)
ValidTermCheck (88-125), ValidTermCheck (88-88), ValidTermCheck (852-877), ValidTermCheck (852-852), ValidTermCheck (1016-1027), ValidTermCheck (1016-1016), ValidTermCheck (1117-1127), ValidTermCheck (1117-1117)
tx_service/include/sharder.h (1)
Sharder (170-186)
🔇 Additional comments (1)
tx_service/include/cc/cc_request.h (1)
286-290: Nice addition: the NodeGroupTerm() accessor is clean and helps standardize term propagation.
bool ValidTermCheck() override
{
    int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_);
    int64_t ng_candidate_term =
        Sharder::Instance().CandidateLeaderTerm(node_group_id_);
    ng_term = std::max(ng_term, ng_candidate_term);

    if (ng_term < 0)
    {
        return false;
    }
    else
    {
        uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
        if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
        {
            // The request is processed on the coordinator candidate leader,
            // but the term is mismatch.
            return false;
        }
    }
    assert(ng_term > 0);

    if (ng_term_ < 0)
    {
        ng_term_ = ng_term;
    }
    else if (ng_term != ng_term_)
    {
        return false;
    }
    return true;
}
Fix potential data race on ng_term_ in parallel AcquireAllCc::ValidTermCheck() + avoid assert-only term validation.
AcquireAllCc is marked parallel_req_ = true, but ValidTermCheck() can write ng_term_ concurrently across cores (UB). Also, assert(ng_term > 0) can crash debug builds if term can be 0 (many other paths only treat <0 as invalid).
Proposed fix (serialize `ng_term_` initialization + make term validation non-assert-only)
bool ValidTermCheck() override
{
int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_);
int64_t ng_candidate_term =
Sharder::Instance().CandidateLeaderTerm(node_group_id_);
ng_term = std::max(ng_term, ng_candidate_term);
if (ng_term < 0)
{
return false;
}
else
{
- uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
+ // Txn encodes ng_id in the high bits; this is effectively >> 42.
+ uint32_t tx_ng_id = static_cast<uint32_t>(Txn() >> 42);
if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
{
// The request is processed on the coordinator candidate leader,
// but the term is mismatch.
return false;
}
}
- assert(ng_term > 0);
+ // Don't rely on assert for validity.
+ if (ng_term <= 0)
+ {
+ return false;
+ }
- if (ng_term_ < 0)
- {
- ng_term_ = ng_term;
- }
- else if (ng_term != ng_term_)
- {
- return false;
- }
+ // AcquireAllCc runs in parallel across cores; protect ng_term_ init/check.
+ {
+ std::lock_guard<std::mutex> lk(mux_);
+ if (ng_term_ < 0)
+ {
+ ng_term_ = ng_term;
+ }
+ else if (ng_term != ng_term_)
+ {
+ return false;
+ }
+ }
return true;
}🤖 Prompt for AI Agents
In @tx_service/include/cc/cc_request.h around lines 534 - 567, ValidTermCheck in
AcquireAllCc can race on the member ng_term_ when parallel_req_ is true and also
uses assert(ng_term > 0); fix by making ng_term_ initialization thread-safe and
replacing the assert with a runtime check: change ng_term_ to an atomic<int64_t>
(or protect accesses with a mutex) and use an atomic compare-exchange to set
ng_term_ only if it is still <0, and replace assert(ng_term > 0) with a
conditional that returns false when ng_term <= 0; keep the existing comparisons
against ng_term, tx_term_, and node_group_id_ (Txn(), tx_term_, node_group_id_)
but ensure all reads/writes to ng_term_ use the chosen atomic/mutex protection.
int64_t ng_term = req.NodeGroupTerm();
assert(ng_term > 0);
Replace the assertion with a runtime check to guard the resource-pinning operation.
The code pins the range via InitKeyCache() (which calls range->pins_.fetch_add()) but uses assert(ng_term > 0) instead of a runtime check. Per the codebase pattern shown in PostProcessFlushTaskEntries (local_cc_shards.cpp:3303–3306), pin operations should be guarded with runtime checks (if (term >= 0)) rather than assertions, since assertions are disabled in release builds and could lead to resource leaks. Replace the assertion with:
if (ng_term <= 0)
{
    // Handle error: term validation failed
    return false;  // or appropriate error handling
}
This ensures the pinning operation is only performed when the term is valid, matching the defensive pattern used elsewhere in the codebase.
🤖 Prompt for AI Agents
In @tx_service/include/cc/range_cc_map.h around lines 606 - 608, Replace the
assertion on ng_term with a runtime validation before calling
InitKeyCache()/pinning (which triggers range->pins_.fetch_add()); specifically,
check that ng_term (from req.NodeGroupTerm()) is > 0, and if not handle the
error path (e.g., return false or propagate an error) instead of asserting so
pins are not incremented in release builds.
 uint32_t ng_id = req.NodeGroupId();
-int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id);
-if (ng_term < 0)
-{
-    return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
-}
+int64_t ng_term = req.NodeGroupTerm();
+assert(ng_term > 0);
Replace assert(ng_term > 0) with a runtime guard that validates the term before using it in lock operations.
If req.NodeGroupTerm() returns <= 0, release builds will ignore the assert and propagate an invalid term into lock metadata. Since ng_term initializes to -1 and ValidTermCheck is bypassed in this Execute override, add explicit validation:
Suggested fix
uint32_t ng_id = req.NodeGroupId();
int64_t ng_term = req.NodeGroupTerm();
-assert(ng_term > 0);
+if (ng_term <= 0)
+{
+ return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+}
🤖 Prompt for AI Agents
In @tx_service/include/cc/template_cc_map.h around lines 702 - 705, The code
currently uses assert(ng_term > 0) after setting int64_t ng_term =
req.NodeGroupTerm(); which is unsafe in release builds; replace the assert with
a runtime guard that checks ng_term > 0 and handles the invalid case before any
lock metadata or lock operations use ng_term. Specifically, inside the Execute
override in template_cc_map.h, validate req.NodeGroupTerm() into ng_term, and if
ng_term <= 0 then log or return an appropriate error/status (or abort the
operation) rather than proceeding; ensure this mirrors the behavior of
ValidTermCheck where applicable and prevents propagation of an invalid term into
lock operations that reference ng_id/ng_term.
if (!(*start_key < *slice_end_key))
{
    DLOG(ERROR)
        << "!!!ERROR!!! start key: " << start_key->ToString()
        << ", search key: " << search_key.ToString()
        << ", slice start key: "
        << typed_slice->StartKey()->ToString()
        << ", slice end key: " << slice_end_key->ToString()
        << ", export base table item: " << std::boolalpha
        << req.export_base_table_item_
        << ", current slice index: "
        << req.curr_slice_index_[shard_->core_id_]
        << " on core: " << shard_->core_id_
        << ", table: " << table_name_.StringView();
}
Don’t continue scanning when start_key >= slice_end_key (can violate slice boundaries).
Right now this only emits a DLOG(ERROR) and continues. If that invariant ever breaks, it/end_it can describe an invalid range (e.g., it past end_it), and the scan may export keys outside the slice.
At minimum: (1) make this visible in prod (not DLOG), and (2) force the slice to be treated as empty (or fail the request) to avoid out-of-range iteration.
Safer fallback (treat as empty slice)
end_it = deduce_iterator(*slice_end_key);
if (!(*start_key < *slice_end_key))
{
- DLOG(ERROR)
+ LOG(ERROR)
<< "!!!ERROR!!! start key: " << start_key->ToString()
<< ", search key: " << search_key.ToString()
<< ", slice start key: "
<< typed_slice->StartKey()->ToString()
<< ", slice end key: " << slice_end_key->ToString()
<< ", export base table item: " << std::boolalpha
<< req.export_base_table_item_
<< ", current slice index: "
<< req.curr_slice_index_[shard_->core_id_]
<< " on core: " << shard_->core_id_
<< ", table: " << table_name_.StringView();
+ // Prevent scanning out of the slice.
+ it = end_it;
 if (txm->TxStatus() == TxnStatus::Recovering &&
-    Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0)
+    (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
+         txm->TxTerm() &&
+     Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm()))
 {
-    // This is a recovered tx and replay is not done yet. We should wait for
-    // replay finish before forwarding tx machine.
-    if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
-        txm->TxTerm())
-    {
-        // Recovered term is invalid. Do not call ForceToFinish as it will
-        // cause infinite recursive call. Clean up tx state directly.
-        txm->bool_resp_->Finish(false);
-
-        ClearInfos();
-
-        txm->state_stack_.pop_back();
-        assert(txm->state_stack_.empty());
-
-        assert(this == txm->split_flush_op_.get());
-        LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
-        std::unique_lock<std::mutex> lk(
-            shards->split_flush_range_op_pool_mux_);
-        shards->split_flush_range_op_pool_.emplace_back(
-            std::move(txm->split_flush_op_));
-        assert(txm->split_flush_op_ == nullptr);
-    }
+    // Recovered term is invalid. Do not call ForceToFinish as it will
+    // cause infinite recursive call. Clean up tx state directly.
+    txm->bool_resp_->Finish(false);
+
+    ClearInfos();
+
+    txm->state_stack_.pop_back();
+    assert(txm->state_stack_.empty());
+
+    assert(this == txm->split_flush_op_.get());
+    LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
+    std::unique_lock<std::mutex> lk(shards->split_flush_range_op_pool_mux_);
+    shards->split_flush_range_op_pool_.emplace_back(
+        std::move(txm->split_flush_op_));
+    assert(txm->split_flush_op_ == nullptr);
     return;
 }
Critical UAF risk: subop worker threads not joined before recycling split_flush_op_
The invalid-term cleanup path (line 4174–4193) calls ClearInfos() and immediately recycles split_flush_op_ to the pool without ensuring all member subops are safely stopped.
ClearInfos() only nullifies range_info_, new_range_info_, and range_record_ — it does not call Reset() on the four AsyncOp<Void> member subops (data_sync_op_, ds_upsert_range_op_, ds_clean_old_range_op_, update_key_cache_op_). Each AsyncOp can have a running worker_thread_ that must be joined before the op is reused. While Reset() is eventually called when the op is pulled from the pool, the window between recycle and reuse leaves the operation state undefined and vulnerable to data races.
Additionally, DataMigrationOp's invalid-term cleanup (line 7227) calls txm->Abort() after recycling; SplitFlushRangeOp omits this, risking txm lifecycle mismanagement.
Yes, we can only directly recycle the split flush op if op_ == nullptr
@liunyl Thanks — agreed.
I’ll gate the early recycle path so we only directly recycle the SplitFlushRangeOp when no sub-op has started (op_ == nullptr). If a sub-op is already in-flight, we’ll fall through and let the normal state-machine paths unwind safely.
[plan_code_changes]
- Guard the invalid-term fast-exit branch in SplitFlushRangeOp::Forward with op_ == nullptr; otherwise, do not recycle here.
- Add an explanatory comment.
If you’d like, I can open a quick PR amendment with this patch.
✏️ Learnings added
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 357
File: tx_service/src/tx_operation.cpp:4174-4195
Timestamp: 2026-01-12T13:27:41.752Z
Learning: In tx_service/src/tx_operation.cpp, SplitFlushRangeOp::Forward must only directly recycle (Finish(false), ClearInfos(), pop, return the op to the pool) when op_ == nullptr; otherwise let the state machine unwind to avoid recycling while sub-ops (including AsyncOp workers) may still be running.
🧠 Learnings used
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
When forwarding SplitFlushRangeOp, there will definitely be no running sub-ops.
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
```cpp
else
{
    uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
    if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
```
Only if the tx is processed on the coordinator node group should we allow cc requests to be processed as a candidate leader.
```cpp
if (txservice_enable_key_cache && this->table_name_.IsBase())
{
    int64_t ng_term = req.NodeGroupTerm();
```
This introduces a compile warning (unused variable) in release builds, where the assert is compiled out. Use assert(req.NodeGroupTerm() > 0); directly instead.
The variable ng_term is used by InitKeyCache.
```diff
 if (txm->TxStatus() == TxnStatus::Recovering &&
-    Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0)
+    (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
+         txm->TxTerm() &&
+     Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm()))
 {
     // This is a recovered tx and replay is not done yet. We should wait for
     // replay finish before forwarding tx machine.
+    if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
+        txm->TxTerm())
+    {
+        // Recovered term is invalid. Do not call ForceToFinish as it will
+        // cause infinite recursive call. Clean up tx state directly.
+        txm->bool_resp_->Finish(false);
+        ClearInfos();
+        txm->state_stack_.pop_back();
+        assert(txm->state_stack_.empty());
+        assert(this == txm->split_flush_op_.get());
+        LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
+        std::unique_lock<std::mutex> lk(
+            shards->split_flush_range_op_pool_mux_);
+        shards->split_flush_range_op_pool_.emplace_back(
+            std::move(txm->split_flush_op_));
+        assert(txm->split_flush_op_ == nullptr);
+    }
-    // Recovered term is invalid. Do not call ForceToFinish as it will
-    // cause infinite recursive call. Clean up tx state directly.
-    txm->bool_resp_->Finish(false);
-    ClearInfos();
-    txm->state_stack_.pop_back();
-    assert(txm->state_stack_.empty());
-    assert(this == txm->split_flush_op_.get());
-    LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
-    std::unique_lock<std::mutex> lk(shards->split_flush_range_op_pool_mux_);
-    shards->split_flush_range_op_pool_.emplace_back(
-        std::move(txm->split_flush_op_));
-    assert(txm->split_flush_op_ == nullptr);
     return;
 }
```
Yes, we can only directly recycle the split flush op if op_ == nullptr
Allowing recovered split flush transactions to forward on the candidate leader node is necessary; otherwise, the recovery process gets stuck under the following circumstances: the split flush transaction needs to wait for data replay to complete, while at the same time no other worker will persist the keys within the range being split. The replayed keys must wait for checkpointing to complete before they can be evicted from memory, leading to an OUT_OF_MEMORY error and thus preventing the data replay request from completing.
Summary by CodeRabbit
Bug Fixes
Refactor