
[SQL] Minor fixes. #315


Closed · wants to merge 7 commits
@@ -219,7 +219,7 @@ class SqlParser extends StandardTokenParsers

protected lazy val relationFactor: Parser[LogicalPlan] =
ident ~ (opt(AS) ~> opt(ident)) ^^ {
-      case ident ~ alias => UnresolvedRelation(alias, ident)
+      case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
} |
"(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }

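For context, a minimal standalone sketch of the combinator shape `relationFactor` uses — a table name followed by an optional alias with an optional AS keyword. Everything here (the object name, the reduced grammar) is illustrative, not Spark's actual parser:

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Illustrative sketch only: mirrors the `ident ~ (opt(AS) ~> opt(ident))` shape
// of relationFactor without depending on catalyst.
object RelationFactorSketch extends StandardTokenParsers {
  lexical.reserved += "AS"

  // A relation is a table name with an optional, optionally-AS-prefixed alias.
  lazy val relation: Parser[(String, Option[String])] =
    ident ~ (opt("AS") ~> opt(ident)) ^^ { case table ~ alias => (table, alias) }

  def parse(input: String) = phrase(relation)(new lexical.Scanner(input))
}

// RelationFactorSketch.parse("src AS s").get  // ("src", Some("s"))
// RelationFactorSketch.parse("src s").get     // ("src", Some("s"))
// RelationFactorSketch.parse("src").get       // ("src", None)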
@@ -70,7 +70,7 @@ package object dsl
def > (other: Expression) = GreaterThan(expr, other)
def >= (other: Expression) = GreaterThanOrEqual(expr, other)
def === (other: Expression) = Equals(expr, other)
-    def != (other: Expression) = Not(Equals(expr, other))
+    def !== (other: Expression) = Not(Equals(expr, other))

def like(other: Expression) = Like(expr, other)
def rlike(other: Expression) = RLike(expr, other)
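The rename makes the negated-equality operator mirror `===` instead of overloading `!=`, which already exists on `Any` and quietly yields a plain `Boolean` whenever its argument fails to be an `Expression`. A usage sketch, assuming the catalyst DSL implicits are in scope ('key is a hypothetical column):

// Both operators build expression trees; nothing is evaluated here.
val eq  = 'key === Literal(1)   // Equals('key, Literal(1))
val neq = 'key !== Literal(1)   // Not(Equals('key, Literal(1)))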
@@ -19,6 +19,21 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.types.NativeType

+ object Row {
+   /**
+    * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
+    * {{{
+    * import org.apache.spark.sql._
+    *
+    * val pairs = sql("SELECT key, value FROM src").rdd.map {
+    *   case Row(key: Int, value: String) =>
+    *     key -> value
+    * }
+    * }}}
+    */
+   def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+ }

/**
* Represents one row of output from a relational operator. Allows both generic access by ordinal,
* which will incur boxing overhead for primitives, as well as native primitive access.
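The mechanics behind that scaladoc example, as a self-contained sketch: an `unapplySeq` returning `Some[Seq[Any]]` lets a fixed-arity pattern bind and type-check each field. `MiniRow` below is a stand-in, not Spark's `Row`:

// Stand-in for Row: an extractor that exposes a row's fields as a Seq.
class MiniRow(val values: Seq[Any])

object MiniRow {
  // `case MiniRow(key: Int, value: String)` desugars to a call here, followed
  // by an arity check and per-field type tests on the returned sequence.
  def unapplySeq(row: MiniRow): Some[Seq[Any]] = Some(row.values)
}

val pair = new MiniRow(Seq(1, "one")) match {
  case MiniRow(key: Int, value: String) => key -> value
}
// pair == 1 -> "one"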
@@ -162,6 +162,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode
a.nullable)(
a.exprId,
a.qualifiers)
+      case other => other
}

def references = Set.empty
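The added catch-all matters because this rewrite pattern-matches over the plan's expressions: without it, anything that is not an `AttributeReference` would throw a `MatchError`. A reduced sketch of the shape, with toy `Expr` types that are not catalyst's:

sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Any) extends Expr

// A schema-lowercasing rewrite must pass non-attribute expressions through unchanged.
def lowerCaseSchema(e: Expr): Expr = e match {
  case Attr(name) => Attr(name.toLowerCase)
  case other      => other  // without this case, lowerCaseSchema(Lit(1)) throws MatchError
}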
15 changes: 1 addition & 14 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
@@ -32,18 +32,5 @@ package object sql

type Row = catalyst.expressions.Row

-   object Row {
-     /**
-      * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
-      * {{{
-      * import org.apache.spark.sql._
-      *
-      * val pairs = sql("SELECT key, value FROM src").rdd.map {
-      *   case Row(key: Int, value: String) =>
-      *     key -> value
-      * }
-      * }}}
-      */
-     def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
-   }
+   val Row = catalyst.expressions.Row
}
16 changes: 8 additions & 8 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -148,17 +148,17 @@ class SchemaRDD(
*
* @param otherPlan the [[SchemaRDD]] that should be joined with this one.
* @param joinType One of `Inner`, `LeftOuter`, `RightOuter`, or `FullOuter`. Defaults to `Inner`.
-   * @param condition An optional condition for the join operation. This is equivilent to the `ON`
-   *                  clause in standard SQL. In the case of `Inner` joins, specifying a
-   *                  `condition` is equivilent to adding `where` clauses after the `join`.
+   * @param on An optional condition for the join operation. This is equivalent to the `ON`
+   *           clause in standard SQL. In the case of `Inner` joins, specifying a
+   *           `condition` is equivalent to adding `where` clauses after the `join`.
*
* @group Query
*/
def join(
otherPlan: SchemaRDD,
joinType: JoinType = Inner,
-       condition: Option[Expression] = None): SchemaRDD =
-     new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, condition))
+       on: Option[Expression] = None): SchemaRDD =
Contributor: need to update the scaladoc above
+     new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, on))
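A call-site sketch for the renamed parameter; `x` and `y` stand for SchemaRDDs aliased as 'x and 'y, as in the tests below:

// Named-argument call sites use `on` where they previously used `condition`.
val joined = x.join(y, joinType = Inner, on = Some("x.a".attr === "y.a".attr))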

/**
* Sorts the results by the given expressions.
@@ -195,14 +195,14 @@ class SchemaRDD(
* with the same name, for example, when performing self-joins.
*
* {{{
-   * val x = schemaRDD.where('a === 1).subquery('x)
-   * val y = schemaRDD.where('a === 2).subquery('y)
+   * val x = schemaRDD.where('a === 1).as('x)
+   * val y = schemaRDD.where('a === 2).as('y)
* x.join(y).where("x.a".attr === "y.a".attr),
* }}}
*
* @group Query
*/
-   def subquery(alias: Symbol) =
+   def as(alias: Symbol) =
new SchemaRDD(sqlContext, Subquery(alias.name, logicalPlan))

/**
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
import org.apache.spark.sql.catalyst.plans.physical._
+ import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
self: Product =>
@@ -69,6 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
+         case InMemoryColumnarTableScan(output, child) =>
+           InMemoryColumnarTableScan(output.map(_.newInstance), child)
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
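The new case exists because a cached in-memory relation can occur twice in one plan (see the self-join test below); each occurrence must re-create its output attributes with fresh expression ids so references to the two sides stay distinguishable. A toy sketch of the idea, with simplified types that are not catalyst's:

import java.util.concurrent.atomic.AtomicLong

object ExprIds { private val next = new AtomicLong(); def fresh(): Long = next.incrementAndGet() }

// Attributes carry a unique exprId; multi-instance relations mint fresh ids per occurrence.
case class AttrSketch(name: String, exprId: Long = ExprIds.fresh()) {
  def newInstance: AttrSketch = copy(exprId = ExprIds.fresh())
}

val key = AttrSketch("key")
val (leftKey, rightKey) = (key.newInstance, key.newInstance)
// leftKey.exprId != rightKey.exprId, so x.key and y.key resolve unambiguously.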
@@ -58,4 +58,17 @@ class CachedTableSuite extends QueryTest
TestSQLContext.uncacheTable("testData")
}
}

test("SELECT Star Cached Table") {
TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
TestSQLContext.cacheTable("selectStar")
TestSQLContext.sql("SELECT * FROM selectStar")
TestSQLContext.uncacheTable("selectStar")
}

test("Self-join cached") {
TestSQLContext.cacheTable("testData")
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
TestSQLContext.uncacheTable("testData")
}
}
16 changes: 8 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -119,8 +119,8 @@ class DslQuerySuite extends QueryTest
}

test("inner join, where, multiple matches") {
-     val x = testData2.where('a === 1).subquery('x)
-     val y = testData2.where('a === 1).subquery('y)
+     val x = testData2.where('a === 1).as('x)
+     val y = testData2.where('a === 1).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
(1,1,1,1) ::
@@ -131,17 +131,17 @@
}

test("inner join, no matches") {
-     val x = testData2.where('a === 1).subquery('x)
-     val y = testData2.where('a === 2).subquery('y)
+     val x = testData2.where('a === 1).as('x)
+     val y = testData2.where('a === 2).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
Nil)
}

test("big inner join, 4 matches per row") {
val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData)
-     val bigDataX = bigData.subquery('x)
-     val bigDataY = bigData.subquery('y)
+     val bigDataX = bigData.as('x)
+     val bigDataY = bigData.as('y)

checkAnswer(
bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr),
@@ -181,8 +181,8 @@
}

test("full outer join") {
-     val left = upperCaseData.where('N &lt;= 4).subquery('left)
-     val right = upperCaseData.where('N >= 3).subquery('right)
+     val left = upperCaseData.where('N &lt;= 4).as('left)
+     val right = upperCaseData.where('N >= 3).as('right)

checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
@@ -56,8 +56,8 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll
}

test("self-join parquet files") {
-     val x = ParquetTestData.testData.subquery('x)
-     val y = ParquetTestData.testData.subquery('y)
+     val x = ParquetTestData.testData.as('x)
+     val y = ParquetTestData.testData.as('y)
val query = x.join(y).where("x.myint".attr === "y.myint".attr)

// Check to make sure that the attributes from either side of the join have unique expression