Skip to content

Commit 6bc4a2c

Browse files
author
Matthew Farrellee
committed
[SPARK-3519] add distinct(n) to SchemaRDD in PySpark
1 parent 7a17f2b commit 6bc4a2c

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

python/pyspark/sql.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1694,8 +1694,11 @@ def coalesce(self, numPartitions, shuffle=False):
16941694
rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
16951695
return SchemaRDD(rdd, self.sql_ctx)
16961696

1697-
def distinct(self):
1698-
rdd = self._jschema_rdd.distinct()
1697+
def distinct(self, numPartitions=None):
1698+
if numPartitions is None:
1699+
rdd = self._jschema_rdd.distinct()
1700+
else:
1701+
rdd = self._jschema_rdd.distinct(numPartitions)
16991702
return SchemaRDD(rdd, self.sql_ctx)
17001703

17011704
def intersection(self, other):

python/pyspark/tests.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,19 @@ def test_basic_functions(self):
646646
srdd.count()
647647
srdd.collect()
648648

649+
def test_distinct(self):
650+
rdd = self.sc.parallelize(['{"a": 1}', '{"b": 2}', '{"c": 3}']*10)
651+
srdd = self.sqlCtx.jsonRDD(rdd).distinct()
652+
self.assertEquals(srdd.count(), 3)
653+
654+
def test_distinct_numPartitions(self):
655+
rdd = self.sc.parallelize(['{"a": 1}', '{"b": 2}', '{"c": 3}']*10, 10)
656+
srdd = self.sqlCtx.jsonRDD(rdd)
657+
self.assertEquals(srdd.getNumPartitions(), 10)
658+
result = srdd.distinct(5)
659+
self.assertEquals(result.getNumPartitions(), 5)
660+
self.assertEquals(result.count(), 3)
661+
649662

650663
class TestIO(PySparkTestCase):
651664

0 commit comments

Comments
 (0)