@@ -1054,6 +1054,125 @@ public static Map<byte[], List<TorcEdge>> batchRead(
1054
1054
return edgeMap ;
1055
1055
}
1056
1056
1057
+ /**
1058
+ * Batch reads in parallel all of the edges for all the given vertices.
1059
+ *
1060
+ * @param rctx RAMCloud transaction in which to perform the operation.
1061
+ * @param rcTableId The table in which the edge list is (to be) stored.
1062
+ * @param keyPrefix List of key prefixes for the edge lists.
1063
+ *
1064
+ * @return List of all the edges contained in the edge lists.
1065
+ */
1066
+ public static Map <byte [], List <TorcSerializedEdge >> batchRead (
1067
+ RAMCloudTransaction rctx ,
1068
+ long rcTableId ,
1069
+ List <byte []> keyPrefixes ) {
1070
+ Map <byte [], LinkedList <RAMCloudTransactionReadOp >> readMap = new HashMap <>();
1071
+ Map <byte [], List <TorcSerializedEdge >> eListMap = new HashMap <>();
1072
+
1073
+ /* Async. read head segments. */
1074
+ for (byte [] kp : keyPrefixes ) {
1075
+ LinkedList <RAMCloudTransactionReadOp > readOpList = new LinkedList <>();
1076
+ byte [] headSegKey = getSegmentKey (kp , 0 );
1077
+ readOpList .addLast (new RAMCloudTransactionReadOp (rctx , rcTableId ,
1078
+ headSegKey , true ));
1079
+ readMap .put (kp , readOpList );
1080
+ }
1081
+
1082
+ /* Process returned head segments and async. read tail segments. */
1083
+ for (int i = 0 ; i < keyPrefixes .size (); i ++) {
1084
+ byte [] kp = keyPrefixes .get (i );
1085
+
1086
+ List <TorcSerializedEdge > eList = new LinkedList <>();
1087
+ eListMap .put (kp , eList );
1088
+
1089
+ LinkedList <RAMCloudTransactionReadOp > readOpList = readMap .get (kp );
1090
+ RAMCloudTransactionReadOp readOp = readOpList .removeFirst ();
1091
+ RAMCloudObject headSegObj ;
1092
+ try {
1093
+ headSegObj = readOp .getValue ();
1094
+ if (headSegObj == null ) {
1095
+ continue ;
1096
+ }
1097
+ } catch (ClientException e ) {
1098
+ throw new RuntimeException (e );
1099
+ } finally {
1100
+ readOp .close ();
1101
+ }
1102
+
1103
+ if (headSegObj == null ) {
1104
+ // Object does not exist.
1105
+ continue ;
1106
+ }
1107
+
1108
+ ByteBuffer headSeg =
1109
+ ByteBuffer .allocate (headSegObj .getValueBytes ().length )
1110
+ .order (ByteOrder .LITTLE_ENDIAN );
1111
+ headSeg .put (headSegObj .getValueBytes ());
1112
+ headSeg .flip ();
1113
+
1114
+ int numTailSegments = headSeg .getInt ();
1115
+
1116
+ while (headSeg .hasRemaining ()) {
1117
+ byte [] neighborIdBytes = new byte [UInt128 .BYTES ];
1118
+ headSeg .get (neighborIdBytes );
1119
+
1120
+ UInt128 neighborId = new UInt128 (neighborIdBytes );
1121
+
1122
+ short propLen = headSeg .getShort ();
1123
+
1124
+ byte [] serializedProperties = new byte [propLen ];
1125
+ headSeg .get (serializedProperties );
1126
+
1127
+ eList .add (new TorcSerializedEdge (serializedProperties , neighborId ));
1128
+ }
1129
+
1130
+ /* Queue up async. reads for tail segments. */
1131
+ for (int j = numTailSegments ; j > 0 ; --j ) {
1132
+ byte [] tailSegKey = getSegmentKey (kp , j );
1133
+ readOpList .addLast (new RAMCloudTransactionReadOp (rctx , rcTableId ,
1134
+ tailSegKey , true ));
1135
+ }
1136
+ }
1137
+
1138
+ /* Process returned tail segments. */
1139
+ for (int i = 0 ; i < keyPrefixes .size (); i ++) {
1140
+ byte [] kp = keyPrefixes .get (i );
1141
+
1142
+ List <TorcSerializedEdge > eList = eListMap .get (kp );
1143
+
1144
+ LinkedList <RAMCloudTransactionReadOp > readOpList = readMap .get (kp );
1145
+
1146
+ while (readOpList .size () > 0 ) {
1147
+ RAMCloudTransactionReadOp readOp = readOpList .removeFirst ();
1148
+ RAMCloudObject tailSegObj = readOp .getValue ();
1149
+ readOp .close ();
1150
+
1151
+ ByteBuffer tailSeg =
1152
+ ByteBuffer .allocate (tailSegObj .getValueBytes ().length )
1153
+ .order (ByteOrder .LITTLE_ENDIAN );
1154
+ tailSeg .put (tailSegObj .getValueBytes ());
1155
+ tailSeg .flip ();
1156
+
1157
+ while (tailSeg .hasRemaining ()) {
1158
+ byte [] neighborIdBytes = new byte [UInt128 .BYTES ];
1159
+ tailSeg .get (neighborIdBytes );
1160
+
1161
+ UInt128 neighborId = new UInt128 (neighborIdBytes );
1162
+
1163
+ short propLen = tailSeg .getShort ();
1164
+
1165
+ byte [] serializedProperties = new byte [propLen ];
1166
+ tailSeg .get (serializedProperties );
1167
+
1168
+ eList .add (new TorcSerializedEdge (serializedProperties , neighborId ));
1169
+ }
1170
+ }
1171
+ }
1172
+
1173
+ return eListMap ;
1174
+ }
1175
+
1057
1176
/* Metadata we want to keep track of for MutliReadObjects. */
1058
1177
private static class MultiReadSpec {
1059
1178
public byte [] keyPrefix ;
@@ -1183,6 +1302,97 @@ public static Map<byte[], List<TorcEdge>> batchRead(
1183
1302
return edgeMap ;
1184
1303
}
1185
1304
1305
+ /**
1306
+ * Batch reads in parallel all of the edges for all the given vertices.
1307
+ * This version performs the operation outside of any transaction context.
1308
+ *
1309
+ * @param client RAMCloud client to use to perform the operation.
1310
+ * @param rcTableId The table in which the edge list is (to be) stored.
1311
+ * @param keyPrefix List of key prefixes for the edge lists.
1312
+ *
1313
+ * @return List of all the TorcEdges contained in the edge lists.
1314
+ */
1315
+ public static Map <byte [], List <TorcSerializedEdge >> batchRead (
1316
+ RAMCloud client ,
1317
+ long rcTableId ,
1318
+ List <byte []> keyPrefixes ) {
1319
+ LinkedList <MultiReadObject > requestQ = new LinkedList <>();
1320
+ LinkedList <MultiReadSpec > specQ = new LinkedList <>();
1321
+ Map <byte [], List <TorcSerializedEdge >> eListMap = new HashMap <>();
1322
+
1323
+ /* Add head segments to queue and prepare edgeMap. */
1324
+ for (int i = 0 ; i < keyPrefixes .size (); i ++) {
1325
+ byte [] headSegKey = getSegmentKey (keyPrefixes .get (i ), 0 );
1326
+ requestQ .addLast (new MultiReadObject (rcTableId , headSegKey ));
1327
+ specQ .addLast (new MultiReadSpec (keyPrefixes .get (i ),
1328
+ null , null , null , true ));
1329
+
1330
+ List <TorcSerializedEdge > eList = new LinkedList <>();
1331
+ eListMap .put (keyPrefixes .get (i ), eList );
1332
+ }
1333
+
1334
+ /* Go through request queue and read at most MAX_ASYNC_READS at a time. */
1335
+ while (requestQ .size () > 0 ) {
1336
+ int batchSize = Math .min (requestQ .size (), DEFAULT_MAX_MULTIREAD_SIZE );
1337
+ MultiReadObject [] requests = new MultiReadObject [batchSize ];
1338
+ for (int i = 0 ; i < batchSize ; i ++) {
1339
+ requests [i ] = requestQ .removeFirst ();
1340
+ }
1341
+
1342
+ client .read (requests );
1343
+
1344
+ /* Process this batch, adding more MultiReadObjects to the queue if
1345
+ * needed. */
1346
+ for (int i = 0 ; i < batchSize ; i ++) {
1347
+ MultiReadSpec spec = specQ .removeFirst ();
1348
+
1349
+ if (requests [i ].getStatus () != Status .STATUS_OK ) {
1350
+ if (requests [i ].getStatus () == Status .STATUS_OBJECT_DOESNT_EXIST ) {
1351
+ continue ;
1352
+ } else {
1353
+ throw new RuntimeException ("Segment had status " +
1354
+ requests [i ].getStatus ());
1355
+ }
1356
+ }
1357
+
1358
+ List <TorcSerializedEdge > eList = eListMap .get (spec .keyPrefix );
1359
+
1360
+ ByteBuffer seg =
1361
+ ByteBuffer .allocate (requests [i ].getValueBytes ().length )
1362
+ .order (ByteOrder .LITTLE_ENDIAN );
1363
+ seg .put (requests [i ].getValueBytes ());
1364
+ seg .flip ();
1365
+
1366
+ if (spec .isHeadSeg ) {
1367
+ /* Queue up async. reads for tail segments. */
1368
+ int numTailSegments = seg .getInt ();
1369
+ for (int j = numTailSegments ; j > 0 ; --j ) {
1370
+ byte [] tailSegKey = getSegmentKey (spec .keyPrefix , j );
1371
+ requestQ .addLast (new MultiReadObject (rcTableId , tailSegKey ));
1372
+ spec .isHeadSeg = false ;
1373
+ specQ .addLast (spec );
1374
+ }
1375
+ }
1376
+
1377
+ while (seg .hasRemaining ()) {
1378
+ byte [] neighborIdBytes = new byte [UInt128 .BYTES ];
1379
+ seg .get (neighborIdBytes );
1380
+
1381
+ UInt128 neighborId = new UInt128 (neighborIdBytes );
1382
+
1383
+ short propLen = seg .getShort ();
1384
+
1385
+ byte [] serializedProperties = new byte [propLen ];
1386
+ seg .get (serializedProperties );
1387
+
1388
+ eList .add (new TorcSerializedEdge (serializedProperties , neighborId ));
1389
+ }
1390
+ }
1391
+ }
1392
+
1393
+ return eListMap ;
1394
+ }
1395
+
1186
1396
/**
1187
1397
* Creates a RAMCloud key for the given edge list segment.
1188
1398
*
0 commit comments