Skip to content

Commit 4eec962

Browse files
committed
refactor join functions
1 parent f88bc68 commit 4eec962

File tree

2 files changed

+58
-124
lines changed

2 files changed

+58
-124
lines changed

pkg/R/RDD.R

Lines changed: 8 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ setMethod("checkpoint",
309309
#'\dontrun{
310310
#' sc <- sparkR.init()
311311
#' rdd <- parallelize(sc, 1:10, 2L)
312-
#' numParititions(rdd) # 2L
312+
#' numPartitions(rdd) # 2L
313313
#'}
314314
setGeneric("numPartitions", function(rdd) { standardGeneric("numPartitions") })
315315

@@ -1614,31 +1614,7 @@ setMethod("join",
16141614
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })
16151615

16161616
doJoin <- function(v) {
1617-
t1 <- vector("list", length(v))
1618-
t2 <- vector("list", length(v))
1619-
index1 <- 1
1620-
index2 <- 1
1621-
for (x in v) {
1622-
if (x[[1]] == 1L) {
1623-
t1[[index1]] <- x[[2]]
1624-
index1 <- index1 + 1
1625-
} else {
1626-
t2[[index2]] <- x[[2]]
1627-
index2 <- index2 + 1
1628-
}
1629-
}
1630-
length(t1) <- index1 - 1
1631-
length(t2) <- index2 - 1
1632-
1633-
result <- list()
1634-
length(result) <- length(t1) * length(t2)
1635-
index <- 1
1636-
for (i in t1) {
1637-
for (j in t2) {
1638-
result[[index]] <- list(i, j)
1639-
index <- index + 1
1640-
}
1641-
}
1617+
result <- joinTaggedList(v, list(FALSE, FALSE))
16421618
result
16431619
}
16441620

@@ -1678,36 +1654,7 @@ setMethod("leftOuterJoin",
16781654
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })
16791655

16801656
doJoin <- function(v) {
1681-
t1 <- vector("list", length(v))
1682-
t2 <- vector("list", length(v))
1683-
index1 <- 1
1684-
index2 <- 1
1685-
for (x in v) {
1686-
if (x[[1]] == 1L) {
1687-
t1[[index1]] <- x[[2]]
1688-
index1 <- index1 + 1
1689-
} else {
1690-
t2[[index2]] <- x[[2]]
1691-
index2 <- index2 + 1
1692-
}
1693-
}
1694-
length(t1) <- index1 - 1
1695-
len2 <- index2 - 1
1696-
if (len2 == 0) {
1697-
t2 <- list(NULL)
1698-
} else {
1699-
length(t2) <- len2
1700-
}
1701-
1702-
result <- list()
1703-
length(result) <- length(t1) * length(t2)
1704-
index <- 1
1705-
for (i in t1) {
1706-
for (j in t2) {
1707-
result[[index]] <- list(i, j)
1708-
index <- index + 1
1709-
}
1710-
}
1657+
result <- joinTaggedList(v, list(FALSE, TRUE))
17111658
result
17121659
}
17131660

@@ -1747,36 +1694,7 @@ setMethod("rightOuterJoin",
17471694
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })
17481695

17491696
doJoin <- function(v) {
1750-
t1 <- vector("list", length(v))
1751-
t2 <- vector("list", length(v))
1752-
index1 <- 1
1753-
index2 <- 1
1754-
for (x in v) {
1755-
if (x[[1]] == 1L) {
1756-
t1[[index1]] <- x[[2]]
1757-
index1 <- index1 + 1
1758-
} else {
1759-
t2[[index2]] <- x[[2]]
1760-
index2 <- index2 + 1
1761-
}
1762-
}
1763-
len1 <- index1 - 1
1764-
if (len1 == 0) {
1765-
t1 <- list(NULL)
1766-
} else {
1767-
length(t1) <- len1
1768-
}
1769-
length(t2) <- index2 - 1
1770-
1771-
result <- list()
1772-
length(result) <- length(t1) * length(t2)
1773-
index <- 1
1774-
for (i in t1) {
1775-
for (j in t2) {
1776-
result[[index]] <- list(i, j)
1777-
index <- index + 1
1778-
}
1779-
}
1697+
result <- joinTaggedList(v, list(TRUE, FALSE))
17801698
result
17811699
}
17821700

