Skip to content

Commit 377e23d

Browse files
committed
storage: process intents synchronously
Process intents synchronously on the goroutine which generated them.
1 parent aecff9f commit 377e23d

File tree

2 files changed

+20
-19
lines changed

2 files changed

+20
-19
lines changed

pkg/storage/replica.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ type proposalResult struct {
183183
Reply *roachpb.BatchResponse
184184
Err *roachpb.Error
185185
ProposalRetry proposalRetryReason
186+
Intents []intentsWithArg
186187
}
187188

188189
type replicaChecksum struct {
@@ -1667,7 +1668,7 @@ func (r *Replica) addReadOnlyCmd(
16671668
if result.Local.intents != nil && len(*result.Local.intents) > 0 {
16681669
log.Eventf(ctx, "submitting %d intents to asynchronous processing",
16691670
len(*result.Local.intents))
1670-
r.store.intentResolver.processIntentsAsync(r, *result.Local.intents)
1671+
r.store.intentResolver.processIntents(r, *result.Local.intents)
16711672
}
16721673
if pErr != nil {
16731674
log.ErrEvent(ctx, pErr.String())
@@ -1869,6 +1870,11 @@ func (r *Replica) tryAddWriteCmd(
18691870
// Set endCmds to nil because they have already been invoked
18701871
// in processRaftCommand.
18711872
endCmds = nil
1873+
if propResult.Intents != nil {
1874+
// Synchronously process any intents that need resolving here in order
1875+
// to apply back pressure on the client which generated them.
1876+
r.store.intentResolver.processIntents(r, propResult.Intents)
1877+
}
18721878
return propResult.Reply, propResult.Err, propResult.ProposalRetry
18731879
case <-ctxDone:
18741880
// If our context was cancelled, return an AmbiguousResultError
@@ -2077,12 +2083,13 @@ func (r *Replica) propose(
20772083
// An error here corresponds to a failfast-proposal: The command resulted
20782084
// in an error and did not need to commit a batch (the common error case).
20792085
if pErr != nil {
2086+
intents := pCmd.Local.detachIntents()
20802087
r.handleEvalResult(ctx, repDesc, pCmd.Local, pCmd.Replicated)
20812088
if endCmds != nil {
20822089
endCmds.done(nil, pErr, proposalNoRetry)
20832090
}
20842091
ch := make(chan proposalResult, 1)
2085-
ch <- proposalResult{Err: pErr}
2092+
ch <- proposalResult{Err: pErr, Intents: intents}
20862093
close(ch)
20872094
return ch, func() bool { return false }, nil
20882095
}
@@ -3287,6 +3294,7 @@ func (r *Replica) processRaftCommand(
32873294
} else {
32883295
log.Fatalf(ctx, "proposal must return either a reply or an error: %+v", cmd)
32893296
}
3297+
response.Intents = cmd.Local.detachIntents()
32903298
lResult = cmd.Local
32913299
}
32923300

@@ -4228,7 +4236,7 @@ func (r *Replica) loadSystemConfigSpan() ([]roachpb.KeyValue, []byte, error) {
42284236
// There were intents, so what we read may not be consistent. Attempt
42294237
// to nudge the intents in case they're expired; next time around we'll
42304238
// hopefully have more luck.
4231-
r.store.intentResolver.processIntentsAsync(r, *result.Local.intents)
4239+
r.store.intentResolver.processIntents(r, *result.Local.intents)
42324240
return nil, nil, errSystemConfigIntent
42334241
}
42344242
kvs := br.Responses[0].GetInner().(*roachpb.ScanResponse).Rows

pkg/storage/replica_proposal.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ func (lResult *LocalEvalResult) finish(pr proposalResult) {
110110
close(lResult.doneCh)
111111
}
112112

113+
func (lResult *LocalEvalResult) detachIntents() []intentsWithArg {
114+
if lResult.intents == nil {
115+
return nil
116+
}
117+
intents := *lResult.intents
118+
lResult.intents = nil
119+
return intents
120+
}
121+
113122
// EvalResult is the result of evaluating a KV request. That is, the
114123
// proposer (which holds the lease, at least in the case in which the command
115124
// will complete successfully) has evaluated the request and is holding on to:
@@ -600,22 +609,6 @@ func (r *Replica) handleLocalEvalResult(
600609
// Non-state updates and actions.
601610
// ======================
602611

603-
if originReplica.StoreID == r.store.StoreID() {
604-
// On the replica on which this command originated, resolve skipped
605-
// intents asynchronously - even on failure.
606-
//
607-
// TODO(tschottdorf): EndTransaction will use this pathway to return
608-
// intents which should immediately be resolved. However, there's
609-
// a slight chance that an error between the origin of that intents
610-
// slice and here still results in that intent slice arriving here
611-
// without the EndTransaction having committed. We should clearly
612-
// separate the part of the EvalResult which also applies on errors.
613-
if lResult.intents != nil {
614-
r.store.intentResolver.processIntentsAsync(r, *lResult.intents)
615-
}
616-
}
617-
lResult.intents = nil
618-
619612
// The above are present too often, so we assert only if there are
620613
// "nontrivial" actions below.
621614
shouldAssert = (lResult != LocalEvalResult{})

0 commit comments

Comments
 (0)