51
51
import org .elasticsearch .cluster .routing .AllocationId ;
52
52
import org .elasticsearch .common .CheckedFunction ;
53
53
import org .elasticsearch .common .Nullable ;
54
+ import org .elasticsearch .common .Randomness ;
54
55
import org .elasticsearch .common .Strings ;
55
56
import org .elasticsearch .common .bytes .BytesArray ;
56
57
import org .elasticsearch .common .bytes .BytesReference ;
@@ -704,6 +705,32 @@ public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica
704
705
return ops ;
705
706
}
706
707
708
+ public List <Engine .Operation > generateReplicaHistory (int numOps , boolean allowGapInSeqNo ) {
709
+ long seqNo = 0 ;
710
+ List <Engine .Operation > operations = new ArrayList <>(numOps );
711
+ for (int i = 0 ; i < numOps ; i ++) {
712
+ String id = Integer .toString (between (1 , 100 ));
713
+ final ParsedDocument doc = EngineTestCase .createParsedDoc (id , null );
714
+ if (randomBoolean ()) {
715
+ operations .add (new Engine .Index (EngineTestCase .newUid (doc ), doc , seqNo , primaryTerm .get (),
716
+ i , null , Engine .Operation .Origin .REPLICA , threadPool .relativeTimeInMillis (),
717
+ -1 , true ));
718
+ } else if (randomBoolean ()) {
719
+ operations .add (new Engine .Delete (doc .type (), doc .id (), EngineTestCase .newUid (doc ), seqNo , primaryTerm .get (),
720
+ i , null , Engine .Operation .Origin .REPLICA , threadPool .relativeTimeInMillis ()));
721
+ } else {
722
+ operations .add (new Engine .NoOp (seqNo , primaryTerm .get (), Engine .Operation .Origin .REPLICA ,
723
+ threadPool .relativeTimeInMillis (), "test-" + i ));
724
+ }
725
+ seqNo ++;
726
+ if (allowGapInSeqNo && rarely ()) {
727
+ seqNo ++;
728
+ }
729
+ }
730
+ Randomness .shuffle (operations );
731
+ return operations ;
732
+ }
733
+
707
734
public static void assertOpsOnReplica (
708
735
final List <Engine .Operation > ops ,
709
736
final InternalEngine replicaEngine ,
@@ -788,14 +815,7 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
788
815
int docOffset ;
789
816
while ((docOffset = offset .incrementAndGet ()) < ops .size ()) {
790
817
try {
791
- final Engine .Operation op = ops .get (docOffset );
792
- if (op instanceof Engine .Index ) {
793
- engine .index ((Engine .Index ) op );
794
- } else if (op instanceof Engine .Delete ){
795
- engine .delete ((Engine .Delete ) op );
796
- } else {
797
- engine .noOp ((Engine .NoOp ) op );
798
- }
818
+ applyOperation (engine , ops .get (docOffset ));
799
819
if ((docOffset + 1 ) % 4 == 0 ) {
800
820
engine .refresh ("test" );
801
821
}
@@ -814,6 +834,36 @@ public static void concurrentlyApplyOps(List<Engine.Operation> ops, InternalEngi
814
834
}
815
835
}
816
836
837
+ public static void applyOperations (Engine engine , List <Engine .Operation > operations ) throws IOException {
838
+ for (Engine .Operation operation : operations ) {
839
+ applyOperation (engine , operation );
840
+ if (randomInt (100 ) < 10 ) {
841
+ engine .refresh ("test" );
842
+ }
843
+ if (rarely ()) {
844
+ engine .flush ();
845
+ }
846
+ }
847
+ }
848
+
849
+ public static Engine .Result applyOperation (Engine engine , Engine .Operation operation ) throws IOException {
850
+ final Engine .Result result ;
851
+ switch (operation .operationType ()) {
852
+ case INDEX :
853
+ result = engine .index ((Engine .Index ) operation );
854
+ break ;
855
+ case DELETE :
856
+ result = engine .delete ((Engine .Delete ) operation );
857
+ break ;
858
+ case NO_OP :
859
+ result = engine .noOp ((Engine .NoOp ) operation );
860
+ break ;
861
+ default :
862
+ throw new IllegalStateException ("No operation defined for [" + operation + "]" );
863
+ }
864
+ return result ;
865
+ }
866
+
817
867
/**
818
868
* Gets a collection of tuples of docId, sequence number, and primary term of all live documents in the provided engine.
819
869
*/
0 commit comments