@@ -30,6 +30,7 @@
 from pyspark.rdd import RDD, PipelinedRDD
 from pyspark.serializers import BatchedSerializer, PickleSerializer, CloudPickleSerializer
 from pyspark.storagelevel import StorageLevel
+from pyspark.traceback_utils import SCCallSiteSync
 
 from itertools import chain, ifilter, imap
 
@@ -1559,7 +1560,7 @@ def limit(self, num):
         >>> srdd.limit(0).collect()
         []
         """
-        rdd = self._jschema_rdd.limit(num)
+        rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
         return SchemaRDD(rdd, self.sql_ctx)
 
     def saveAsParquetFile(self, path):
@@ -1651,9 +1652,8 @@ def collect(self):
         >>> srdd.collect()
         [Row(field1=1, field2=u'row1'), ..., Row(field1=3, field2=u'row3')]
         """
-        from pyspark.context import JavaStackTrace
-        with JavaStackTrace(self.context) as st:
-            bytesInJava = self._jschema_rdd.collectToPython().iterator()
+        with SCCallSiteSync(self.context) as css:
+            bytesInJava = self._jschema_rdd.baseSchemaRDD().collectToPython().iterator()
         cls = _create_cls(self.schema())
         return map(cls, self._collect_iterator_through_file(bytesInJava))
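
For context on the pattern this diff adopts: SCCallSiteSync is used as a context manager wrapped around the py4j call, so that the call site the JVM reports for the job reflects the user's Python code rather than PySpark internals. Below is a minimal, self-contained sketch of that pattern only; CallSiteSync, FakeJVMContext, and the depth-counting details are hypothetical stand-ins for illustration, not PySpark's actual implementation.

# A minimal sketch of the call-site-sync pattern, under the assumption that
# the context manager records the Python caller on entry and clears it on
# exit. All names here (CallSiteSync, FakeJVMContext) are hypothetical.
import traceback


class FakeJVMContext(object):
    # Stand-in for the JVM-side context that would receive the call site.
    def setCallSite(self, site):
        print("JVM call site is now: %r" % (site,))


class CallSiteSync(object):
    # Only the outermost Python API call should set the call site, so nested
    # internal calls are tracked with a depth counter and do not overwrite it.
    _depth = 0

    def __init__(self, jvm_context):
        self._jvm = jvm_context
        # Record the frame that invoked this context manager.
        filename, lineno, func, _ = traceback.extract_stack()[-2]
        self._call_site = "%s at %s:%d" % (func, filename, lineno)

    def __enter__(self):
        if CallSiteSync._depth == 0:
            self._jvm.setCallSite(self._call_site)
        CallSiteSync._depth += 1
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        CallSiteSync._depth -= 1
        if CallSiteSync._depth == 0:
            self._jvm.setCallSite(None)  # clear once the outermost call exits


if __name__ == "__main__":
    jvm = FakeJVMContext()
    with CallSiteSync(jvm):
        pass  # the JVM-side action (e.g. a collect) would run here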