@@ -848,41 +848,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
848848 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
849849 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
850850
851- for (int i = 0 ; i < updateList .size (); i ++) {
852- final CollectionPipedUpdate <T > update = updateList .get (i );
853- final int idx = i ;
854-
855- Operation op = opFact .collectionPipedUpdate (key , update ,
856- new CollectionPipedUpdateOperation .Callback () {
857- // each result status
858- public void receivedStatus (OperationStatus status ) {
859- CollectionOperationStatus cstatus ;
860-
861- if (status instanceof CollectionOperationStatus ) {
862- cstatus = (CollectionOperationStatus ) status ;
863- } else {
864- getLogger ().warn ("Unhandled state: " + status );
865- cstatus = new CollectionOperationStatus (status );
866- }
867- rv .setOperationStatus (cstatus );
868- }
851+ CollectionPipedUpdateOperation .Callback callback = new CollectionPipedUpdateOperation .Callback () {
852+ private int opIdx = 0 ;
869853
870- // complete
871- public void complete () {
872- latch .countDown ();
873- }
854+ @ Override
855+ public void receivedStatus (OperationStatus status ) {
856+ CollectionOperationStatus cstatus ;
874857
875- // got status
876- public void gotStatus (Integer index , OperationStatus status ) {
877- if (status instanceof CollectionOperationStatus ) {
878- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
879- (CollectionOperationStatus ) status );
880- } else {
881- rv .addEachResult (index + (idx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
882- new CollectionOperationStatus (status ));
883- }
884- }
885- });
858+ if (status instanceof CollectionOperationStatus ) {
859+ cstatus = (CollectionOperationStatus ) status ;
860+ } else {
861+ getLogger ().warn ("Unhandled state: " + status );
862+ cstatus = new CollectionOperationStatus (status );
863+ }
864+ rv .setOperationStatus (cstatus );
865+ }
866+
867+ @ Override
868+ public void complete () {
869+ opIdx += 1 ;
870+ latch .countDown ();
871+ }
872+
873+ @ Override
874+ public void gotStatus (Integer index , OperationStatus status ) {
875+ if (status instanceof CollectionOperationStatus ) {
876+ rv .addEachResult (index + (opIdx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
877+ (CollectionOperationStatus ) status );
878+ } else {
879+ rv .addEachResult (index + (opIdx * CollectionPipedUpdate .MAX_PIPED_ITEM_COUNT ),
880+ new CollectionOperationStatus (status ));
881+ }
882+ }
883+ };
884+
885+ for (CollectionPipedUpdate <T > pipedUpdate : updateList ) {
886+ Operation op = opFact .collectionPipedUpdate (key , pipedUpdate , callback );
886887 rv .addOperation (op );
887888 addOp (key , op );
888889 }
@@ -3293,41 +3294,42 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
32933294 final PipedCollectionFuture <Integer , CollectionOperationStatus > rv =
32943295 new PipedCollectionFuture <Integer , CollectionOperationStatus >(latch , operationTimeout );
32953296
3296- for (int i = 0 ; i < insertList .size (); i ++) {
3297- final CollectionPipedInsert <T > insert = insertList .get (i );
3298- final int idx = i ;
3297+ CollectionPipedInsertOperation .Callback callback = new CollectionPipedInsertOperation .Callback () {
3298+ private int opIdx = 0 ;
32993299
3300- Operation op = opFact .collectionPipedInsert (key , insert ,
3301- new CollectionPipedInsertOperation .Callback () {
3302- // each result status
3303- public void receivedStatus (OperationStatus status ) {
3304- CollectionOperationStatus cstatus ;
3300+ @ Override
3301+ public void receivedStatus (OperationStatus status ) {
3302+ CollectionOperationStatus cstatus ;
33053303
3306- if (status instanceof CollectionOperationStatus ) {
3307- cstatus = (CollectionOperationStatus ) status ;
3308- } else {
3309- getLogger ().warn ("Unhandled state: " + status );
3310- cstatus = new CollectionOperationStatus (status );
3311- }
3312- rv .setOperationStatus (cstatus );
3313- }
3304+ if (status instanceof CollectionOperationStatus ) {
3305+ cstatus = (CollectionOperationStatus ) status ;
3306+ } else {
3307+ getLogger ().warn ("Unhandled state: " + status );
3308+ cstatus = new CollectionOperationStatus (status );
3309+ }
3310+ rv .setOperationStatus (cstatus );
3311+ }
33143312
3315- // complete
3316- public void complete () {
3317- latch .countDown ();
3318- }
3313+ @ Override
3314+ public void complete () {
3315+ opIdx += 1 ;
3316+ latch .countDown ();
3317+ }
33193318
3320- // got status
3321- public void gotStatus (Integer index , OperationStatus status ) {
3322- if (status instanceof CollectionOperationStatus ) {
3323- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3324- (CollectionOperationStatus ) status );
3325- } else {
3326- rv .addEachResult (index + (idx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3327- new CollectionOperationStatus (status ));
3328- }
3329- }
3330- });
3319+ @ Override
3320+ public void gotStatus (Integer index , OperationStatus status ) {
3321+ if (status instanceof CollectionOperationStatus ) {
3322+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3323+ (CollectionOperationStatus ) status );
3324+ } else {
3325+ rv .addEachResult (index + (opIdx * CollectionPipedInsert .MAX_PIPED_ITEM_COUNT ),
3326+ new CollectionOperationStatus (status ));
3327+ }
3328+ }
3329+ };
3330+
3331+ for (CollectionPipedInsert <T > pipedInsert : insertList ) {
3332+ Operation op = opFact .collectionPipedInsert (key , pipedInsert , callback );
33313333 rv .addOperation (op );
33323334 addOp (key , op );
33333335 }
0 commit comments