
Fix stuck during recovery #357

Open

yi-xmu wants to merge 1 commit into main from fix_recovery_stuck

Conversation

@yi-xmu
Collaborator

@yi-xmu yi-xmu commented Jan 12, 2026

Recovered split flush transactions must be allowed to forward on the candidate leader node; otherwise, the recovery process stalls under the following circumstances: the split flush transaction waits for data replay to complete, while for the range being split no other worker persists the keys within it. The replayed keys cannot be evicted from memory until checkpointing completes, which eventually triggers an OUT_OF_MEMORY error and prevents the data replay request from ever finishing.

Summary by CodeRabbit

  • Bug Fixes

    • Improved transaction term validation and synchronization to prevent invalid states during leader transitions.
    • Enhanced recovery handling to prevent potential deadlocks in split-flush operations during replicas' state recovery.
  • Refactor

    • Refined internal control flow for transaction processing to ensure consistent term propagation across request types.


@coderabbitai

coderabbitai bot commented Jan 12, 2026

Walkthrough

This PR introduces a new NodeGroupTerm() accessor and specialized ValidTermCheck() overrides for AcquireAllCc and PostWriteAllCc classes. The changes refactor term validation to use NodeGroupTerm instead of LeaderTerm across execution paths, add runtime validation checks, and simplify control flow in SplitFlushRangeOp::Forward to prevent deadlocks during transaction recovery.

Changes

Cohort / File(s) Summary
CC Request Interface
tx_service/include/cc/cc_request.h
Added NodeGroupTerm() const accessor to TemplatedCcRequest returning ng_term_. Added ValidTermCheck() overrides to AcquireAllCc and PostWriteAllCc that compute ng_term as max(LeaderTerm, CandidateLeaderTerm), validate non-negativity, and synchronize ng_term_ member with computed value, including assertions for positive terms.
Execution Path Updates
tx_service/include/cc/range_cc_map.h, tx_service/include/cc/template_cc_map.h
Replaced LeaderTerm-based term retrieval with NodeGroupTerm() calls in AcquireCc and AcquireAllCc execute paths. Removed explicit leader validation error handling in PostWriteAllCc. Added runtime validation logs when key-range sanity checks fail (start_key not strictly less than slice_end_key). Changed to use assertions instead of early-return guards for term validity.
Transaction Operation Cleanup
tx_service/src/tx_operation.cpp
Refactored SplitFlushRangeOp::Forward recovered-tx handling from conditional retry logic to direct cleanup path. Now immediately finishes response, clears internal state, and recycles operation on term mismatch to avoid deadlocks and infinite recursion during leader transitions.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • Range read block on rw conflict #283: Modifies the same execution code paths in range_cc_map.h and SplitFlushRangeOp::Forward in tx_operation.cpp with directly related function-level changes.
  • Store ng leader term with ng_leader_cache. #129: Introduces node-group leader term caching and transport mechanisms (ng_leader_term_cache_, UpdateLeader(term)) that work in tandem with this PR's NodeGroupTerm() usage and term validation flow.

Suggested reviewers

  • lokax
  • liunyl

Poem

