
[SPARK-6781] [SQL] use sqlContext in python shell #5425

Closed · wants to merge 2 commits
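Summary of the diff below: the SQL context variable is renamed from `sqlCtx` to `sqlContext` throughout the docs, the Java and Python examples, and the PySpark doctest globals, and `python/pyspark/shell.py` now binds both `sqlContext` and the legacy `sqlCtx` name to the same context object. A minimal usage sketch from the patched PySpark shell follows; it reuses the `people.json` example file referenced in the Java example further down and is illustrative, not part of the change:

```python
# PySpark shell after this patch: `sc` and `sqlContext` (with the legacy
# alias `sqlCtx`) are created automatically by python/pyspark/shell.py.
df = sqlContext.jsonFile("examples/src/main/resources/people.json")
df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19").show()
```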
2 changes: 1 addition & 1 deletion docs/ml-guide.md
@@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

sc = SparkContext(appName="SimpleTextClassificationPipeline")
- sqlCtx = SQLContext(sc)
+ sqlContext = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
4 changes: 2 additions & 2 deletions docs/sql-programming-guide.md
@@ -1642,15 +1642,15 @@ moved into the udf object in `SQLContext`.
<div data-lang="scala" markdown="1">
{% highlight java %}

- sqlCtx.udf.register("strLen", (s: String) => s.length())
+ sqlContext.udf.register("strLen", (s: String) => s.length())

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

- sqlCtx.udf().register("strLen", (String s) -> { s.length(); });
+ sqlContext.udf().register("strLen", (String s) -> { s.length(); });

{% endhighlight %}
</div>
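The two snippets above are the Scala and Java forms from the migration guide. For comparison, a hedged PySpark sketch of registering a similar UDF through the renamed `sqlContext`; this is illustrative only and not part of the diff (in PySpark at this point, registration goes through `SQLContext.registerFunction` rather than a `udf` object):

```python
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

sc = SparkContext(appName="StrLenUDF")  # appName is arbitrary for this sketch
sqlContext = SQLContext(sc)

# Register a Python UDF named "strLen" so it can be called from SQL statements.
sqlContext.registerFunction("strLen", lambda s: len(s), IntegerType())
```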
examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -55,7 +55,7 @@ public void setAge(int age) {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
- SQLContext sqlCtx = new SQLContext(ctx);
+ SQLContext sqlContext = new SQLContext(ctx);

System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
@@ -74,11 +74,11 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
- DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
+ DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
- DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+ DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -99,12 +99,12 @@ public String call(Row row) {
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
- DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");
+ DataFrame parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
@@ -120,7 +120,7 @@ public String call(Row row) {
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a DataFrame from the file(s) pointed by path
- DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);
+ DataFrame peopleFromJsonFile = sqlContext.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -133,8 +133,8 @@ public String call(Row row) {
// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");

- // SQL statements can be run by using the sql methods provided by sqlCtx.
- DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+ // SQL statements can be run by using the sql methods provided by sqlContext.
+ DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are DataFrame and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -151,7 +151,7 @@ public String call(Row row) {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
- DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+ DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());

// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
@@ -164,7 +164,7 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

- DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+ DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
examples/src/main/python/ml/simple_text_classification_pipeline.py
@@ -33,7 +33,7 @@

if __name__ == "__main__":
sc = SparkContext(appName="SimpleTextClassificationPipeline")
- sqlCtx = SQLContext(sc)
+ sqlContext = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
6 changes: 3 additions & 3 deletions examples/src/main/python/mllib/dataset_example.py
@@ -44,19 +44,19 @@ def summarize(dataset):
print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
exit(-1)
sc = SparkContext(appName="DatasetExample")
- sqlCtx = SQLContext(sc)
+ sqlContext = SQLContext(sc)
if len(sys.argv) == 2:
input = sys.argv[1]
else:
input = "data/mllib/sample_libsvm_data.txt"
points = MLUtils.loadLibSVMFile(sc, input)
- dataset0 = sqlCtx.inferSchema(points).setName("dataset0").cache()
+ dataset0 = sqlContext.inferSchema(points).setName("dataset0").cache()
summarize(dataset0)
tempdir = tempfile.NamedTemporaryFile(delete=False).name
os.unlink(tempdir)
print "Save dataset as a Parquet file to %s." % tempdir
dataset0.saveAsParquetFile(tempdir)
print "Load it back and summarize it again."
- dataset1 = sqlCtx.parquetFile(tempdir).setName("dataset1").cache()
+ dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
summarize(dataset1)
shutil.rmtree(tempdir)
4 changes: 2 additions & 2 deletions python/pyspark/ml/classification.py
@@ -91,9 +91,9 @@ class LogisticRegressionModel(JavaModel):
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
sc = SparkContext("local[2]", "ml.feature tests")
- sqlCtx = SQLContext(sc)
+ sqlContext = SQLContext(sc)
globs['sc'] = sc
- globs['sqlCtx'] = sqlCtx
+ globs['sqlContext'] = sqlContext
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
sc.stop()
4 changes: 2 additions & 2 deletions python/pyspark/ml/feature.py
@@ -117,9 +117,9 @@ def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
sc = SparkContext("local[2]", "ml.feature tests")
- sqlCtx = SQLContext(sc)
+ sqlContext = SQLContext(sc)
globs['sc'] = sc
- globs['sqlCtx'] = sqlCtx
+ globs['sqlContext'] = sqlContext
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
sc.stop()
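The two doctest-setup hunks above (`pyspark.ml.classification` and `pyspark.ml.feature`) only rename the glob, so doctests in those modules can refer to `sqlContext` directly. A hypothetical doctest body, not taken from either module, just to show what the renamed glob enables:

```python
>>> df = sqlContext.createDataFrame([(0, "a b c"), (1, "d e f")], ["label", "text"])
>>> df.columns
['label', 'text']
```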
6 changes: 3 additions & 3 deletions python/pyspark/shell.py
@@ -53,9 +53,9 @@
try:
# Try to access HiveConf, it will raise exception if Hive is not added
sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
- sqlCtx = HiveContext(sc)
+ sqlCtx = sqlContext = HiveContext(sc)
except py4j.protocol.Py4JError:
- sqlCtx = SQLContext(sc)
+ sqlCtx = sqlContext = SQLContext(sc)

print("""Welcome to
____ __
@@ -68,7 +68,7 @@
platform.python_version(),
platform.python_build()[0],
platform.python_build()[1]))
print("SparkContext available as sc, %s available as sqlCtx." % sqlCtx.__class__.__name__)
print("SparkContext available as sc, %s available as sqlContext." % sqlContext.__class__.__name__)

if add_files is not None:
print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead")
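A quick interactive check of the backward-compatibility behavior introduced in `shell.py` above; a sketch of a session in the patched shell, not captured from a real run:

```python
>>> sqlContext is sqlCtx   # shell.py binds both names to the same HiveContext/SQLContext instance
True
```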