Skip to content

Commit a73e8c6

Browse files
craig[bot] and benesch committed
Merge #27149
27149: storage: test transactions that span merges r=tschottdorf,bdarnell a=benesch Add tests to verify the correct behavior of transactions that span a merge. In the process, resolve TODOs about a) copying the abort span from the RHS to the LHS during a merge, and b) clearing the RHS's transaction wait queue. Fixes #27091. Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
2 parents 07a11b8 + 9ea260f commit a73e8c6

File tree

5 files changed

+302
-57
lines changed

5 files changed

+302
-57
lines changed

pkg/storage/abortspan/abortspan.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/roachpb"
2222
"github.com/cockroachdb/cockroach/pkg/storage/engine"
2323
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
24+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
2425
"github.com/cockroachdb/cockroach/pkg/util/hlc"
26+
"github.com/cockroachdb/cockroach/pkg/util/log"
2527
"github.com/cockroachdb/cockroach/pkg/util/uuid"
28+
"github.com/pkg/errors"
2629
)
2730

2831
// An AbortSpan sets markers for aborted transactions to provide protection
@@ -140,3 +143,54 @@ func (sc *AbortSpan) Put(
140143
key := keys.AbortSpanKey(sc.rangeID, txnID)
141144
return engine.MVCCPutProto(ctx, e, ms, key, hlc.Timestamp{}, nil /* txn */, entry)
142145
}
146+
147+
// CopyTo copies the abort span entries to the abort span for the range
148+
// identified by newRangeID. Entries are read from r and written to w. It is
149+
// safe for r and w to be the same object.
150+
//
151+
// CopyTo takes care to only copy records that are required: certain workloads
152+
// create sizable abort spans, and repeated splitting can blow them up further.
153+
// Once an abort span reaches approximately the Raft MaxCommandSize, splits become
154+
// impossible, which is pretty bad (see #25233).
155+
func (sc *AbortSpan) CopyTo(
156+
ctx context.Context,
157+
r engine.Reader,
158+
w engine.ReadWriter,
159+
ms *enginepb.MVCCStats,
160+
ts hlc.Timestamp,
161+
newRangeID roachpb.RangeID,
162+
) error {
163+
var abortSpanCopyCount, abortSpanSkipCount int
164+
// Abort span entries before this threshold are eligible for GC, so we don't
165+
// copy them into the new range. We could try to delete them from the LHS
166+
// as well, but that could create a large Raft command in itself. Plus,
167+
// we'd have to adjust the stats computations.
168+
threshold := ts.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0)
169+
var scratch [64]byte
170+
if err := sc.Iterate(ctx, r, func(k roachpb.Key, entry roachpb.AbortSpanEntry) error {
171+
if entry.Timestamp.Less(threshold) {
172+
// The entry would be garbage collected (if GC had run), so
173+
// don't bother copying it. Note that we can't filter on the key,
174+
// that is just where the txn record lives, but it doesn't tell
175+
// us whether the intents that triggered the abort span record
176+
// were on the LHS, RHS, or both.
177+
abortSpanSkipCount++
178+
return nil
179+
}
180+
181+
abortSpanCopyCount++
182+
var txnID uuid.UUID
183+
txnID, err := keys.DecodeAbortSpanKey(k, scratch[:0])
184+
if err != nil {
185+
return err
186+
}
187+
return engine.MVCCPutProto(ctx, w, ms,
188+
keys.AbortSpanKey(newRangeID, txnID),
189+
hlc.Timestamp{}, nil, &entry,
190+
)
191+
}); err != nil {
192+
return roachpb.NewReplicaCorruptionError(errors.Wrap(err, "AbortSpan.CopyTo"))
193+
}
194+
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
195+
return nil
196+
}

pkg/storage/batcheval/cmd_end_transaction.go

Lines changed: 21 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
3434
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3535
"github.com/cockroachdb/cockroach/pkg/util/log"
36-
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3736
"github.com/pkg/errors"
3837
)
3938

@@ -116,7 +115,7 @@ func declareKeysEndTransaction(
116115
}
117116
if mt := et.InternalCommitTrigger.MergeTrigger; mt != nil {
118117
// Merges write to the left side's abort span and the right side's data
119-
// span.
118+
// and range-local spans.
120119
leftRangeIDPrefix := keys.MakeRangeIDReplicatedPrefix(header.RangeID)
121120
spans.Add(spanset.SpanReadWrite, roachpb.Span{
122121
Key: leftRangeIDPrefix,
@@ -126,6 +125,10 @@ func declareKeysEndTransaction(
126125
Key: mt.RightDesc.StartKey.AsRawKey(),
127126
EndKey: mt.RightDesc.EndKey.AsRawKey(),
128127
})
128+
spans.Add(spanset.SpanReadWrite, roachpb.Span{
129+
Key: keys.MakeRangeKeyPrefix(mt.RightDesc.StartKey),
130+
EndKey: keys.MakeRangeKeyPrefix(mt.RightDesc.EndKey),
131+
})
129132
}
130133
}
131134
}
@@ -776,45 +779,11 @@ func splitTrigger(
776779
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy last replica GC timestamp")
777780
}
778781