🐰 A rabbit hops through terms so true,
NodeGroupTerm() shines fresh and new,
ValidTermCheck keeps the flow quite tight,
No more deadlocks in the night! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description explains the technical problem and solution clearly. However, the required checklist from the template (tests, documentation, issue/RFC links, and test suite execution) is not addressed in the provided description. Add the required checklist items: confirm tests were added, document changes, include issue link (fixes eloqdb/tx_service#...), reference RFC if applicable, and verify test suite passes.
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Fix stuck during recovery' directly addresses the main objective of the PR—fixing a recovery stall issue. It is concise and clearly communicates the primary change.




@yi-xmu yi-xmu linked an issue Jan 12, 2026 that may be closed by this pull request
@yi-xmu yi-xmu self-assigned this Jan 12, 2026

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/include/cc/cc_request.h (1)

7363-7370: Guard UnpinNodeGroupData() to only run when pin succeeded (unpin-without-pin bug).

KeyObjectStandbyForwardCc::Execute() sets updated_local_seq_id_ = true before calling TryPinStandbyNodeGroupData(), and returns early on pin failure. Finish() then unconditionally unpins when updated_local_seq_id_ && table_name_->IsMeta(), resulting in unpinning even when the pin never succeeded. Track pin success with a dedicated member and guard the unpin accordingly.

Proposed fix
 // in KeyObjectStandbyForwardCc members
 bool updated_local_seq_id_{false};
+int64_t pinned_ng_term_{-1};

 void Reset(std::unique_ptr<remote::CcMessage> msg)
 {
     ...
     updated_local_seq_id_ = false;
+    pinned_ng_term_ = -1;
     ...
 }

 bool Execute(CcShard &ccs) override
 {
     ...
     updated_local_seq_id_ = true;
     if (table_name_->IsMeta())
     {
         int64_t term = Sharder::Instance().TryPinStandbyNodeGroupData();
         if (term < 0)
         {
             return SetFinish(ccs);
         }
+        pinned_ng_term_ = term;
     }
     ...
 }

 void Finish(CcShard &shard)
 {
-    if (updated_local_seq_id_ && table_name_->IsMeta())
+    if (updated_local_seq_id_ && table_name_->IsMeta() && pinned_ng_term_ >= 0)
     {
         Sharder::Instance().UnpinNodeGroupData(node_group_id_);
     }
     ...
 }

Also applies to: 7471-7476

🤖 Fix all issues with AI agents
In @tx_service/include/cc/cc_request.h:
- Around line 534-567: ValidTermCheck in AcquireAllCc can race on the member
ng_term_ when parallel_req_ is true and also uses assert(ng_term > 0); fix by
making ng_term_ initialization thread-safe and replacing the assert with a
runtime check: change ng_term_ to an atomic<int64_t> (or protect accesses with a
mutex) and use an atomic compare-exchange to set ng_term_ only if it is still
<0, and replace assert(ng_term > 0) with a conditional that returns false when
ng_term <= 0; keep the existing comparisons against ng_term, tx_term_, and
node_group_id_ (Txn(), tx_term_, node_group_id_) but ensure all reads/writes to
ng_term_ use the chosen atomic/mutex protection.

In @tx_service/include/cc/range_cc_map.h:
- Around line 606-608: Replace the assertion on ng_term with a runtime
validation before calling InitKeyCache()/pinning (which triggers
range->pins_.fetch_add()); specifically, check that ng_term (from
req.NodeGroupTerm()) is > 0, and if not handle the error path (e.g., return
false or propagate an error) instead of asserting so pins are not incremented in
release builds.

In @tx_service/include/cc/template_cc_map.h:
- Around line 702-705: The code currently uses assert(ng_term > 0) after setting
int64_t ng_term = req.NodeGroupTerm(); which is unsafe in release builds;
replace the assert with a runtime guard that checks ng_term > 0 and handles the
invalid case before any lock metadata or lock operations use ng_term.
Specifically, inside the Execute override in template_cc_map.h, validate
req.NodeGroupTerm() into ng_term, and if ng_term <= 0 then log or return an
appropriate error/status (or abort the operation) rather than proceeding; ensure
this mirrors the behavior of ValidTermCheck where applicable and prevents
propagation of an invalid term into lock operations that reference
ng_id/ng_term.
🧹 Nitpick comments (2)
tx_service/src/tx_operation.cpp (1)

4196-4200: Comment is helpful; consider making the “forward on candidate leader” behavior explicit in code structure

The rationale is solid, but since this is subtle behavior, consider extracting this whole “Recovering + term mismatch” gate into a small helper (e.g., HandleInvalidRecoveredTermOrContinue(txm)) so future edits don’t accidentally reintroduce the old stall condition.

tx_service/include/cc/cc_request.h (1)

961-994: Deduplicate and verify concurrency assumptions for PostWriteAllCc::ValidTermCheck() (same logic as AcquireAllCc).
This is now copy/pasted term logic; consider extracting a shared helper (to avoid drift). Also, if PostWriteAllCc is ever shared across cores concurrently (like other “All” requests), it may need the same synchronization treatment as AcquireAllCc.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1210b03 and 1b5c47d.

📒 Files selected for processing (4)
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/range_cc_map.h
  • tx_service/include/cc/template_cc_map.h
  • tx_service/src/tx_operation.cpp
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.
📚 Learning: 2025-12-02T10:43:27.431Z
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Applied to files:

  • tx_service/include/cc/range_cc_map.h
  • tx_service/include/cc/cc_request.h
  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • tx_service/include/cc/range_cc_map.h
  • tx_service/include/cc/template_cc_map.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • tx_service/include/cc/range_cc_map.h
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • tx_service/include/cc/template_cc_map.h
🧬 Code graph analysis (2)
tx_service/src/tx_operation.cpp (4)
tx_service/src/sk_generator.cpp (6)
  • lk (570-570)
  • lk (585-585)
  • lk (598-598)
  • lk (627-627)
  • lk (1040-1040)
  • lk (1046-1046)
tx_service/src/cc/local_cc_shards.cpp (3)
  • lk (274-274)
  • lk (402-402)
  • lk (431-431)
tx_service/src/dead_lock_check.cpp (3)
  • lk (160-160)
  • lk (513-513)
  • lk (525-525)
tx_service/src/checkpointer.cpp (4)
  • lk (409-409)
  • lk (473-473)
  • lk (483-483)
  • lk (489-489)
tx_service/include/cc/cc_request.h (2)
tx_service/src/cc/cc_req_misc.cpp (8)
  • ValidTermCheck (88-125)
  • ValidTermCheck (88-88)
  • ValidTermCheck (852-877)
  • ValidTermCheck (852-852)
  • ValidTermCheck (1016-1027)
  • ValidTermCheck (1016-1016)
  • ValidTermCheck (1117-1127)
  • ValidTermCheck (1117-1117)
tx_service/include/sharder.h (1)
  • Sharder (170-186)
🔇 Additional comments (1)
tx_service/include/cc/cc_request.h (1)

286-290: Nice addition: NodeGroupTerm() accessor is clean and helps standardize term propagation.

Comment on lines +534 to +567
bool ValidTermCheck() override
{
    int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_);
    int64_t ng_candidate_term =
        Sharder::Instance().CandidateLeaderTerm(node_group_id_);
    ng_term = std::max(ng_term, ng_candidate_term);

    if (ng_term < 0)
    {
        return false;
    }
    else
    {
        uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
        if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
        {
            // The request is processed on the coordinator candidate leader,
            // but the term is mismatch.
            return false;
        }
    }
    assert(ng_term > 0);

    if (ng_term_ < 0)
    {
        ng_term_ = ng_term;
    }
    else if (ng_term != ng_term_)
    {
        return false;
    }
    return true;
}


