Skip to content

Commit eb87cef

Browse files
committed
ENHANCE: Change creating callback objects multiple times in pipe operation.
1 parent ae2a82f commit eb87cef

File tree

8 files changed

+133
-150
lines changed

8 files changed

+133
-150
lines changed

src/main/java/net/spy/memcached/ArcusClient.java

Lines changed: 90 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -815,41 +815,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
815815
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
816816
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);
817817

818-
for (int i = 0; i < updateList.size(); i++) {
819-
final CollectionPipedUpdate<T> update = updateList.get(i);
820-
final int idx = i;
821-
822-
Operation op = opFact.collectionPipedUpdate(key, update,
823-
new CollectionPipedUpdateOperation.Callback() {
824-
// each result status
825-
public void receivedStatus(OperationStatus status) {
826-
CollectionOperationStatus cstatus;
827-
828-
if (status instanceof CollectionOperationStatus) {
829-
cstatus = (CollectionOperationStatus) status;
830-
} else {
831-
getLogger().warn("Unhandled state: " + status);
832-
cstatus = new CollectionOperationStatus(status);
833-
}
834-
rv.setOperationStatus(cstatus);
835-
}
818+
CollectionPipedUpdateOperation.Callback callback = new CollectionPipedUpdateOperation.Callback() {
836819

837-
// complete
838-
public void complete() {
839-
latch.countDown();
840-
}
820+
@Override
821+
public void receivedStatus(OperationStatus status) {
822+
CollectionOperationStatus cstatus;
841823

842-
// got status
843-
public void gotStatus(Integer index, OperationStatus status) {
844-
if (status instanceof CollectionOperationStatus) {
845-
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
846-
(CollectionOperationStatus) status);
847-
} else {
848-
rv.addEachResult(index + (idx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
849-
new CollectionOperationStatus(status));
850-
}
851-
}
852-
});
824+
if (status instanceof CollectionOperationStatus) {
825+
cstatus = (CollectionOperationStatus) status;
826+
} else {
827+
getLogger().warn("Unhandled state: " + status);
828+
cstatus = new CollectionOperationStatus(status);
829+
}
830+
rv.setOperationStatus(cstatus);
831+
}
832+
833+
@Override
834+
public void complete() {
835+
latch.countDown();
836+
}
837+
838+
@Override
839+
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
840+
if (status instanceof CollectionOperationStatus) {
841+
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
842+
(CollectionOperationStatus) status);
843+
} else {
844+
rv.addEachResult(index + (opIdx * CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT),
845+
new CollectionOperationStatus(status));
846+
}
847+
}
848+
};
849+
850+
for (CollectionPipedUpdate<T> pipedUpdate : updateList) {
851+
Operation op = opFact.collectionPipedUpdate(key, pipedUpdate, callback);
853852
rv.addOperation(op);
854853
addOp(key, op);
855854
}
@@ -1778,15 +1777,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
17781777
}
17791778

17801779
List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
1780+
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(elements,
1781+
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
17811782

1782-
if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
1783-
insertList.add(new BTreePipedInsert<T>(key, elements, attributesForCreate, tc));
1784-
} else {
1785-
PartitionedMap<Long, T> list = new PartitionedMap<Long, T>(
1786-
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1787-
for (Map<Long, T> elementMap : list) {
1788-
insertList.add(new BTreePipedInsert<T>(key, elementMap, attributesForCreate, tc));
1789-
}
1783+
for (int i = 0; i < list.size(); i++) {
1784+
insertList.add(new BTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
17901785
}
17911786
return asyncCollectionPipedInsert(key, insertList);
17921787
}
@@ -1802,15 +1797,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
18021797
}
18031798

18041799
List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
1800+
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
1801+
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
18051802

1806-
if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
1807-
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elements, attributesForCreate, tc));
1808-
} else {
1809-
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
1810-
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1811-
for (List<Element<T>> elementList : list) {
1812-
insertList.add(new ByteArraysBTreePipedInsert<T>(key, elementList, attributesForCreate, tc));
1813-
}
1803+
for (int i = 0; i < list.size(); i++) {
1804+
insertList.add(new ByteArraysBTreePipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
18141805
}
18151806
return asyncCollectionPipedInsert(key, insertList);
18161807
}
@@ -1830,15 +1821,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
18301821
}
18311822

