@@ -849,41 +849,41 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
849849 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
850850 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
851851
852- for (int i = 0 ; i < updateList .size (); i ++) {
853- final CollectionPipedUpdate <T > update = updateList .get (i );
854- final int idx = i ;
855-
856- Operation op = opFact .collectionPipedUpdate (key , update ,
857- new CollectionPipedUpdateOperation .Callback () {
858- // each result status
859- public void receivedStatus (OperationStatus status ) {
860- CollectionOperationStatus cstatus ;
861-
862- if (status instanceof CollectionOperationStatus ) {
863- cstatus = (CollectionOperationStatus ) status ;
864- } else {
865- getLogger ().warn ("Unhandled state: " + status );
866- cstatus = new CollectionOperationStatus (status );
867- }
868- rv .setOperationStatus (cstatus );
869- }
852+ CollectionPipedUpdateOperation .Callback callback = new CollectionPipedUpdateOperation .Callback () {
853+ private int opIndex = 0 ;
854+ @ Override
855+ public void receivedStatus (OperationStatus status ) {
856+ opIndex += 1 ;
857+ CollectionOperationStatus cstatus ;
870858
871- // complete
872- public void complete () {
873- latch .countDown ();
874- }
859+ if (status instanceof CollectionOperationStatus ) {
860+ cstatus = (CollectionOperationStatus ) status ;
861+ } else {
862+ getLogger ().warn ("Unhandled state: " + status );
863+ cstatus = new CollectionOperationStatus (status );
864+ }
865+ rv .setOperationStatus (cstatus );
866+ }
875867
876- // got status
877- public void gotStatus (Integer index , OperationStatus status ) {
878- if (status instanceof CollectionOperationStatus ) {
879- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
880- (CollectionOperationStatus ) status );
881- } else {
882- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
883- new CollectionOperationStatus (status ));
884- }
885- }
886- });
868+ @ Override
869+ public void complete () {
870+ latch .countDown ();
871+ }
872+
873+ @ Override
874+ public void gotStatus (Integer index , OperationStatus status ) {
875+ if (status instanceof CollectionOperationStatus ) {
876+ rv .addEachResult (index + (opIndex * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
877+ (CollectionOperationStatus ) status );
878+ } else {
879+ rv .addEachResult (index + (opIndex * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
880+ new CollectionOperationStatus (status ));
881+ }
882+ }
883+ };
884+
885+ for (CollectionPipedUpdate <T > pipedUpdate : updateList ) {
886+ Operation op = opFact .collectionPipedUpdate (key , pipedUpdate , callback );
887887 rv .addOperation (op );
888888 addOp (key , op );
889889 }
@@ -3291,41 +3291,41 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
32913291 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
32923292 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
32933293
3294- for (int i = 0 ; i < insertList .size (); i ++) {
3295- final CollectionPipedInsert <T > insert = insertList .get (i );
3296- final int idx = i ;
3294+ CollectionPipedInsertOperation .Callback callback = new CollectionPipedInsertOperation .Callback () {
3295+ private int opIdx = 0 ;
3296+ @ Override
3297+ public void receivedStatus (OperationStatus status ) {
3298+ opIdx += 1 ;
3299+ CollectionOperationStatus cstatus ;
32973300
3298- Operation op = opFact .collectionPipedInsert (key , insert ,
3299- new CollectionPipedInsertOperation .Callback () {
3300- // each result status
3301- public void receivedStatus (OperationStatus status ) {
3302- CollectionOperationStatus cstatus ;
3301+ if (status instanceof CollectionOperationStatus ) {
3302+ cstatus = (CollectionOperationStatus ) status ;
3303+ } else {
3304+ getLogger ().warn ("Unhandled state: " + status );
3305+ cstatus = new CollectionOperationStatus (status );
3306+ }
3307+ rv .setOperationStatus (cstatus );
3308+ }
33033309
3304- if (status instanceof CollectionOperationStatus ) {
3305- cstatus = (CollectionOperationStatus ) status ;
3306- } else {
3307- getLogger ().warn ("Unhandled state: " + status );
3308- cstatus = new CollectionOperationStatus (status );
3309- }
3310- rv .setOperationStatus (cstatus );
3311- }
3310+ @ Override
3311+ public void complete () {
3312+ latch .countDown ();
3313+ }
33123314
3313- // complete
3314- public void complete () {
3315- latch .countDown ();
3316- }
3315+ @ Override
3316+ public void gotStatus (Integer index , OperationStatus status ) {
3317+ if (status instanceof CollectionOperationStatus ) {
3318+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3319+ (CollectionOperationStatus ) status );
3320+ } else {
3321+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3322+ new CollectionOperationStatus (status ));
3323+ }
3324+ }
3325+ };
33173326
3318- // got status
3319- public void gotStatus (Integer index , OperationStatus status ) {
3320- if (status instanceof CollectionOperationStatus ) {
3321- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3322- (CollectionOperationStatus ) status );
3323- } else {
3324- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3325- new CollectionOperationStatus (status ));
3326- }
3327- }
3328- });
3327+ for (CollectionPipedInsert <T > pipedInsert : insertList ) {
3328+ Operation op = opFact .collectionPipedInsert (key , pipedInsert , callback );
33293329 rv .addOperation (op );
33303330 addOp (key , op );
33313331 }
0 commit comments