⚠️ Potential issue | 🔴 Critical

Fix potential data race on ng_term_ in parallel AcquireAllCc::ValidTermCheck() + avoid assert-only term validation.
AcquireAllCc is marked parallel_req_ = true, but ValidTermCheck() can write ng_term_ concurrently across cores (UB). Also, assert(ng_term > 0) can crash debug builds if term can be 0 (many other paths only treat <0 as invalid).

Proposed fix (serialize `ng_term_` initialization + make term validation non-assert-only)
 bool ValidTermCheck() override
 {
     int64_t ng_term = Sharder::Instance().LeaderTerm(node_group_id_);
     int64_t ng_candidate_term =
         Sharder::Instance().CandidateLeaderTerm(node_group_id_);
     ng_term = std::max(ng_term, ng_candidate_term);

     if (ng_term < 0)
     {
         return false;
     }
     else
     {
-        uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
+        // Txn encodes ng_id in the high bits; this is effectively >> 42.
+        uint32_t tx_ng_id = static_cast<uint32_t>(Txn() >> 42);
         if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
         {
             // The request is processed on the coordinator candidate leader,
             // but the term is mismatch.
             return false;
         }
     }
-    assert(ng_term > 0);
+    // Don't rely on assert for validity.
+    if (ng_term <= 0)
+    {
+        return false;
+    }

-    if (ng_term_ < 0)
-    {
-        ng_term_ = ng_term;
-    }
-    else if (ng_term != ng_term_)
-    {
-        return false;
-    }
+    // AcquireAllCc runs in parallel across cores; protect ng_term_ init/check.
+    {
+        std::lock_guard<std::mutex> lk(mux_);
+        if (ng_term_ < 0)
+        {
+            ng_term_ = ng_term;
+        }
+        else if (ng_term != ng_term_)
+        {
+            return false;
+        }
+    }
     return true;
 }