@@ -1807,59 +1725,25 @@ setMethod("rightOuterJoin",
18071725
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
18081726
#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
18091727
#' # list(1, list(3, 1)),
1810-
#' # list(3, list(3, NULL)),
18111728
#' # list(2, list(NULL, 4)))
1729+
#' # list(3, list(3, NULL)),
18121730
#'}
18131731
setGeneric("fullOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("fullOuterJoin") })
18141732

18151733
#' @rdname fullOuterJoin
18161734
#' @aliases fullOuterJoin,RDD,RDD-method
1735+
18171736
setMethod("fullOuterJoin",
18181737
signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"),
18191738
function(rdd1, rdd2, numPartitions) {
18201739
rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) })
18211740
rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) })
18221741

18231742
doJoin <- function(v) {
1824-
t1 <- vector("list", length(v))
1825-
t2 <- vector("list", length(v))
1826-
index1 <- 1
1827-
index2 <- 1
1828-
for (x in v) {
1829-
if (x[[1]] == 1L) {
1830-
t1[[index1]] <- x[[2]]
1831-
index1 <- index1 + 1
1832-
} else {
1833-
t2[[index2]] <- x[[2]]
1834-
index2 <- index2 + 1
1835-
}
1836-
}
1837-
len1 <- index1 - 1
1838-
len2 <- index2 - 1
1839-
1840-
if (len1 == 0) {
1841-
t1 <- list(NULL)
1842-
} else {
1843-
length(t1) <- len1
1844-
}
1845-
1846-
if (len2 == 0) {
1847-
t2 <- list(NULL)
1848-
} else {
1849-
length(t2) <- len2
1850-
}
1851-
1852-
result <- list()
1853-
length(result) <- length(t1) * length(t2)
1854-
index <- 1
1855-
for(i in t1) {
1856-
for(j in t2) {
1857-
result[[index]] <- list(i, j)
1858-
index <- index + 1
1859-
}
1860-
}
1743+
result <- joinTaggedList(v, list(TRUE, TRUE))
18611744
result
18621745
}
1746+
18631747
joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin)
18641748
})
18651749

pkg/R/utils.R

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,53 @@ sortKeyValueList <- function(kv_list) {
218218
keys <- sapply(kv_list, function(x) x[[1]])
219219
kv_list[order(keys)]
220220
}
221+
222+
## Utility function to generate compact R lists from grouped rdd
223+
# Used in Join-family functions
224+
genCompactLists <- function(tagged_list, cnull) {
225+
len <- length(tagged_list)
226+
num <- length(cnull)
227+
lists <- list(vector("list", len), vector("list", len))
228+
index <- list(1, 1)
229+
230+
for (x in tagged_list) {
231+
tag <- x[[1]]
232+
idx <- index[[tag]]
233+
lists[[tag]][[idx]] <- x[[2]]
234+
index[[tag]] <- idx + 1
235+
}
236+
237+
len <- lapply(index, function(x) x-1)
238+
for (i in (1:num)) {
239+
if (cnull[[i]] && len[[i]] == 0) {
240+
lists[[i]] <- list(NULL)
241+
} else {
242+
length(lists[[i]]) <- len[[i]]
243+
}
244+
}
245+
246+
lists
247+
}
248+
249+
## Utility function to merge compact R lists
250+
# Used in Join-family functions
251+
mergeCompactLists <- function(left, right) {
252+
result <- list()
253+
length(result) <- length(left) * length(right)
254+
index <- 1
255+
for (i in left) {
256+
for (j in right) {
257+
result[[index]] <- list(i, j)
258+
index <- index + 1
259+
}
260+
}
261+
result
262+
}
263+
264+
## Utility function to wrapper above two operations
265+
# Used in Join-family functions
266+
joinTaggedList <- function(tagged_list, cnull) {
267+
lists <- genCompactLists(tagged_list, cnull)
268+
result <- mergeCompactLists(lists[[1]], lists[[2]])
269+
result
270+
}

0 commit comments

Comments
 (0)