@@ -497,14 +497,19 @@ func (v *dataTxValidator) parallelReadMvccValidation(
497497) error {
498498 reads := make (map [string ]map [string ]* readCache )
499499 errorChan := make (chan error )
500+ defer close (errorChan )
500501
501- // Submit a "get-version" Go routine for each key in the envelope.
502+ // Submit a "get-version" Go routine for each read key in the envelope.
502503 // We avoid reading the same key twice.
503504 for txNum , txEnv := range dataTxEnvs {
504505 if valInfoArray [txNum ].Flag != types .Flag_VALID {
505506 continue
506507 }
507508 for _ , txOps := range txEnv .Payload .DbOperations {
509+ if len (txOps .DataReads ) == 0 {
510+ continue
511+ }
512+
508513 dbName := txOps .DbName
509514 dbReads , ok := reads [dbName ]
510515 if ! ok {
@@ -513,25 +518,23 @@ func (v *dataTxValidator) parallelReadMvccValidation(
513518 }
514519
515520 for _ , r := range txOps .DataReads {
516- key := r .Key
517- if _ , ok := dbReads [key ]; ok {
521+ if _ , ok := dbReads [r .Key ]; ok {
518522 continue
519523 }
520524
521525 c := & readCache {
522526 dbName : dbName ,
523- key : key ,
527+ key : r . Key ,
524528 }
525529 c .wg .Add (1 )
526- dbReads [key ] = c
530+ dbReads [r . Key ] = c
527531 go func (txNum int , c * readCache ) {
528532 defer c .wg .Done ()
529533 c .ver , c .err = v .db .GetVersion (c .dbName , c .key )
530534 if c .err != nil {
531535 v .logger .Errorf ("error validating signatures in tx number %d, error: %s" , txNum , c .err )
532536 defer func () {
533- // Ignore panic when errorChan is closed
534- recover ()
537+ recover () // Ignore panic when errorChan is closed
535538 }()
536539 errorChan <- c .err
537540 }
@@ -543,45 +546,56 @@ func (v *dataTxValidator) parallelReadMvccValidation(
543546 // Submit a "validation" Go routine for read operation in the envelope.
544547 wg := sync.WaitGroup {}
545548 for txNum , txEnv := range dataTxEnvs {
549+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
550+ continue
551+ }
546552 for _ , txOps := range txEnv .Payload .DbOperations {
553+ if len (txOps .DataReads ) == 0 {
554+ continue
555+ }
556+ dbReads , ok := reads [txOps .DbName ]
557+ if ! ok {
558+ panic ("all read DBs should be in the map" )
559+ }
547560 for _ , r := range txOps .DataReads {
548- if valInfoArray [txNum ].Flag != types .Flag_VALID {
549- continue
561+ keyRead , ok := dbReads [r .Key ]
562+ if ! ok {
563+ panic ("all read keys should be in the map" )
550564 }
551565
552566 wg .Add (1 )
553567 go func (txNum int , c * readCache , expectedVer * types.Version ) {
554568 defer wg .Done ()
555- if c == nil {
556- panic ("all reads keys should be in the map" )
569+ // Stop early in case another validation routine already invalidated this TX.
570+ if valInfoArray [txNum ].Flag != types .Flag_VALID {
571+ return
557572 }
558573
559574 c .wg .Wait ()
560- if valInfoArray [txNum ].Flag != types .Flag_VALID || c .err != nil {
561- return
562- }
563- if proto .Equal (expectedVer , c .ver ) {
564- return
565- }
566- valInfoArray [txNum ] = & types.ValidationInfo {
567- Flag : types .Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE ,
568- ReasonIfInvalid : "mvcc conflict has occurred as the committed state for the key [" + c .key + "] in database [" + c .dbName + "] changed" ,
575+
576+ // We check the flag again after waiting for the read version.
577+ // The version comparison is last to avoid redundant comparison (short circuit evaluation).
578+ if c .err == nil && valInfoArray [txNum ].Flag == types .Flag_VALID && ! proto .Equal (expectedVer , c .ver ) {
579+ valInfoArray [txNum ] = & types.ValidationInfo {
580+ Flag : types .Flag_INVALID_MVCC_CONFLICT_WITH_COMMITTED_STATE ,
581+ ReasonIfInvalid : "mvcc conflict has occurred as the committed state for the key [" + c .key + "] in database [" + c .dbName + "] changed" ,
582+ }
569583 }
570- }(txNum , reads [txOps. DbName ][r. Key ] , r .Version )
584+ }(txNum , keyRead , r .Version )
571585 }
572586 }
573587 }
574588
575- // Wait for all the validation routines to end.
589+ // Wait in the background for all the validation routines to end, then inject nil to make sure we have data
590+ // to read from the channel if no error occurred.
576591 go func () {
577592 wg .Wait ()
578- // Inject nil to make sure we have data to read from the channel if no error occurred.
579593 errorChan <- nil
580594 }()
581595
596+ // Wait for all the validation routines to end or for the first error.
582597 select {
583598 case err := <- errorChan :
584- close (errorChan )
585599 return err
586600 }
587601}
0 commit comments