Skip to content

Commit 4833d79

Browse files
committed
ENHANCE: Change creating callback objects multiple times in pipe operation.
1 parent e912341 commit 4833d79

File tree

9 files changed

+92
-85
lines changed

9 files changed

+92
-85
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 62 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -849,41 +849,39 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
849849
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
850850
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);
851851

852-
for (int i = 0; i < updateList.size(); i++) {
853-
final CollectionPipedUpdate<T> update = updateList.get(i);
854-
final int idx = i;
855-
856-
Operation op = opFact.collectionPipedUpdate(key, update,
857-
new CollectionPipedUpdateOperation.Callback() {
858-
// each result status
859-
public void receivedStatus(OperationStatus status) {
860-
CollectionOperationStatus cstatus;
861-
862-
if (status instanceof CollectionOperationStatus) {
863-
cstatus = (CollectionOperationStatus) status;
864-
} else {
865-
getLogger().warn("Unhandled state: " + status);
866-
cstatus = new CollectionOperationStatus(status);
867-
}
868-
rv.setOperationStatus(cstatus);
869-
}
852+
CollectionPipedUpdateOperation.Callback callback = new CollectionPipedUpdateOperation.Callback() {
853+
@Override// each result status
854+
public void receivedStatus(OperationStatus status) {
855+
CollectionOperationStatus cstatus;
870856

871-
// complete
872-
public void complete() {
873-
latch.countDown();
874-
}
857+
if (status instanceof CollectionOperationStatus) {
858+
cstatus = (CollectionOperationStatus) status;
859+
} else {
860+
getLogger().warn("Unhandled state: " + status);
861+
cstatus = new CollectionOperationStatus(status);
862+
}
863+
rv.setOperationStatus(cstatus);
864+
}
875865

876-
// got status
877-
public void gotStatus(Integer index, OperationStatus status) {
878-
if (status instanceof CollectionOperationStatus) {
879-
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
880-
(CollectionOperationStatus) status);
881-
} else {
882-
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
883-
new CollectionOperationStatus(status));
884-
}
885-
}
886-
});
866+
@Override
867+
public void complete() {
868+
latch.countDown();
869+
}
870+
871+
@Override
872+
public void gotStatus(Integer index, Integer opIdx, OperationStatus status) {
873+
if (status instanceof CollectionOperationStatus) {
874+
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
875+
(CollectionOperationStatus) status);
876+
} else {
877+
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
878+
new CollectionOperationStatus(status));
879+
}
880+
}
881+
};
882+
883+
for (int i = 0; i < updateList.size(); i++) {
884+
Operation op = opFact.collectionPipedUpdate(key, updateList.get(i), i, callback);
887885
rv.addOperation(op);
888886
addOp(key, op);
889887
}
@@ -3291,41 +3289,39 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
32913289
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
32923290
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);
32933291

