Skip to content

Commit 9be8e23

Browse files
committed
storage: test transactions that span merges
Add tests to verify the correct behavior of transactions that span a merge. In the process, resolve TODOs about a) copying the abort span from the RHS to the LHS during a merge, and b) clearing the RHS's transaction wait queue.
1 parent faa1d2b commit 9be8e23

File tree

5 files changed

+288
-56
lines changed

5 files changed

+288
-56
lines changed

pkg/storage/abortspan/abortspan.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import (
2121
"github.com/cockroachdb/cockroach/pkg/roachpb"
2222
"github.com/cockroachdb/cockroach/pkg/storage/engine"
2323
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
24+
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
2425
"github.com/cockroachdb/cockroach/pkg/util/hlc"
26+
"github.com/cockroachdb/cockroach/pkg/util/log"
2527
"github.com/cockroachdb/cockroach/pkg/util/uuid"
28+
"github.com/pkg/errors"
2629
)
2730

2831
// An AbortSpan sets markers for aborted transactions to provide protection
@@ -140,3 +143,54 @@ func (sc *AbortSpan) Put(
140143
key := keys.AbortSpanKey(sc.rangeID, txnID)
141144
return engine.MVCCPutProto(ctx, e, ms, key, hlc.Timestamp{}, nil /* txn */, entry)
142145
}
146+
147+
// CopyTo copies the abort span entries to the abort span for the range
148+
// identified by newRangeID. Entries are read from r and written to w. It is
149+
// safe for r and w to be the same object.
150+
//
151+
// CopyTo takes care to only copy records that are required: certain workloads
152+
// create sizable abort spans, and repeated splitting can blow them up further.
153+
// Once it reaches approximately the Raft MaxCommandSize, splits become
154+
// impossible, which is pretty bad (see #25233).
155+
func (sc *AbortSpan) CopyTo(
156+
ctx context.Context,
157+
r engine.Reader,
158+
w engine.ReadWriter,
159+
ms *enginepb.MVCCStats,
160+
ts hlc.Timestamp,
161+
newRangeID roachpb.RangeID,
162+
) error {
163+
var abortSpanCopyCount, abortSpanSkipCount int
164+
// Abort span entries before this span are eligible for GC, so we don't
165+
// copy them into the new range. We could try to delete them from the LHS
166+
// as well, but that could create a large Raft command in itself. Plus,
167+
// we'd have to adjust the stats computations.
168+
threshold := ts.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0)
169+
var scratch [64]byte
170+
if err := sc.Iterate(ctx, r, func(k roachpb.Key, entry roachpb.AbortSpanEntry) error {
171+
if entry.Timestamp.Less(threshold) {
172+
// The entry would be garbage collected (if GC had run), so
173+
// don't bother copying it. Note that we can't filter on the key,
174+
// that is just where the txn record lives, but it doesn't tell
175+
// us whether the intents that triggered the abort span record
176+
// where on the LHS, RHS, or both.
177+
abortSpanSkipCount++
178+
return nil
179+
}
180+
181+
abortSpanCopyCount++
182+
var txnID uuid.UUID
183+
txnID, err := keys.DecodeAbortSpanKey(k, scratch[:0])
184+
if err != nil {
185+
return err
186+
}
187+
return engine.MVCCPutProto(ctx, w, ms,
188+
keys.AbortSpanKey(newRangeID, txnID),
189+
hlc.Timestamp{}, nil, &entry,
190+
)
191+
}); err != nil {
192+
return roachpb.NewReplicaCorruptionError(errors.Wrap(err, "AbortSpan.CopyTo"))
193+
}
194+
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
195+
return nil
196+
}

pkg/storage/batcheval/cmd_end_transaction.go

Lines changed: 16 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
3434
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3535
"github.com/cockroachdb/cockroach/pkg/util/log"
36-
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3736
"github.com/pkg/errors"
3837
)
3938

@@ -772,45 +771,11 @@ func splitTrigger(
772771
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy last replica GC timestamp")
773772
}
774773