18321823
List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
1824+
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
1825+
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
18331826

1834-
if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
1835-
insertList.add(new MapPipedInsert<T>(key, elements, attributesForCreate, tc));
1836-
} else {
1837-
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
1838-
elements, CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1839-
for (Map<String, T> elementMap : list) {
1840-
insertList.add(new MapPipedInsert<T>(key, elementMap, attributesForCreate, tc));
1841-
}
1827+
for (int i = 0 ; i < list.size(); i++) {
1828+
insertList.add(new MapPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
18421829
}
18431830
return asyncCollectionPipedInsert(key, insertList);
18441831
}
@@ -1854,15 +1841,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncLopPip
18541841
}
18551842

18561843
List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
1844+
PartitionedList<T> list = new PartitionedList<T>(valueList,
1845+
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
18571846

1858-
if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
1859-
insertList.add(new ListPipedInsert<T>(key, index, valueList, attributesForCreate, tc));
1860-
} else {
1861-
PartitionedList<T> list = new PartitionedList<T>(valueList,
1862-
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1863-
for (List<T> elementList : list) {
1864-
insertList.add(new ListPipedInsert<T>(key, index, elementList, attributesForCreate, tc));
1865-
}
1847+
for (int i = 0 ; i < list.size(); i++) {
1848+
insertList.add(new ListPipedInsert<T>(key, index, list.get(i), i, attributesForCreate, tc));
18661849
}
18671850
return asyncCollectionPipedInsert(key, insertList);
18681851
}
@@ -1878,15 +1861,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncSopPip
18781861
}
18791862

18801863
List<CollectionPipedInsert<T>> insertList = new ArrayList<CollectionPipedInsert<T>>();
1864+
PartitionedList<T> list = new PartitionedList<T>(valueList,
1865+
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
18811866

1882-
if (valueList.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
1883-
insertList.add(new SetPipedInsert<T>(key, valueList, attributesForCreate, tc));
1884-
} else {
1885-
PartitionedList<T> list = new PartitionedList<T>(valueList,
1886-
CollectionPipedInsert.MAX_PIPED_ITEM_COUNT);
1887-
for (List<T> elementList : list) {
1888-
insertList.add(new SetPipedInsert<T>(key, elementList, attributesForCreate, tc));
1889-
}
1867+
for (int i = 0 ; i < list.size(); i++) {
1868+
insertList.add(new SetPipedInsert<T>(key, list.get(i), i, attributesForCreate, tc));
18901869
}
18911870
return asyncCollectionPipedInsert(key, insertList);
18921871
}
@@ -2326,15 +2305,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncBopPip
23262305
}
23272306