3294-
for (int i = 0; i < insertList.size(); i++) {
3295-
final CollectionPipedInsert<T> insert = insertList.get(i);
3296-
final int idx = i;
3297-
3298-
Operation op = opFact.collectionPipedInsert(key, insert,
3299-
new CollectionPipedInsertOperation.Callback() {
3300-
// each result status
3301-
public void receivedStatus(OperationStatus status) {
3302-
CollectionOperationStatus cstatus;
3303-
3304-
if (status instanceof CollectionOperationStatus) {
3305-
cstatus = (CollectionOperationStatus) status;
3306-
} else {
3307-
getLogger().warn("Unhandled state: " + status);
3308-
cstatus = new CollectionOperationStatus(status);
3309-
}
3310-
rv.setOperationStatus(cstatus);
3311-
}
3292+
CollectionPipedInsertOperation.Callback callback = new CollectionPipedInsertOperation.Callback() {
3293+
@Override // each result status
3294+
public void receivedStatus(OperationStatus status) {
3295+
CollectionOperationStatus cstatus;
33123296

3313-
// complete
3314-
public void complete() {
3315-
latch.countDown();
3316-
}
3297+
if (status instanceof CollectionOperationStatus) {
3298+
cstatus = (CollectionOperationStatus) status;
3299+
} else {
3300+
getLogger().warn("Unhandled state: " + status);
3301+
cstatus = new CollectionOperationStatus(status);
3302+
}
3303+
rv.setOperationStatus(cstatus);
3304+
}
33173305

3318-
// got status
3319-
public void gotStatus(Integer index, OperationStatus status) {
3320-
if (status instanceof CollectionOperationStatus) {
3321-
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3322-
(CollectionOperationStatus) status);
3323-
} else {
3324-
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3325-
new CollectionOperationStatus(status));
3326-
}
3327-
}
3328-
});
3306+
@Override
3307+
public void complete() {
3308+
latch.countDown();
3309+
}
3310+
3311+
@Override
3312+
public void gotStatus(Integer index, Integer opIdx, OperationStatus status) {
3313+
if (status instanceof CollectionOperationStatus) {
3314+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3315+
(CollectionOperationStatus) status);
3316+
} else {
3317+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3318+
new CollectionOperationStatus(status));
3319+
}
3320+
}
3321+
};
3322+
3323+
for (int i = 0; i < insertList.size(); i++) {
3324+
Operation op = opFact.collectionPipedInsert(key, insertList.get(i), i, callback);
33293325
rv.addOperation(op);
33303326
addOp(key, op);
33313327
}