775-
// Initialize the RHS range's AbortSpan by copying the LHS's. Put a little extra
776-
// effort into only copying records that are required: certain workloads create
777-
// sizable abort spans, and repeated splitting can blow them up further. Once
778-
// it reaches approximately the Raft MaxCommandSize, splits become impossible,
779-
// which is pretty bad (see #25233).
780-
{
781-
var abortSpanCopyCount, abortSpanSkipCount int
782-
// Abort span entries before this span are eligible for GC, so we don't
783-
// copy them into the new range. We could try to delete them from the LHS
784-
// as well, but that could create a large Raft command in itself. Plus,
785-
// we'd have to adjust the stats computations.
786-
threshold := ts.Add(-storagebase.TxnCleanupThreshold.Nanoseconds(), 0)
787-
var scratch [64]byte
788-
if err := rec.AbortSpan().Iterate(ctx, batch, func(k roachpb.Key, entry roachpb.AbortSpanEntry) error {
789-
if entry.Timestamp.Less(threshold) {
790-
// The entry would be garbage collected (if GC had run), so
791-
// don't bother copying it. Note that we can't filter on the key,
792-
// that is just where the txn record lives, but it doesn't tell
793-
// us whether the intents that triggered the abort span record
794-
// where on the LHS, RHS, or both.
795-
abortSpanSkipCount++
796-
return nil
797-
}
798-
799-
abortSpanCopyCount++
800-
var txnID uuid.UUID
801-
txnID, err = keys.DecodeAbortSpanKey(k, scratch[:0])
802-
if err != nil {
803-
return err
804-
}
805-
return engine.MVCCPutProto(ctx, batch, &bothDeltaMS,
806-
keys.AbortSpanKey(split.RightDesc.RangeID, txnID),
807-
hlc.Timestamp{}, nil, &entry,
808-
)
809-
}); err != nil {
810-
// TODO(tschottdorf): ReplicaCorruptionError.
811-
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy AbortSpan to RHS split range")
812-
}
813-
log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
774+
// Initialize the RHS range's AbortSpan by copying the LHS's.
775+
if err := rec.AbortSpan().CopyTo(
776+
ctx, batch, batch, &bothDeltaMS, ts, split.RightDesc.RangeID,
777+
); err != nil {
778+
return enginepb.MVCCStats{}, result.Result{}, err
814779
}
815780

816781
// Compute (absolute) stats for RHS range.
@@ -1021,23 +986,19 @@ func mergeTrigger(
1021986
return result.Result{}, err
1022987
}
1023988

1024-
// TODO(benesch): copy the non-expired abort span records from the RHS into
1025-
// the LHS. See the corresponding code for splits.
1026-
1027-
// Delete the RHS's range ID keys. Besides the abort span, which we copied
1028-
// above, it's all irrelevant.
1029-
rightRangeIDKey := keys.MakeRangeIDPrefix(merge.RightDesc.RangeID)
1030-
if err := eng.ClearRange(
1031-
engine.MakeMVCCMetadataKey(rightRangeIDKey),
1032-
engine.MakeMVCCMetadataKey(rightRangeIDKey.PrefixEnd()),
989+
if err := abortspan.New(merge.RightDesc.RangeID).CopyTo(
990+
ctx, eng, batch, ms, ts, merge.LeftDesc.RangeID,
1033991
); err != nil {
1034992
return result.Result{}, err
1035993
}
1036994

1037-
// Copy the rewritten RHS data into the command's batch.
995+
// Copy the RHS data into the command's batch. We skip over the range-ID local
996+
// keys. The abort span is the only relevant part of the range-ID local
997+
// keyspace, and we already copied it above.
998+
rhsRelevantStartKey := engine.MakeMVCCMetadataKey(keys.MakeRangeKeyPrefix(merge.RightDesc.StartKey))
1038999
iter := eng.NewIterator(engine.IterOptions{})
10391000
defer iter.Close()
1040-
for iter.Seek(engine.MVCCKey{}); ; iter.Next() {
1001+
for iter.Seek(rhsRelevantStartKey); ; iter.Next() {
10411002
if ok, err := iter.Valid(); err != nil {
10421003
return result.Result{}, err
10431004
} else if !ok {
@@ -1048,8 +1009,10 @@ func mergeTrigger(
10481009
}
10491010
}
10501011

1051-
// Adjust stats for the rewritten RHS data.
1052-
rhsMS, err := iter.ComputeStats(engine.MVCCKey{}, engine.MVCCKeyMax, 0 /* nowNanos */)
1012+
// Adjust stats for the rewritten RHS data. Again, we skip over the range-ID
1013+
// local keys, as only the abort span is relevant and its stats were accounted
1014+
// for above.
1015+
rhsMS, err := iter.ComputeStats(rhsRelevantStartKey, engine.MVCCKeyMax, 0 /* nowNanos */)
10531016
if err != nil {
10541017
return result.Result{}, err
10551018
}

pkg/storage/client_merge_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/storage"
3030
"github.com/cockroachdb/cockroach/pkg/storage/engine"
3131
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
32+
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
3233
"github.com/cockroachdb/cockroach/pkg/testutils"
3334
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3435
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -375,6 +376,38 @@ func TestStoreRangeMergeStats(t *testing.T) {
375376
writeRandomDataToRange(t, store, aDesc.RangeID, []byte("aaa"))
376377
writeRandomDataToRange(t, store, bDesc.RangeID, []byte("ccc"))
377378

379+
// Litter some abort span records. txn1 will leave a record on the LHS, txn2
380+
// will leave a record on the RHS, and txn3 will leave a record on both. This
381+
// tests whether the merge code properly accounts for merging abort span
382+
// records for the same transaction.
383+
txn1 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
384+
if err := txn1.Put(ctx, "a-txn1", "val"); err != nil {
385+
t.Fatal(err)
386+
}
387+
txn2 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
388+
if err := txn2.Put(ctx, "c-txn2", "val"); err != nil {
389+
t.Fatal(err)
390+
}
391+
txn3 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
392+
if err := txn3.Put(ctx, "a-txn3", "val"); err != nil {
393+
t.Fatal(err)
394+
}
395+
if err := txn3.Put(ctx, "c-txn3", "val"); err != nil {
396+
t.Fatal(err)
397+
}
398+
hiPriTxn := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
399+
hiPriTxn.InternalSetPriority(roachpb.MaxTxnPriority)
400+
for _, key := range []string{"a-txn1", "c-txn2", "a-txn3", "c-txn3"} {
401+
if err := hiPriTxn.Put(ctx, key, "val"); err != nil {
402+
t.Fatal(err)
403+
}
404+
}
405+
if err := hiPriTxn.Commit(ctx); err != nil {
406+
t.Fatal(err)
407+
}
408+
// Leave txn1-txn3 open so that their abort span records exist during the
409+
// merge below.
410+
378411
// Get the range stats for both ranges now that we have data.
379412
snap := store.Engine().NewSnapshot()
380413
defer snap.Close()
@@ -418,6 +451,180 @@ func TestStoreRangeMergeStats(t *testing.T) {
418451
}
419452
}
420453

454+
func TestStoreRangeMergeInFlightTxns(t *testing.T) {
455+
defer leaktest.AfterTest(t)
456+
457+
ctx := context.Background()
458+
sc := storage.TestStoreConfig(nil)
459+
sc.TestingKnobs.DisableReplicateQueue = true
460+
mtc := &multiTestContext{storeConfig: &sc}
461+
mtc.Start(t, 2)
462+
defer mtc.Stop()
463+
store := mtc.stores[0]
464+
465+
// Create two adjacent ranges. The left-hand range has exactly one replica,
466+
// on the first store, and the right-hand range has exactly one replica,
467+
// on the second store
468+
var lhsDesc, rhsDesc *roachpb.RangeDescriptor
469+
setupReplicas := func() {
470+
var err *roachpb.Error
471+
lhsDesc, rhsDesc, err = createSplitRanges(store)
472+
if err != nil {
473+
t.Fatal(err)
474+
}
475+
mtc.replicateRange(rhsDesc.RangeID, 1)
476+
mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1)
477+
mtc.unreplicateRange(rhsDesc.RangeID, 0)
478+
}
479+
480+
// Verify that a transaction can span a merge.
481+
t.Run("valid", func(t *testing.T) {
482+
setupReplicas()
483+
lhsKey, rhsKey := roachpb.Key("aa"), roachpb.Key("cc")
484+
485+
txn := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
486+
// Put the key on the RHS side first so ownership of the transaction record
487+
// will need to transfer to the LHS range during the merge.
488+
if err := txn.Put(ctx, rhsKey, t.Name()); err != nil {
489+
t.Fatal(err)
490+
}
491+
if err := txn.Put(ctx, lhsKey, t.Name()); err != nil {
492+
t.Fatal(err)
493+
}
494+
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
495+
if _, err := client.SendWrapped(ctx, store.TestSender(), args); err != nil {
496+
t.Fatal(err)
497+
}
498+
if err := txn.Commit(ctx); err != nil {
499+
t.Fatal(err)
500+
}
501+
502+
for _, key := range []roachpb.Key{lhsKey, rhsKey} {
503+
kv, err := store.DB().Get(ctx, key)
504+
if err != nil {
505+
t.Fatal(err)
506+
} else if string(kv.ValueBytes()) != t.Name() {
507+
t.Fatalf("actual value %q did not match expected value %q", kv.ValueBytes(), t.Name())
508+
}
509+
}
510+
})
511+
512+
// Verify that a transaction's abort span records are preserved when the
513+
// transaction spans a merge.
514+
t.Run("abort-span", func(t *testing.T) {
515+
setupReplicas()
516+
rhsKey := roachpb.Key("cc")
517+
518+
// Create a transaction that will be aborted before the merge but won't
519+
// realize until after the merge.
520+
txn1 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
521+
// Put the key on the RHS side so ownership of the transaction record and
522+
// abort span records will need to transfer to the LHS during the merge.
523+
if err := txn1.Put(ctx, rhsKey, t.Name()); err != nil {
524+
t.Fatal(err)
525+
}
526+
527+
// Create and commit a txn that aborts txn1.
528+
txn2 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
529+
txn2.InternalSetPriority(roachpb.MaxTxnPriority)
530+
if err := txn2.Put(ctx, rhsKey, "muhahahah"); err != nil {
531+
t.Fatal(err)
532+
}
533+
if err := txn2.Commit(ctx); err != nil {
534+
t.Fatal(err)
535+
}
536+
537+
// Complete the merge.
538+
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
539+
if _, err := client.SendWrapped(ctx, store.TestSender(), args); err != nil {
540+
t.Fatal(err)
541+
}
542+
if _, err := txn1.Get(ctx, rhsKey); !testutils.IsError(err, "txn aborted") {
543+
t.Fatalf("expected 'txn aborted' error but got %v", err)
544+
}
545+
})
546+
547+
// Verify that the transaction wait queue on the right-hand range in a merge
548+
// is cleared if the merge commits.
549+
t.Run("wait-queue", func(t *testing.T) {
550+
setupReplicas()
551+
rhsKey := roachpb.Key("cc")
552+
553+
// Set a timeout, and set the the transaction liveness threshold to
554+
// something much larger than our timeout. We want transactions to get stuck
555+
// in the transaction wait queue and trigger the timeout if we forget to
556+
// clear it.
557+
ctx, cancel := context.WithTimeout(ctx, testutils.DefaultSucceedsSoonDuration)
558+
defer cancel()
559+
defer func(old time.Duration) { txnwait.TxnLivenessThreshold = old }(txnwait.TxnLivenessThreshold)
560+
txnwait.TxnLivenessThreshold = 2 * testutils.DefaultSucceedsSoonDuration
561+
562+
// Create a transaction that won't complete until after the merge.
563+
txn1 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
564+
// Put the key on the RHS side so ownership of the transaction record and
565+
// abort span records will need to transfer to the LHS during the merge.
566+
if err := txn1.Put(ctx, rhsKey, t.Name()); err != nil {
567+
t.Fatal(err)
568+
}
569+
570+
// Create a txn that will conflict with txn1.
571+
txn2 := client.NewTxn(store.DB(), 0 /* gatewayNodeID */, client.RootTxn)
572+
txn2ErrCh := make(chan error)
573+
go func() {
574+
txn2ErrCh <- txn2.Put(ctx, rhsKey, "muhahahah")
575+
}()
576+
577+
// Wait for txn2 to realize it conflicts with txn1 and enter its wait queue.
578+
{
579+
repl, err := mtc.stores[1].GetReplica(rhsDesc.RangeID)
580+
if err != nil {
581+
t.Fatal(err)
582+
}
583+
for {
584+
if _, ok := repl.GetTxnWaitQueue().TrackedTxns()[txn1.ID()]; ok {
585+
break
586+
}
587+
select {
588+
case <-time.After(10 * time.Millisecond):
589+
case <-ctx.Done():
590+
t.Fatal("timed out waiting for txn2 to enter wait queue")
591+
}
592+
}
593+
}
594+
595+
// Complete the merge.
596+
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
597+
if _, err := client.SendWrapped(ctx, store.TestSender(), args); err != nil {
598+
t.Fatal(err)
599+
}
600+
601+
if err := txn1.Commit(ctx); err != nil {
602+
t.Fatal(err)
603+
}
604+
605+
kv, pErr := store.DB().Get(ctx, rhsKey)
606+
if pErr != nil {
607+
t.Fatal(pErr)
608+
} else if string(kv.ValueBytes()) != t.Name() {
609+
t.Fatalf("actual value %q did not match expected value %q", kv.ValueBytes(), t.Name())
610+
}
611+
612+
// Now that txn1 has committed, txn2's put operation should complete.
613+
select {
614+
case err := <-txn2ErrCh:
615+
if err != nil {
616+
t.Fatal(err)
617+
}
618+
case <-ctx.Done():
619+
t.Fatal("timed out waiting for txn2 to complete put")
620+
}
621+
622+
if err := txn2.Commit(ctx); err != nil {
623+
t.Fatal(err)
624+
}
625+
})
626+
}
627+
421628
func TestInvalidGetSnapshotForMergeRequest(t *testing.T) {
422629
defer leaktest.AfterTest(t)()
423630

0 commit comments

Comments
 (0)