@@ -30,7 +30,7 @@ class SQLContext:
     tables, execute SQL over tables, cache tables, and read parquet files.
     """

-    def __init__(self, sparkContext, sqlContext = None):
+    def __init__(self, sparkContext, sqlContext=None):
         """Create a new SQLContext.

         @param sparkContext: The SparkContext to wrap.
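
Note: the hunk above is a pure PEP 8 fix (E251, no spaces around a keyword default); behavior is unchanged. As a quick reminder of the call site, a minimal sketch of constructing the wrapper (the names sc and sqlCtx are illustrative, matching the doctest globals below):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext('local[4]', 'example')  # an ordinary SparkContext
    sqlCtx = SQLContext(sc)  # the optional sqlContext arg can reuse an existing JVM SQLContext
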
@@ -137,7 +137,6 @@ def parquetFile(self, path):
         jschema_rdd = self._ssql_ctx.parquetFile(path)
         return SchemaRDD(jschema_rdd, self)

-
     def jsonFile(self, path):
         """Loads a text file storing one JSON object per line,
         returning the result as a L{SchemaRDD}.
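
For context, both readers around this hunk return a L{SchemaRDD} that can be registered and queried; a minimal sketch, with placeholder paths:

    srdd = sqlCtx.parquetFile("path/to/data.parquet")  # placeholder path
    srdd.registerAsTable("parquet_table")

    people = sqlCtx.jsonFile("path/to/people.json")    # one JSON object per line
    people.registerAsTable("people")
    sqlCtx.sql("SELECT field1 FROM people")
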
@@ -234,8 +233,8 @@ def _ssql_ctx(self):
                 self._scala_HiveContext = self._get_hive_ctx()
             return self._scala_HiveContext
         except Py4JError as e:
-            raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run " \
-                "sbt/sbt assembly", e)
+            raise Exception("You must build Spark with Hive. Export 'SPARK_HIVE=true' and run "
+                            "sbt/sbt assembly", e)

     def _get_hive_ctx(self):
         return self._jvm.HiveContext(self._jsc.sc())
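
The exception above fires lazily: _ssql_ctx only builds the JVM HiveContext on first use, so a missing Hive assembly surfaces at query time, not at construction. A sketch of what a caller sees (assuming the HiveContext subclass defined in this file):

    from pyspark.sql import HiveContext

    hiveCtx = HiveContext(sc)       # cheap; no JVM HiveContext created yet
    try:
        hiveCtx.sql("SHOW TABLES")  # first use of _ssql_ctx
    except Exception as e:
        print(e)  # "You must build Spark with Hive. Export 'SPARK_HIVE=true' ..."
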
@@ -377,7 +376,7 @@ def registerAsTable(self, name):
         """
         self._jschema_rdd.registerAsTable(name)

-    def insertInto(self, tableName, overwrite = False):
+    def insertInto(self, tableName, overwrite=False):
         """Inserts the contents of this SchemaRDD into the specified table.

         Optionally overwriting any existing data.
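
As in the __init__ hunk, only the default-argument spacing changes; overwrite still defaults to False, i.e. appends. A hedged sketch of the two modes ("people" is an illustrative table already known to the context):

    srdd = sqlCtx.inferSchema(rdd)             # rdd of dicts, as in _test() below
    srdd.insertInto("people")                  # append (overwrite=False)
    srdd.insertInto("people", overwrite=True)  # replace existing data
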
@@ -420,7 +419,7 @@ def _toPython(self):
         # in Java land in the javaToPython function. May require a custom
         # pickle serializer in Pyrolite
         return RDD(jrdd, self._sc, BatchedSerializer(
-                        PickleSerializer())).map(lambda d: Row(d))
+            PickleSerializer())).map(lambda d: Row(d))

     # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
     # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
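
The continuation-line fix above does not change the pipeline: Java rows are pickled in batches, deserialized into dicts, and each dict is wrapped in this module's Row class, so collected results support dict-style access. A small sketch of the caller-visible effect:

    srdd = sqlCtx.inferSchema(rdd)
    for row in srdd.collect():  # each element is Row(d), built from a pickled dict
        print(row["field1"])    # dict-style access, assuming the dict-backed Row in this module
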
@@ -483,6 +482,7 @@ def subtract(self, other, numPartitions=None):
         else:
             raise ValueError("Can only subtract another SchemaRDD")

+
 def _test():
     import doctest
     from array import array
@@ -493,25 +493,29 @@ def _test():
     sc = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['sc'] = sc
     globs['sqlCtx'] = SQLContext(sc)
-    globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
-        {"field1" : 2, "field2" : "row2"}, {"field1" : 3, "field2" : "row3"}])
-    jsonStrings = ['{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-       '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
-       '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}']
+    globs['rdd'] = sc.parallelize(
+        [{"field1": 1, "field2": "row1"},
+         {"field1": 2, "field2": "row2"},
+         {"field1": 3, "field2": "row3"}]
+    )
+    jsonStrings = [
+        '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
+        '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]}, "field6":[{"field7": "row2"}]}',
+        '{"field1" : null, "field2": "row3", "field3":{"field4":33, "field5": []}}'
+    ]
     globs['jsonStrings'] = jsonStrings
     globs['json'] = sc.parallelize(jsonStrings)
     globs['nestedRdd1'] = sc.parallelize([
-        {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
-        {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
+        {"f1": array('i', [1, 2]), "f2": {"row1": 1.0}},
+        {"f1": array('i', [2, 3]), "f2": {"row2": 2.0}}])
     globs['nestedRdd2'] = sc.parallelize([
-        {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
-        {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
-    (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
+        {"f1": [[1, 2], [2, 3]], "f2": set([1, 2]), "f3": (1, 2)},
+        {"f1": [[2, 3], [3, 4]], "f2": set([2, 3]), "f3": (2, 3)}])
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)


 if __name__ == "__main__":
     _test()
-
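
For completeness, _test() above is the module's own doctest harness: running the file directly checks every docstring example against a local SparkContext. The same pattern, reduced to its stdlib core:

    import doctest

    # Mirrors _test(): ELLIPSIS lets docstring examples elide noisy output with "..."
    (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
    if failure_count:
        exit(-1)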