🤖 Prompt for AI Agents
In @tx_service/include/cc/cc_request.h around lines 534 - 567, ValidTermCheck in
AcquireAllCc can race on the member ng_term_ when parallel_req_ is true and also
uses assert(ng_term > 0); fix by making ng_term_ initialization thread-safe and
replacing the assert with a runtime check: change ng_term_ to an atomic<int64_t>
(or protect accesses with a mutex) and use an atomic compare-exchange to set
ng_term_ only if it is still <0, and replace assert(ng_term > 0) with a
conditional that returns false when ng_term <= 0; keep the existing comparisons
against ng_term, tx_term_, and node_group_id_ (Txn(), tx_term_, node_group_id_)
but ensure all reads/writes to ng_term_ use the chosen atomic/mutex protection.

Comment on lines +606 to +608
int64_t ng_term = req.NodeGroupTerm();
assert(ng_term > 0);


⚠️ Potential issue | 🟠 Major

Replace the assertion with a runtime check to guard the resource-pinning operation.

The code pins the range via InitKeyCache() (which calls range->pins_.fetch_add()) but uses assert(ng_term > 0) instead of a runtime check. Per the codebase pattern shown in PostProcessFlushTaskEntries (local_cc_shards.cpp:3303–3306), pin operations should be guarded with runtime checks (if (term >= 0)) rather than assertions, since assertions are disabled in release builds and could lead to resource leaks. Replace the assertion with:

if (ng_term <= 0) {
    // Handle error: term validation failed
    return false;  // or appropriate error handling
}

This ensures the pinning operation is only performed when the term is valid, matching the defensive pattern used elsewhere in the codebase.

🤖 Prompt for AI Agents
In @tx_service/include/cc/range_cc_map.h around lines 606 - 608, Replace the
assertion on ng_term with a runtime validation before calling
InitKeyCache()/pinning (which triggers range->pins_.fetch_add()); specifically,
check that ng_term (from req.NodeGroupTerm()) is > 0, and if not handle the
error path (e.g., return false or propagate an error) instead of asserting so
pins are not incremented in release builds.

Comment on lines 702 to 705
 uint32_t ng_id = req.NodeGroupId();
-int64_t ng_term = Sharder::Instance().LeaderTerm(ng_id);
-if (ng_term < 0)
-{
-    return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
-}
+int64_t ng_term = req.NodeGroupTerm();
+assert(ng_term > 0);


⚠️ Potential issue | 🟠 Major


Replace assert(ng_term > 0) with a runtime guard that validates the term before using it in lock operations.

If req.NodeGroupTerm() returns <= 0, release builds will ignore the assert and propagate an invalid term into lock metadata. Since ng_term initializes to -1 and ValidTermCheck is bypassed in this Execute override, add explicit validation:

Suggested fix
 uint32_t ng_id = req.NodeGroupId();
 int64_t ng_term = req.NodeGroupTerm();
-assert(ng_term > 0);
+if (ng_term <= 0)
+{
+    return hd_res->SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+}
🤖 Prompt for AI Agents
In @tx_service/include/cc/template_cc_map.h around lines 702 - 705, The code
currently uses assert(ng_term > 0) after setting int64_t ng_term =
req.NodeGroupTerm(); which is unsafe in release builds; replace the assert with
a runtime guard that checks ng_term > 0 and handles the invalid case before any
lock metadata or lock operations use ng_term. Specifically, inside the Execute
override in template_cc_map.h, validate req.NodeGroupTerm() into ng_term, and if
ng_term <= 0 then log or return an appropriate error/status (or abort the
operation) rather than proceeding; ensure this mirrors the behavior of
ValidTermCheck where applicable and prevents propagation of an invalid term into
lock operations that reference ng_id/ng_term.