779-
// Initialize the RHS range's AbortSpan by copying the LHS's. Put a little extra
780-
// effort into only copying records that are required: certain workloads create
781-
// sizable abort spans, and repeated splitting can blow them up further. Once
782-
// it reaches approximately the Raft MaxCommandSize, splits become impossible,
783-
// which is pretty bad (see #25233).
784-
{
785-
var abortSpanCopyCount, abortSpanSkipCount int
786-
// Abort span entries before this span are eligible for GC, so we don't
787-
// copy them into the new range. We could try to delete them from the LHS
788-
// as well, but that could create a large Raft command in itself. Plus,
789-
// we'd have to adjust the stats computations.
790-
threshold := ts.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0)
791-
var scratch [64]byte
792-
if err := rec.AbortSpan().Iterate(ctx, batch, func(k roachpb.Key, entry roachpb.AbortSpanEntry) error {
793-
if entry.Timestamp.Less(threshold) {
794-
// The entry would be garbage collected (if GC had run), so
795-
// don't bother copying it. Note that we can't filter on the key,
796-
// that is just where the txn record lives, but it doesn't tell
797-
// us whether the intents that triggered the abort span record
798-
// where on the LHS, RHS, or both.
799-
abortSpanSkipCount++
800-
return nil
801-
}
802-
803-
abortSpanCopyCount++
804-
var txnID uuid.UUID
805-
txnID, err = keys.DecodeAbortSpanKey(k, scratch[:0])
806-
if err != nil {
807-
return err
808-
}
809-
return engine.MVCCPutProto(ctx, batch, &bothDeltaMS,
810-
keys.AbortSpanKey(split.RightDesc.RangeID, txnID),
811-
hlc.Timestamp{}, nil, &entry,
812-
)
813-
}); err != nil {
814-
// TODO(tschottdorf): ReplicaCorruptionError.
815-
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy AbortSpan to RHS split range")
816-
}
817-
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
782+
// Initialize the RHS range's AbortSpan by copying the LHS's.
783+
if err := rec.AbortSpan().CopyTo(
784+
ctx, batch, batch, &bothDeltaMS, ts, split.RightDesc.RangeID,
785+
); err != nil {
786+
return enginepb.MVCCStats{}, result.Result{}, err
818787
}
819788

820789
// Compute (absolute) stats for RHS range.
@@ -1025,25 +994,21 @@ func mergeTrigger(
1025994
return result.Result{}, err
1026995
}
1027996

1028-
// TODO(benesch): copy the non-expired abort span records from the RHS into
1029-
// the LHS. See the corresponding code for splits.
1030-
1031-
// Delete the RHS's range ID keys. Besides the abort span, which we copied
1032-
// above, it's all irrelevant.
1033-
rightRangeIDKey := keys.MakeRangeIDPrefix(merge.RightDesc.RangeID)
1034-
if err := eng.ClearRange(
1035-
engine.MakeMVCCMetadataKey(rightRangeIDKey),
1036-
engine.MakeMVCCMetadataKey(rightRangeIDKey.PrefixEnd()),
997+
if err := abortspan.New(merge.RightDesc.RangeID).CopyTo(
998+
ctx, eng, batch, ms, ts, merge.LeftDesc.RangeID,
1037999
); err != nil {
10381000
return result.Result{}, err
10391001
}
10401002

1041-
// Copy the rewritten RHS data into the command's batch.
1003+
// Copy the RHS data into the command's batch. We skip over the range-ID local
1004+
// keys. The abort span is the only relevant part of the range-ID local
1005+
// keyspace, and we already copied it above.
1006+
rhsRelevantStartKey := engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(merge.RightDesc.StartKey))
10421007
iter := eng.NewIterator(engine.IterOptions{
10431008
UpperBound: roachpb.KeyMax, // all the data in this engine is relevant
10441009
})
10451010
defer iter.Close()
1046-
for iter.Seek(engine.MVCCKey{}); ; iter.Next() {
1011+
for iter.Seek(rhsRelevantStartKey); ; iter.Next() {
10471012
if ok, err := iter.Valid(); err != nil {
10481013
return result.Result{}, err
10491014
} else if !ok {
@@ -1054,8 +1019,10 @@ func mergeTrigger(
10541019
}
10551020
}
10561021

1057-
// Adjust stats for the rewritten RHS data.
1058-
rhsMS, err := iter.ComputeStats(engine.MVCCKey{}, engine.MVCCKeyMax, 0 /* nowNanos */)
1022+
// Adjust stats for the rewritten RHS data. Again, we skip over the range-ID
1023+
// local keys, as only the abort span is relevant and its stats were accounted
1024+
// for above.
1025+
rhsMS, err := iter.ComputeStats(rhsRelevantStartKey, engine.MVCCKeyMax, 0 /* nowNanos */)
10591026
if err != nil {
10601027
return result.Result{}, err
10611028
}

0 commit comments

Comments
 (0)