Fixed coding style issues in Spark SQL #208

Closed · wants to merge 6 commits
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql
package catalyst

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.types._

/**
* Provides experimental support for generating catalyst schemas for scala objects.
*/
object ScalaReflection {
import scala.reflect.runtime.universe._

/** Returns a Sequence of attributes for the given case class type. */
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case s: StructType =>
s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
}

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T])

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): DataType = tpe match {
case t if t <:< typeOf[Product] =>
val params = t.member("<init>": TermName).asMethod.paramss
StructType(
params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
case t if t <:< typeOf[Seq[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
ArrayType(schemaFor(elementType))
case t if t <:< typeOf[String] => StringType
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
case t if t <:< definitions.DoubleTpe => DoubleType
case t if t <:< definitions.ShortTpe => ShortType
case t if t <:< definitions.ByteTpe => ByteType
}

implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {

/**
* Implicitly added to Sequences of case class objects. Returns a catalyst logical relation
* for the data in the sequence.
*/
def asRelation: LocalRelation = {
val output = attributesFor[A]
LocalRelation(output, data)
}
}
}
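As a rough illustration of the new ScalaReflection helpers above, here is a minimal usage sketch. The Person case class and the sample values are hypothetical; schemaFor, attributesFor, and the implicit asRelation are the members defined in the file added by this diff.

import org.apache.spark.sql.catalyst.ScalaReflection._

// Hypothetical case class standing in for user data.
case class Person(name: String, age: Int)

// Derive a catalyst DataType (a StructType for a case class) via runtime reflection.
val schema = schemaFor[Person]

// Produce a sequence of AttributeReferences suitable for use in a logical plan.
val attrs = attributesFor[Person]

// The implicit CaseClassRelation wraps a Seq of case class instances as a LocalRelation.
val relation = Seq(Person("Ada", 36), Person("Grace", 45)).asRelation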
@@ -17,32 +17,28 @@

package org.apache.spark.sql.catalyst

import scala.util.matching.Regex
import scala.util.parsing.combinator._
import scala.util.parsing.combinator.lexical.StdLexical
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.input.CharArrayReader.EofCh
import lexical._
import syntactical._
import token._

import analysis._
import expressions._
import plans._
import plans.logical._
import types._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

/**
* A very simple SQL parser. Based loosly on:
* A very simple SQL parser. Based loosely on:
* https://github.com/stephentu/scala-sql-parser/blob/master/src/main/scala/parser.scala
*
* Limitations:
* - Only supports a very limited subset of SQL.
* - Keywords must be capital.
*
* This is currently included mostly for illustrative purposes. Users wanting more complete support
* for a SQL like language should checkout the HiveQL support in the sql/hive subproject.
* for a SQL like language should checkout the HiveQL support in the sql/hive sub-project.
*/
class SqlParser extends StandardTokenParsers {

def apply(input: String): LogicalPlan = {
phrase(query)(new lexical.Scanner(input)) match {
case Success(r, x) => r
@@ -196,7 +192,7 @@ class SqlParser extends StandardTokenParsers {

protected lazy val from: Parser[LogicalPlan] = FROM ~> relations

// Based very loosly on the MySQL Grammar.
// Based very loosely on the MySQL Grammar.
// http://dev.mysql.com/doc/refman/5.0/en/join.html
protected lazy val relations: Parser[LogicalPlan] =
relation ~ "," ~ relation ^^ { case r1 ~ _ ~ r2 => Join(r1, r2, Inner, None) } |
@@ -261,9 +257,9 @@
andExpression * (OR ^^^ { (e1: Expression, e2: Expression) => Or(e1,e2) })

protected lazy val andExpression: Parser[Expression] =
comparisionExpression * (AND ^^^ { (e1: Expression, e2: Expression) => And(e1,e2) })
comparisonExpression * (AND ^^^ { (e1: Expression, e2: Expression) => And(e1,e2) })

protected lazy val comparisionExpression: Parser[Expression] =
protected lazy val comparisonExpression: Parser[Expression] =
termExpression ~ "=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Equals(e1, e2) } |
termExpression ~ "<" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThan(e1, e2) } |
termExpression ~ "<=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => LessThanOrEqual(e1, e2) } |
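For context, a minimal sketch of how the SqlParser touched above is invoked; the query text and the table name people are made up for this example, and per the class comment only a small set of uppercase keywords is supported.

import org.apache.spark.sql.catalyst.SqlParser
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

val parser = new SqlParser
// apply(input: String) runs the grammar and returns a logical plan,
// still unresolved until an Analyzer is applied.
val plan: LogicalPlan = parser("SELECT name FROM people WHERE age <= 21")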
@@ -19,9 +19,10 @@ package org.apache.spark.sql
package catalyst
package analysis

import expressions._
import plans.logical._
import rules._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._


/**
* A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
@@ -19,9 +19,10 @@ package org.apache.spark.sql
package catalyst
package analysis

import plans.logical.{LogicalPlan, Subquery}
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}

/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package analysis

import expressions._
import org.apache.spark.sql.catalyst.expressions.Expression

/** A catalog for looking up user defined functions, used by an [[Analyzer]]. */
trait FunctionRegistry {
@@ -19,10 +19,10 @@ package org.apache.spark.sql
package catalyst
package analysis

import expressions._
import plans.logical._
import rules._
import types._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types._

/**
* A collection of [[catalyst.rules.Rule Rules]] that can be used to coerce differing types that
@@ -18,14 +18,14 @@
package org.apache.spark.sql.catalyst
package analysis

import plans.logical.LogicalPlan
import rules._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* A trait that should be mixed into query operators where an single instance might appear multiple
* times in a logical query plan. It is invalid to have multiple copies of the same attribute
* produced by distinct operators in a query tree as this breaks the gurantee that expression
* ids, which are used to differentate attributes, are unique.
* produced by distinct operators in a query tree as this breaks the guarantee that expression
* ids, which are used to differentiate attributes, are unique.
*
* Before analysis, all operators that include this trait will be asked to produce a new version
* of itself with globally unique expression ids.
@@ -15,6 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql
package catalyst

/**
@@ -19,9 +19,9 @@ package org.apache.spark.sql
package catalyst
package analysis

import expressions._
import plans.logical.BaseRelation
import trees.TreeNode
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
* Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully
@@ -95,7 +95,7 @@ case class Star(
// If there is no table specified, use all input attributes.
case None => input
// If there is a table, pick out attributes that are part of this table.
case Some(table) => input.filter(_.qualifiers contains table)
case Some(t) => input.filter(_.qualifiers contains t)
}
val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map {
case (n: NamedExpression, _) => n
@@ -19,58 +19,12 @@ package org.apache.spark.sql
package catalyst

import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import analysis.UnresolvedAttribute
import expressions._
import plans._
import plans.logical._
import types._

/**
* Provides experimental support for generating catalyst schemas for scala objects.
*/
object ScalaReflection {
import scala.reflect.runtime.universe._

/** Returns a Sequence of attributes for the given case class type. */
def attributesFor[T: TypeTag]: Seq[Attribute] = schemaFor[T] match {
case s: StructType =>
s.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
}

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor[T: TypeTag]: DataType = schemaFor(typeOf[T])

/** Returns a catalyst DataType for the given Scala Type using reflection. */
def schemaFor(tpe: `Type`): DataType = tpe match {
case t if t <:< typeOf[Product] =>
val params = t.member("<init>": TermName).asMethod.paramss
StructType(
params.head.map(p => StructField(p.name.toString, schemaFor(p.typeSignature), true)))
case t if t <:< typeOf[Seq[_]] =>
val TypeRef(_, _, Seq(elementType)) = t
ArrayType(schemaFor(elementType))
case t if t <:< typeOf[String] => StringType
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
case t if t <:< definitions.DoubleTpe => DoubleType
case t if t <:< definitions.ShortTpe => ShortType
case t if t <:< definitions.ByteTpe => ByteType
}

implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {

/**
* Implicitly added to Sequences of case class objects. Returns a catalyst logical relation
* for the the data in the sequence.
*/
def asRelation: LocalRelation = {
val output = attributesFor[A]
LocalRelation(output, data)
}
}
}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types._

/**
* A collection of implicit conversions that create a DSL for constructing catalyst data structures.
@@ -18,15 +18,16 @@
package org.apache.spark.sql
package catalyst

import trees._
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
* Functions for attaching and retrieving trees that are associated with errors.
*/
package object errors {

class TreeNodeException[TreeType <: TreeNode[_]]
(tree: TreeType, msg: String, cause: Throwable) extends Exception(msg, cause) {
class TreeNodeException[TreeType <: TreeNode[_]](
tree: TreeType, msg: String, cause: Throwable)
extends Exception(msg, cause) {

// Yes, this is the same as a default parameter, but... those don't seem to work with SBT
// external project dependencies for some reason.
@@ -19,10 +19,9 @@ package org.apache.spark.sql
package catalyst
package expressions

import rules._
import errors._

import catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.rules.Rule

/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package expressions

import types._
import org.apache.spark.sql.catalyst.types._

/** Cast the child expression to the target data type. */
case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
@@ -40,7 +40,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
case (StringType, ShortType) => a: Any => castOrNull(a, _.toShort)
case (StringType, ByteType) => a: Any => castOrNull(a, _.toByte)
case (StringType, DecimalType) => a: Any => castOrNull(a, BigDecimal(_))
case (BooleanType, ByteType) => a: Any => a match {
case (BooleanType, ByteType) => {
case null => null
case true => 1.toByte
case false => 0.toByte
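The Cast change above replaces a lambda that immediately matches on its argument with a pattern-matching anonymous function, one of the style cleanups this PR makes. A small standalone illustration of the equivalence (the value names are made up):

// Before-style: bind a parameter, then match on it.
val verbose: Any => Any = (a: Any) => a match {
  case null  => null
  case true  => 1.toByte
  case false => 0.toByte
}

// After-style: a pattern-matching anonymous function, equivalent but more concise.
val concise: Any => Any = {
  case null  => null
  case true  => 1.toByte
  case false => 0.toByte
}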
@@ -19,9 +19,9 @@ package org.apache.spark.sql
package catalyst
package expressions

import errors._
import trees._
import types._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.{DataType, FractionalType, IntegralType, NumericType}
import org.apache.spark.sql.catalyst.errors.TreeNodeException

abstract class Expression extends TreeNode[Expression] {
self: Product =>
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package expressions

import types.DoubleType
import org.apache.spark.sql.catalyst.types.DoubleType

case object Rand extends LeafExpression {
def dataType = DoubleType
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package expressions

import types._
import org.apache.spark.sql.catalyst.types.NativeType

/**
* Represents one row of output from a relational operator. Allows both generic access by ordinal,
@@ -19,7 +19,7 @@ package org.apache.spark.sql
package catalyst
package expressions

import types._
import org.apache.spark.sql.catalyst.types.DataType

case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
extends Expression {
@@ -21,7 +21,7 @@ package expressions

import scala.language.dynamics

import types._
import org.apache.spark.sql.catalyst.types.DataType

case object DynamicType extends DataType
