@@ -730,6 +730,7 @@ setMethod("take",
            index <- -1
            jrdd <- getJRDD(x)
            numPartitions <- numPartitions(x)
+           serializedModeRDD <- getSerializedMode(x)

            # TODO(shivaram): Collect more than one partition based on size
            # estimates similar to the scala version of `take`.
@@ -748,13 +749,14 @@ setMethod("take",
              elems <- convertJListToRList(partition,
                                           flatten = TRUE,
                                           logicalUpperBound = size,
-                                          serializedMode = getSerializedMode(x))
-             # TODO: Check if this append is O(n^2)?
+                                          serializedMode = serializedModeRDD)
+
              resList <- append(resList, elems)
            }
            resList
          })
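
The change above hoists the serialized-mode lookup out of the fetch loop: getSerializedMode(x) is now computed once instead of once per partition. The loop still stops as soon as `num` elements have been gathered, so later partitions are never shipped from the JVM. A minimal usage sketch, assuming a running SparkR context `sc`:

    rdd <- parallelize(sc, 1:100, 10L)  # 10 partitions of 10 elements each
    take(rdd, 5)                        # list(1, 2, 3, 4, 5); only the first partition is fetched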

+
#' First
#'
#' Return the first element of an RDD
@@ -1092,21 +1094,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
    if (num < length(part)) {
      # R limitation: order works only on primitive types!
      ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
-     list(part[ord[1:num]])
+     part[ord[1:num]]
    } else {
-     list(part)
+     part
    }
  }

- reduceFunc <- function(elems, part) {
-   newElems <- append(elems, part)
-   # R limitation: order works only on primitive types!
-   ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
-   newElems[ord[1:num]]
- }
-
  newRdd <- mapPartitions(x, partitionFunc)
- reduce(newRdd, reduceFunc)
+
+ resList <- list()
+ index <- -1
+ jrdd <- getJRDD(newRdd)
+ numPartitions <- numPartitions(newRdd)
+ serializedModeRDD <- getSerializedMode(newRdd)
+
+ while (TRUE) {
+   index <- index + 1
+
+   if (index >= numPartitions) {
+     ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
+     resList <- resList[ord[1:num]]
+     break
+   }
+
+   # a JList of byte arrays
+   partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
+   partition <- partitionArr[[1]]
+
+   # elems is capped to have at most `num` elements
+   elems <- convertJListToRList(partition,
+                                flatten = TRUE,
+                                logicalUpperBound = num,
+                                serializedMode = serializedModeRDD)
+
+   resList <- append(resList, elems)
+ }
+ resList
}

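takeOrderedElem() thus swaps the old reduce()-based combine for a driver-side loop: every partition contributes at most `num` locally ordered candidates, and a single final order() over the gathered candidates produces the result. Usage through its public wrappers takeOrdered() (documented just below) and top() (the ascending = FALSE path, defined nearby in this file), assuming a SparkR context `sc`:

    rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
    takeOrdered(rdd, 3)  # list(1, 2, 3)
    top(rdd, 3)          # list(10, 9, 7)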
# ' Returns the first N elements from an RDD in ascending order.
@@ -1465,67 +1488,105 @@ setMethod("zipRDD",
              stop("Can only zip RDDs which have the same number of partitions.")
            }

-           if (getSerializedMode(x) != getSerializedMode(other) ||
-               getSerializedMode(x) == "byte") {
-             # Append the number of elements in each partition to that partition so that we can later
-             # check if corresponding partitions of both RDDs have the same number of elements.
-             #
-             # Note that this appending also serves the purpose of reserialization, because even if
-             # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
-             # as a single byte array. For example, partitions of an RDD generated from partitionBy()
-             # may be encoded as multiple byte arrays.
-             appendLength <- function(part) {
-               part[[length(part) + 1]] <- length(part) + 1
-               part
-             }
-             x <- lapplyPartition(x, appendLength)
-             other <- lapplyPartition(other, appendLength)
-           }
+           rdds <- appendPartitionLengths(x, other)
+           jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
+           # The jrdd's elements are of scala Tuple2 type. The serialized
+           # flag here is used for the elements inside the tuples.
+           rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

-           zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
-           # The zippedRDD's elements are of scala Tuple2 type. The serialized
-           # flag Here is used for the elements inside the tuples.
-           serializerMode <- getSerializedMode(x)
-           zippedRDD <- RDD(zippedJRDD, serializerMode)
+           mergePartitions(rdd, TRUE)
+         })
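
zipRDD() now shares its bookkeeping with cartesian() below: appendPartitionLengths() appends each partition's element count (which, per the deleted comment, also reserializes partitions into a single byte array where needed), and mergePartitions() splits the JVM-side Tuple2 results back into R pairs (the TRUE/FALSE flag presumably selects zip's equal-length check versus cartesian's). Usage is unchanged; a sketch assuming a SparkR context `sc`:

    rdd1 <- parallelize(sc, 0:4, 2L)
    rdd2 <- parallelize(sc, 1000:1004, 2L)
    collect(zipRDD(rdd1, rdd2))
    # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))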
+
+#' Cartesian product of this RDD and another one.
+#'
+#' Return the Cartesian product of this RDD and another one,
+#' that is, the RDD of all pairs of elements (a, b) where a
+#' is in this and b is in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @return A new RDD which is the Cartesian product of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd <- parallelize(sc, 1:2)
+#' sortByKey(cartesian(rdd, rdd))
+#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
+#'}
+#' @rdname cartesian
+#' @aliases cartesian,RDD,RDD-method
+setMethod("cartesian",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other) {
+            rdds <- appendPartitionLengths(x, other)
+            jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
+            # The jrdd's elements are of scala Tuple2 type. The serialized
+            # flag here is used for the elements inside the tuples.
+            rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

-           partitionFunc <- function(split, part) {
-             len <- length(part)
-             if (len > 0) {
-               if (serializerMode == "byte") {
-                 lengthOfValues <- part[[len]]
-                 lengthOfKeys <- part[[len - lengthOfValues]]
-                 stopifnot(len == lengthOfKeys + lengthOfValues)
-
-                 # check if corresponding partitions of both RDDs have the same number of elements.
-                 if (lengthOfKeys != lengthOfValues) {
-                   stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
-                 }
-
-                 if (lengthOfKeys > 1) {
-                   keys <- part[1 : (lengthOfKeys - 1)]
-                   values <- part[(lengthOfKeys + 1) : (len - 1)]
-                 } else {
-                   keys <- list()
-                   values <- list()
-                 }
-               } else {
-                 # Keys, values must have same length here, because this has
-                 # been validated inside the JavaRDD.zip() function.
-                 keys <- part[c(TRUE, FALSE)]
-                 values <- part[c(FALSE, TRUE)]
-               }
-               mapply(
-                 function(k, v) {
-                   list(k, v)
-                 },
-                 keys,
-                 values,
-                 SIMPLIFY = FALSE,
-                 USE.NAMES = FALSE)
-             } else {
-               part
-             }
+            mergePartitions(rdd, FALSE)
+          })
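
cartesian() pairs every element of x with every element of other, so the result has count(x) * count(other) elements. A quick size check, assuming a SparkR context `sc`:

    rdd1 <- parallelize(sc, 1:3)
    rdd2 <- parallelize(sc, list("a", "b"))
    count(cartesian(rdd1, rdd2))  # 6, i.e. count(rdd1) * count(rdd2)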
+
+#' Subtract an RDD with another RDD.
+#'
+#' Return an RDD with the elements from this that are not in other.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions Number of the partitions in the result RDD.
+#' @return An RDD with the elements from this that are not in other.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
+#' rdd2 <- parallelize(sc, list(2, 4))
+#' collect(subtract(rdd1, rdd2))
+#' # list(1, 1, 3)
+#'}
+#' @rdname subtract
+#' @aliases subtract,RDD
+setMethod("subtract",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other, numPartitions = SparkR::numPartitions(x)) {
+            mapFunction <- function(e) { list(e, NA) }
+            rdd1 <- map(x, mapFunction)
+            rdd2 <- map(other, mapFunction)
+            keys(subtractByKey(rdd1, rdd2, numPartitions))
+          })
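
subtract() builds on the existing pair-RDD machinery rather than a dedicated JVM call: each element e becomes a (e, NA) pair, subtractByKey() drops every key that occurs in other, and keys() strips the NA placeholders. The same trick in plain local R (an illustration only, not SparkR API):

    xs <- list(1, 1, 2, 2, 3, 4)
    ys <- list(2, 4)
    pairs <- lapply(xs, function(e) list(e, NA))                   # map() step
    kept <- Filter(function(p) !(p[[1]] %in% unlist(ys)), pairs)   # subtractByKey() analogue
    lapply(kept, function(p) p[[1]])                               # keys() step: list(1, 1, 3)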
+
+#' Intersection of this RDD and another one.
+#'
+#' Return the intersection of this RDD and another one.
+#' The output will not contain any duplicate elements,
+#' even if the input RDDs did. Performs a hash partition
+#' across the cluster.
+#' Note that this method performs a shuffle internally.
+#'
+#' @param x An RDD.
+#' @param other An RDD.
+#' @param numPartitions The number of partitions in the result RDD.
+#' @return An RDD which is the intersection of these two RDDs.
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
+#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
+#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
+#' # list(1, 2, 3)
+#'}
+#' @rdname intersection
+#' @aliases intersection,RDD
+setMethod("intersection",
+          signature(x = "RDD", other = "RDD"),
+          function(x, other, numPartitions = SparkR::numPartitions(x)) {
+            rdd1 <- map(x, function(v) { list(v, NA) })
+            rdd2 <- map(other, function(v) { list(v, NA) })
+
+            filterFunction <- function(elem) {
+              iters <- elem[[2]]
+              all(as.vector(
+                lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
            }
-
-           PipelinedRDD(zippedRDD, partitionFunc)
+
+           keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
          })
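
After cogroup(rdd1, rdd2), each element has the shape list(key, list(values_from_x, values_from_other)), and filterFunction keeps a key only when both value lists are non-empty, i.e. the key occurred on both sides. A plain local R sketch of the predicate on hypothetical cogrouped elements:

    filterFunction <- function(elem) {
      iters <- elem[[2]]
      all(as.vector(lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
    }
    filterFunction(list(2, list(list(NA), list(NA))))  # TRUE: key 2 is on both sides
    filterFunction(list(5, list(list(NA), list())))    # FALSE: key 5 is on one side only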