Comment on lines +5563 to +5577
if (!(*start_key < *slice_end_key))
{
    DLOG(ERROR)
        << "!!!ERROR!!! start key: " << start_key->ToString()
        << ", search key: " << search_key.ToString()
        << ", slice start key: "
        << typed_slice->StartKey()->ToString()
        << ", slice end key: " << slice_end_key->ToString()
        << ", export base table item: " << std::boolalpha
        << req.export_base_table_item_
        << ", current slice index: "
        << req.curr_slice_index_[shard_->core_id_]
        << " on core: " << shard_->core_id_
        << ", table: " << table_name_.StringView();
}
⚠️ Potential issue | 🟠 Major

Don’t continue scanning when start_key >= slice_end_key (can violate slice boundaries).

Right now this only emits a DLOG(ERROR) and continues. If that invariant ever breaks, it/end_it can describe an invalid range (e.g., it past end_it), and the scan may export keys outside the slice.

At minimum: (1) make this visible in prod (not DLOG), and (2) force the slice to be treated as empty (or fail the request) to avoid out-of-range iteration.

Safer fallback (treat as empty slice)

```diff
 end_it = deduce_iterator(*slice_end_key);
 if (!(*start_key < *slice_end_key))
 {
-    DLOG(ERROR)
+    LOG(ERROR)
         << "!!!ERROR!!! start key: " << start_key->ToString()
         << ", search key: " << search_key.ToString()
         << ", slice start key: "
         << typed_slice->StartKey()->ToString()
         << ", slice end key: " << slice_end_key->ToString()
         << ", export base table item: " << std::boolalpha
         << req.export_base_table_item_
         << ", current slice index: "
         << req.curr_slice_index_[shard_->core_id_]
         << " on core: " << shard_->core_id_
         << ", table: " << table_name_.StringView();
+    // Prevent scanning out of the slice.
+    it = end_it;
 }
```

Comment on lines 4174 to 4195
if (txm->TxStatus() == TxnStatus::Recovering &&
Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0)
(Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
txm->TxTerm() &&
Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm()))
{
// This is a recovered tx and replay is not done yet. We should wait for
// replay finish before forwarding tx machine.
if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
txm->TxTerm())
{
// Recovered term is invalid. Do not call ForceToFinish as it will
// cause infinite recursive call. Clean up tx state directly.
txm->bool_resp_->Finish(false);
// Recovered term is invalid. Do not call ForceToFinish as it will
// cause infinite recursive call. Clean up tx state directly.
txm->bool_resp_->Finish(false);

ClearInfos();
ClearInfos();

txm->state_stack_.pop_back();
assert(txm->state_stack_.empty());
txm->state_stack_.pop_back();
assert(txm->state_stack_.empty());

assert(this == txm->split_flush_op_.get());
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
std::unique_lock<std::mutex> lk(
shards->split_flush_range_op_pool_mux_);
shards->split_flush_range_op_pool_.emplace_back(
std::move(txm->split_flush_op_));
assert(txm->split_flush_op_ == nullptr);
}
assert(this == txm->split_flush_op_.get());
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
std::unique_lock<std::mutex> lk(shards->split_flush_range_op_pool_mux_);
shards->split_flush_range_op_pool_.emplace_back(
std::move(txm->split_flush_op_));
assert(txm->split_flush_op_ == nullptr);
return;
}
@coderabbitai coderabbitai bot Jan 12, 2026

⚠️ Potential issue | 🔴 Critical


Critical UAF risk: subop worker threads not joined before recycling split_flush_op_

The invalid-term cleanup path (line 4174–4193) calls ClearInfos() and immediately recycles split_flush_op_ to the pool without ensuring all member subops are safely stopped.

ClearInfos() only nullifies range_info_, new_range_info_, and range_record_ — it does not call Reset() on the four AsyncOp<Void> member subops (data_sync_op_, ds_upsert_range_op_, ds_clean_old_range_op_, update_key_cache_op_). Each AsyncOp can have a running worker_thread_ that must be joined before the op is reused. While Reset() is eventually called when the op is pulled from the pool, the window between recycle and reuse leaves the operation state undefined and vulnerable to data races.