src/main/java/net/spy/memcached/OperationFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ CollectionInsertOperation collectionInsert(String key, String subkey,
301301
*/
302302
CollectionPipedInsertOperation collectionPipedInsert(String key,
303303
CollectionPipedInsert<?> insert,
304+
Integer opIdx,
304305
OperationCallback cb);
305306

306307
/**
@@ -442,6 +443,7 @@ CollectionUpdateOperation collectionUpdate(String key, String subkey,
442443
*/
443444
CollectionPipedUpdateOperation collectionPipedUpdate(String key,
444445
CollectionPipedUpdate<?> update,
446+
Integer opIdx,
445447
OperationCallback cb);
446448

447449
/**

src/main/java/net/spy/memcached/ops/CollectionPipedInsertOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public interface CollectionPipedInsertOperation extends KeyedOperation {
2626
CollectionPipedInsert<?> getInsert();
2727

2828
interface Callback extends OperationCallback {
29-
void gotStatus(Integer index, OperationStatus status);
29+
void gotStatus(Integer index, Integer opIdx, OperationStatus status);
3030
}
3131

3232
}

src/main/java/net/spy/memcached/ops/CollectionPipedUpdateOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public interface CollectionPipedUpdateOperation extends KeyedOperation {
2626
CollectionPipedUpdate<?> getUpdate();
2727

2828
interface Callback extends OperationCallback {
29-
void gotStatus(Integer index, OperationStatus status);
29+
void gotStatus(Integer index, Integer opIdx, OperationStatus status);
3030
}
3131

3232
}

src/main/java/net/spy/memcached/protocol/ascii/AsciiOperationFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,9 @@ public CollectionInsertOperation collectionInsert(String key, String subkey,
185185

186186
public CollectionPipedInsertOperation collectionPipedInsert(String key,
187187
CollectionPipedInsert<?> insert,
188+
Integer opIdx,
188189
OperationCallback cb) {
189-
return new CollectionPipedInsertOperationImpl(key, insert, cb);
190+
return new CollectionPipedInsertOperationImpl(key, insert, opIdx, cb);
190191
}
191192

192193
public CollectionGetOperation collectionGet(String key,
@@ -246,8 +247,9 @@ public CollectionUpdateOperation collectionUpdate(String key,
246247
@Override
247248
public CollectionPipedUpdateOperation collectionPipedUpdate(String key,
248249
CollectionPipedUpdate<?> update,
250+
Integer opIdx,
249251
OperationCallback cb) {
250-
return new CollectionPipedUpdateOperationImpl(key, update, cb);
252+
return new CollectionPipedUpdateOperationImpl(key, update, opIdx, cb);
251253
}
252254

253255
@Override

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedInsertOperationImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,18 @@ public class CollectionPipedInsertOperationImpl extends OperationImpl
6565
protected final String key;
6666
protected final CollectionPipedInsert<?> insert;
6767
protected final CollectionPipedInsertOperation.Callback cb;
68+
private final Integer opIdx;
6869

6970
protected int count;
7071
protected int index = 0;
7172
protected boolean successAll = true;
7273

73-
public CollectionPipedInsertOperationImpl(String key,
74-
CollectionPipedInsert<?> insert, OperationCallback cb) {
74+
public CollectionPipedInsertOperationImpl(String key, CollectionPipedInsert<?> insert,
75+
Integer opIdx, OperationCallback cb) {
7576
super(cb);
7677
this.key = key;
7778
this.insert = insert;
79+
this.opIdx = opIdx;
7880
this.cb = (Callback) cb;
7981
if (this.insert instanceof CollectionPipedInsert.ListPipedInsert) {
8082
setAPIType(APIType.LOP_INSERT);
@@ -122,7 +124,7 @@ assert getState() == OperationState.READING
122124
if (status.isSuccess()) {
123125
cb.receivedStatus((successAll) ? END : FAILED_END);
124126
} else {
125-
cb.gotStatus(index, status);
127+
cb.gotStatus(index, opIdx, status);
126128
cb.receivedStatus(FAILED_END);
127129
}
128130
transitionState(OperationState.COMPLETE);
@@ -161,7 +163,7 @@ assert getState() == OperationState.READING
161163
TYPE_MISMATCH, BKEY_MISMATCH);
162164

163165
if (!status.isSuccess()) {
164-
cb.gotStatus(index, status);
166+
cb.gotStatus(index, opIdx, status);
165167
successAll = false;
166168
}
167169
index++;

src/main/java/net/spy/memcached/protocol/ascii/CollectionPipedUpdateOperationImpl.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,18 @@ public class CollectionPipedUpdateOperationImpl extends OperationImpl implements
6868
protected final CollectionPipedUpdate<?> update;
6969
protected final CollectionPipedUpdateOperation.Callback cb;
7070

71+
private final Integer opIdx;
72+
7173
protected int count;
7274
protected int index = 0;
7375
protected boolean successAll = true;
7476

75-
public CollectionPipedUpdateOperationImpl(String key,
76-
CollectionPipedUpdate<?> update, OperationCallback cb) {
77+
public CollectionPipedUpdateOperationImpl(String key, CollectionPipedUpdate<?> update,
78+
Integer opIdx, OperationCallback cb) {
7779
super(cb);
7880
this.key = key;
7981
this.update = update;
82+
this.opIdx = opIdx;
8083
this.cb = (Callback) cb;
8184
if (this.update instanceof BTreePipedUpdate) {
8285
setAPIType(APIType.BOP_UPDATE);
@@ -118,7 +121,7 @@ assert getState() == OperationState.READING : "Read ``" + line
118121
if (status.isSuccess()) {
119122
cb.receivedStatus((successAll) ? END : FAILED_END);
120123
} else {
121-
cb.gotStatus(index, status);
124+
cb.gotStatus(index, opIdx, status);
122125
cb.receivedStatus(FAILED_END);
123126
}
124127
transitionState(OperationState.COMPLETE);
@@ -157,7 +160,7 @@ assert getState() == OperationState.READING : "Read ``" + line
157160
BKEY_MISMATCH, EFLAG_MISMATCH, SERVER_ERROR);
158161

159162
if (!status.isSuccess()) {
160-
cb.gotStatus(index, status);
163+
cb.gotStatus(index, opIdx, status);
161164
successAll = false;
162165
}
163166
index++;

src/main/java/net/spy/memcached/protocol/binary/BinaryOperationFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ public CollectionInsertOperation collectionInsert(String key, byte[] subkey,
206206

207207
public CollectionPipedInsertOperation collectionPipedInsert(String key,
208208
CollectionPipedInsert<?> insert,
209+
Integer opIdx,
209210
OperationCallback cb) {
210211
throw new RuntimeException(
211212
"CollectionPipedInsertOperation is not supported in binary protocol yet.");
@@ -278,6 +279,7 @@ public CollectionUpdateOperation collectionUpdate(String key,
278279
@Override
279280
public CollectionPipedUpdateOperation collectionPipedUpdate(String key,
280281
CollectionPipedUpdate<?> update,
282+
Integer opIdx,
281283
OperationCallback cb) {
282284
throw new RuntimeException(
283285
"CollectionPipedUpdateOperation is not supported in binary protocol yet.");

src/test/manual/net/spy/memcached/MultibyteKeyTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ public void CollectionPipedInsertOperationImplTest() {
348348
CollectionPipedInsertOperation.Callback cpsCallback =
349349
new CollectionPipedInsertOperation.Callback() {
350350
@Override
351-
public void gotStatus(Integer index, OperationStatus status) {
351+
public void gotStatus(Integer index, Integer opIdx, OperationStatus status) {
352352
}
353353

354354
@Override
@@ -368,7 +368,7 @@ public void complete() {
368368
new CollectionPipedInsert.ByteArraysBTreePipedInsert<Integer>(
369369
MULTIBYTE_KEY, elements, new CollectionAttributes(), new IntegerTranscoder());
370370
try {
371-
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, cpsCallback).initialize();
371+
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, 0, cpsCallback).initialize();
372372
} catch (java.nio.BufferOverflowException e) {
373373
Assert.fail();
374374
}
@@ -380,7 +380,7 @@ public void complete() {
380380
insert = new CollectionPipedInsert.BTreePipedInsert<Integer>(
381381
MULTIBYTE_KEY, elementsMap, new CollectionAttributes(), new IntegerTranscoder());
382382
try {
383-
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, cpsCallback).initialize();
383+
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, 0, cpsCallback).initialize();
384384
} catch (java.nio.BufferOverflowException e) {
385385
Assert.fail();
386386
}
@@ -393,7 +393,7 @@ public void complete() {
393393
MULTIBYTE_KEY, 0, elementsList, new CollectionAttributes(),
394394
new IntegerTranscoder());
395395
try {
396-
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, cpsCallback).initialize();
396+
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, 0, cpsCallback).initialize();
397397
} catch (java.nio.BufferOverflowException e) {
398398
Assert.fail();
399399
}
@@ -405,7 +405,7 @@ public void complete() {
405405
insert = new CollectionPipedInsert.SetPipedInsert<Integer>(
406406
MULTIBYTE_KEY, elementsSet, new CollectionAttributes(), new IntegerTranscoder());
407407
try {
408-
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, cpsCallback).initialize();
408+
opFact.collectionPipedInsert(MULTIBYTE_KEY, insert, 0, cpsCallback).initialize();
409409
} catch (java.nio.BufferOverflowException e) {
410410
Assert.fail();
411411
}
@@ -598,10 +598,10 @@ public void CollectionPipedUpdateOperationImplTest() {
598598
try {
599599
opFact.collectionPipedUpdate(MULTIBYTE_KEY,
600600
new CollectionPipedUpdate.BTreePipedUpdate<Integer>(
601-
MULTIBYTE_KEY, elementsList, new IntegerTranscoder()),
601+
MULTIBYTE_KEY, elementsList, new IntegerTranscoder()),0,
602602
new CollectionPipedUpdateOperation.Callback() {
603603
@Override
604-
public void gotStatus(Integer index, OperationStatus status) {
604+
public void gotStatus(Integer index, Integer opIdx, OperationStatus status) {
605605
}
606606

607607
@Override

0 commit comments

Comments
 (0)