23282307
List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
2308+
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(elements,
2309+
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
23292310

2330-
if (elements.size() <= CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT) {
2331-
updateList.add(new BTreePipedUpdate<T>(key, elements, tc));
2332-
} else {
2333-
PartitionedList<Element<T>> list = new PartitionedList<Element<T>>(
2334-
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
2335-
for (List<Element<T>> elementList : list) {
2336-
updateList.add(new BTreePipedUpdate<T>(key, elementList, tc));
2337-
}
2311+
for (int i = 0; i < list.size(); i++) {
2312+
updateList.add(new BTreePipedUpdate<T>(key, list.get(i), i, tc));
23382313
}
23392314
return asyncCollectionPipedUpdate(key, updateList);
23402315
}
@@ -2359,16 +2334,11 @@ public <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncMopPip
23592334
}
23602335

23612336
List<CollectionPipedUpdate<T>> updateList = new ArrayList<CollectionPipedUpdate<T>>();
2337+
PartitionedMap<String, T> list = new PartitionedMap<String, T>(elements,
2338+
CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
23622339

2363-
if (elements.size() <= CollectionPipedInsert.MAX_PIPED_ITEM_COUNT) {
2364-
updateList.add(new MapPipedUpdate<T>(key, elements, tc));
2365-
} else {
2366-
PartitionedMap<String, T> list = new PartitionedMap<String, T>(
2367-
elements, CollectionPipedUpdate.MAX_PIPED_ITEM_COUNT);
2368-
2369-
for (Map<String, T> elementMap : list) {
2370-
updateList.add(new MapPipedUpdate<T>(key, elementMap, tc));
2371-
}
2340+
for (int i = 0 ; i < list.size(); i++) {
2341+
updateList.add(new MapPipedUpdate<T>(key, list.get(i), i, tc));
23722342
}
23732343
return asyncCollectionPipedUpdate(key, updateList);
23742344
}
@@ -3199,41 +3169,40 @@ <T> CollectionFuture<Map<Integer, CollectionOperationStatus>> asyncCollectionPip
31993169
final PipedCollectionFuture<Integer, CollectionOperationStatus> rv =
32003170
new PipedCollectionFuture<Integer, CollectionOperationStatus>(latch, operationTimeout);
32013171

3202-
for (int i = 0; i < insertList.size(); i++) {
3203-
final CollectionPipedInsert<T> insert = insertList.get(i);
3204-
final int idx = i;
3172+
CollectionPipedInsertOperation.Callback callback = new CollectionPipedInsertOperation.Callback() {
32053173

3206-
Operation op = opFact.collectionPipedInsert(key, insert,
3207-
new CollectionPipedInsertOperation.Callback() {
3208-
// each result status
3209-
public void receivedStatus(OperationStatus status) {
3210-
CollectionOperationStatus cstatus;
3174+
@Override
3175+
public void receivedStatus(OperationStatus status) {
3176+
CollectionOperationStatus cstatus;
32113177

3212-
if (status instanceof CollectionOperationStatus) {
3213-
cstatus = (CollectionOperationStatus) status;
3214-
} else {
3215-
getLogger().warn("Unhandled state: " + status);
3216-
cstatus = new CollectionOperationStatus(status);
3217-
}
3218-
rv.setOperationStatus(cstatus);
3219-
}
3178+
if (status instanceof CollectionOperationStatus) {
3179+
cstatus = (CollectionOperationStatus) status;
3180+
} else {
3181+
getLogger().warn("Unhandled state: " + status);
3182+
cstatus = new CollectionOperationStatus(status);
3183+
}
3184+
rv.setOperationStatus(cstatus);
3185+
}
32203186

3221-
// complete
3222-
public void complete() {
3223-
latch.countDown();
3224-
}
3187+
@Override
3188+
public void complete() {
3189+
latch.countDown();
3190+
}
32253191

3226-
// got status
3227-
public void gotStatus(Integer index, OperationStatus status) {
3228-
if (status instanceof CollectionOperationStatus) {
3229-
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3230-
(CollectionOperationStatus) status);
3231-
} else {
3232-
rv.addEachResult(index + (idx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3233-
new CollectionOperationStatus(status));
3234-
}
3235-
}
3236-
});
3192+
@Override
3193+
public void gotStatus(Integer index, int opIdx, OperationStatus status) {
3194+
if (status instanceof CollectionOperationStatus) {
3195+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3196+
(CollectionOperationStatus) status);
3197+
} else {
3198+
rv.addEachResult(index + (opIdx * CollectionPipedInsert.MAX_PIPED_ITEM_COUNT),
3199+
new CollectionOperationStatus(status));
3200+
}
3201+
}
3202+
};
3203+
3204+
for (CollectionPipedInsert<T> pipedInsert : insertList) {
3205+
Operation op = opFact.collectionPipedInsert(key, pipedInsert, callback);
32373206
rv.addOperation(op);
32383207
addOp(key, op);
32393208
}

src/main/java/net/spy/memcached/collection/CollectionPipedInsert.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,21 @@ public abstract class CollectionPipedInsert<T> extends CollectionPipe {
3636
protected final CollectionAttributes attribute;
3737
protected final Transcoder<T> tc;
3838

39-
protected CollectionPipedInsert(String key, CollectionAttributes attribute,
39+
private final int opIdx;
40+
41+
protected CollectionPipedInsert(String key, int opIdx, CollectionAttributes attribute,
4042
Transcoder<T> tc, int itemCount) {
4143
super(itemCount);
4244
this.key = key;
45+
this.opIdx = opIdx;
4346
this.attribute = attribute;
4447
this.tc = tc;
4548
}
4649

50+
public int getOpIdx() {
51+
return opIdx;
52+
}
53+
4754
/**
4855
*
4956
*/
@@ -53,9 +60,9 @@ public static class ListPipedInsert<T> extends CollectionPipedInsert<T> {
5360
private final Collection<T> list;
5461
private final int index;
5562

56-
public ListPipedInsert(String key, int index, Collection<T> list,
63+
public ListPipedInsert(String key, int index, Collection<T> list, int opIdx,
5764
CollectionAttributes attr, Transcoder<T> tc) {
58-
super(key, attr, tc, list.size());
65+
super(key, opIdx, attr, tc, list.size());
5966
if (attr != null) { /* item creation option */
6067
CollectionCreate.checkOverflowAction(CollectionType.list, attr.getOverflowAction());
6168
}
@@ -115,9 +122,9 @@ public static class SetPipedInsert<T> extends CollectionPipedInsert<T> {
115122
private static final String COMMAND = "sop insert";
116123
private final Collection<T> set;
117124

118-
public SetPipedInsert(String key, Collection<T> set,
125+
public SetPipedInsert(String key, Collection<T> set, int opIdx,
119126
CollectionAttributes attr, Transcoder<T> tc) {
120-
super(key, attr, tc, set.size());
127+
super(key, opIdx, attr, tc, set.size());
121128
if (attr != null) { /* item creation option */
122129
CollectionCreate.checkOverflowAction(CollectionType.set, attr.getOverflowAction());
123130
}
@@ -172,9 +179,9 @@ public static class BTreePipedInsert<T> extends CollectionPipedInsert<T> {
172179
private static final String COMMAND = "bop insert";
173180
private final Map<Long, T> map;
174181

175-
public BTreePipedInsert(String key, Map<Long, T> map,
182+
public BTreePipedInsert(String key, Map<Long, T> map, int opIdx,
176183
CollectionAttributes attr, Transcoder<T> tc) {
177-
super(key, attr, tc, map.size());
184+
super(key, opIdx, attr, tc, map.size());
178185
if (attr != null) { /* item creation option */
179186
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
180187
}
@@ -234,9 +241,9 @@ public static class ByteArraysBTreePipedInsert<T> extends
234241
private static final String COMMAND = "bop insert";
235242
private final List<Element<T>> elements;
236243

237-
public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements,
244+
public ByteArraysBTreePipedInsert(String key, List<Element<T>> elements, int opIdx,
238245
CollectionAttributes attr, Transcoder<T> tc) {
239-
super(key, attr, tc, elements.size());
246+
super(key, opIdx, attr, tc, elements.size());
240247
if (attr != null) { /* item creation option */
241248
CollectionCreate.checkOverflowAction(CollectionType.btree, attr.getOverflowAction());
242249
}
@@ -296,9 +303,9 @@ public static class MapPipedInsert<T> extends CollectionPipedInsert<T> {
296303
private static final String COMMAND = "mop insert";
297304
private final Map<String, T> map;
298305

299-
public MapPipedInsert(String key, Map<String, T> map,
306+
public MapPipedInsert(String key, Map<String, T> map, int opIdx,
300307
CollectionAttributes attr, Transcoder<T> tc) {
301-
super(key, attr, tc, map.size());
308+
super(key, opIdx, attr, tc, map.size());
302309
if (attr != null) { /* item creation option */
303310
CollectionCreate.checkOverflowAction(CollectionType.map, attr.getOverflowAction());
304311
}

0 commit comments

Comments
 (0)