Additionally, DataMigrationOp's invalid-term cleanup (line 7227) calls txm->Abort() after recycling; SplitFlushRangeOp omits this, risking txm lifecycle mismanagement.

Contributor

Yes, we can only directly recycle the split flush op if op_ == nullptr


@liunyl Thanks — agreed.

I’ll gate the early recycle path so we only directly recycle the SplitFlushRangeOp when no sub-op has started (op_ == nullptr). If a sub-op is already in-flight, we’ll fall through and let the normal state-machine paths unwind safely.

[plan_code_changes]

  • Guard the invalid-term fast-exit branch in SplitFlushRangeOp::Forward with op_ == nullptr; otherwise, do not recycle here.
  • Add an explanatory comment.

If you’d like, I can open a quick PR amendment with this patch.


✏️ Learnings added
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 357
File: tx_service/src/tx_operation.cpp:4174-4195
Timestamp: 2026-01-12T13:27:41.752Z
Learning: In tx_service/src/tx_operation.cpp, SplitFlushRangeOp::Forward must only directly recycle (Finish(false), ClearInfos(), pop, return the op to the pool) when op_ == nullptr; otherwise let the state machine unwind to avoid recycling while sub-ops (including AsyncOp workers) may still be running.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Collaborator Author

@yi-xmu yi-xmu Jan 13, 2026

When SplitFlushRangeOp is forwarded, there will definitely be no running sub-ops.


Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

```cpp
else
{
    uint32_t tx_ng_id = (Txn() >> 32L) >> 10;
    if (tx_ng_id == node_group_id_ && ng_term != tx_term_)
```
Contributor

Only if the tx is processed on the coordinator node group should we allow cc requests to be processed as candidate leader.


```cpp
if (txservice_enable_key_cache && this->table_name_.IsBase())
{
    int64_t ng_term = req.NodeGroupTerm();
```
Contributor

This introduces an unused-variable compile warning in release builds. Use
assert(req.NodeGroupTerm() > 0);

Collaborator Author

The variable ng_term is used by InitKeyCache, so it does not trigger the unused-variable warning.

Comment on lines 4174 to 4195
if (txm->TxStatus() == TxnStatus::Recovering &&
Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) < 0)
(Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
txm->TxTerm() &&
Sharder::Instance().LeaderTerm(txm->TxCcNodeId()) != txm->TxTerm()))
{
// This is a recovered tx and replay is not done yet. We should wait for
// replay finish before forwarding tx machine.
if (Sharder::Instance().CandidateLeaderTerm(txm->TxCcNodeId()) !=
txm->TxTerm())
{
// Recovered term is invalid. Do not call ForceToFinish as it will
// cause infinite recursive call. Clean up tx state directly.
txm->bool_resp_->Finish(false);
// Recovered term is invalid. Do not call ForceToFinish as it will
// cause infinite recursive call. Clean up tx state directly.
txm->bool_resp_->Finish(false);

ClearInfos();
ClearInfos();

txm->state_stack_.pop_back();
assert(txm->state_stack_.empty());
txm->state_stack_.pop_back();
assert(txm->state_stack_.empty());

assert(this == txm->split_flush_op_.get());
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
std::unique_lock<std::mutex> lk(
shards->split_flush_range_op_pool_mux_);
shards->split_flush_range_op_pool_.emplace_back(
std::move(txm->split_flush_op_));
assert(txm->split_flush_op_ == nullptr);
}
assert(this == txm->split_flush_op_.get());
LocalCcShards *shards = Sharder::Instance().GetLocalCcShards();
std::unique_lock<std::mutex> lk(shards->split_flush_range_op_pool_mux_);
shards->split_flush_range_op_pool_.emplace_back(
std::move(txm->split_flush_op_));
assert(txm->split_flush_op_ == nullptr);
return;
}
Contributor

Yes, we can only directly recycle the split flush op if op_ == nullptr



Development

Successfully merging this pull request may close these issues.

During the eloqdoc jepsen test, log replay failed to complete.
