Commit 9265366

Fixed coding style issues in sql/core
1 parent 3dcbbbd commit 9265366

21 files changed: +100 / -148 lines

sql/core/src/main/scala/org/apache/spark/rdd/PartitionLocalRDDFunctions.scala

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ import scala.reflect._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark._
-import org.apache.spark.Aggregator
 import org.apache.spark.SparkContext._
 import org.apache.spark.util.collection.AppendOnlyMap
 

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 2 additions & 3 deletions
@@ -20,14 +20,13 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.planning.QueryPlanner
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand, WriteToFile}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{OneToOneDependency, Dependency, Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.{OneToOneDependency, Dependency, Partition, TaskContext}
 
 /**
  * <span class="badge" style="float: right; background-color: darkblue;">ALPHA COMPONENT</span>

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 10 additions & 12 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 package execution
 
-import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.io.{Output, Input}
@@ -28,16 +27,16 @@ import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.MutablePair
 
-import catalyst.rules.Rule
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
 
 private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
   override def newKryo(): Kryo = {
     val kryo = new Kryo
     kryo.setRegistrationRequired(true)
-    kryo.register(classOf[MutablePair[_,_]])
+    kryo.register(classOf[MutablePair[_, _]])
     kryo.register(classOf[Array[Any]])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
     kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
@@ -52,7 +51,7 @@ private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
 private class BigDecimalSerializer extends Serializer[BigDecimal] {
   def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
     // TODO: There are probably more efficient representations than strings...
-    output.writeString(bd.toString)
+    output.writeString(bd.toString())
   }
 
   def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
@@ -68,7 +67,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
 
   def execute() = attachTree(this , "execute") {
     newPartitioning match {
-      case HashPartitioning(expressions, numPartitions) => {
+      case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
         val rdd = child.execute().mapPartitions { iter =>
           val hashExpressions = new MutableProjection(expressions)
@@ -79,8 +78,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         val shuffled = new ShuffledRDD[Row, Row, MutablePair[Row, Row]](rdd, part)
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
         shuffled.map(_._2)
-      }
-      case RangePartitioning(sortingExpressions, numPartitions) => {
+
+      case RangePartitioning(sortingExpressions, numPartitions) =>
         // TODO: RangePartitioner should take an Ordering.
         implicit val ordering = new RowOrdering(sortingExpressions)
 
@@ -93,9 +92,8 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
 
         shuffled.map(_._1)
-      }
       case SinglePartition =>
-        child.execute().coalesce(1, true)
+        child.execute().coalesce(1, shuffle = true)
 
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
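
The last two hunks above apply two small Scala style idioms: a case arm needs no surrounding braces, since everything up to the next case is already its body, and the bare boolean in coalesce(1, true) becomes the self-documenting coalesce(1, shuffle = true). A minimal, self-contained sketch of both idioms follows; the Shape types and the describe method are invented for illustration and are not part of this commit.

sealed trait Shape
case class Circle(radius: Double) extends Shape
case class Square(side: Double) extends Shape

object StyleSketch {
  // A case arm's body runs to the next case, so no extra braces are needed.
  def describe(shape: Shape, verbose: Boolean = false): String = shape match {
    case Circle(r) =>
      val area = math.Pi * r * r
      if (verbose) f"circle of radius $r%.1f and area $area%.2f" else "circle"
    case Square(s) =>
      if (verbose) s"square of side $s" else "square"
  }

  def main(args: Array[String]): Unit = {
    // Naming the boolean argument documents intent at the call site,
    // mirroring coalesce(1, shuffle = true) in the hunk above.
    println(describe(Circle(2.0), verbose = true))
  }
}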

sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala

Lines changed: 1 addition & 2 deletions
@@ -18,8 +18,7 @@
 package org.apache.spark.sql
 package execution
 
-import catalyst.expressions._
-import catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Applies a [[catalyst.expressions.Generator Generator]] to a stream of input rows, combining the

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 8 additions & 8 deletions
@@ -20,13 +20,13 @@ package execution
 
 import org.apache.spark.SparkContext
 
-import catalyst.expressions._
-import catalyst.planning._
-import catalyst.plans._
-import catalyst.plans.logical.LogicalPlan
-import catalyst.plans.physical._
-import parquet.ParquetRelation
-import parquet.InsertIntoParquetTable
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.parquet.InsertIntoParquetTable
 
 abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
@@ -172,7 +172,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
-    // TOOD: Set
+    // TODO: Set
     val numPartitions = 200
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
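
Most of the hunks in this commit replace package-relative imports with fully qualified ones. The relative form works because these files open with chained package clauses (package org.apache.spark.sql followed by package execution), which put sibling packages such as catalyst in scope by their short names. A small self-contained sketch of the two forms, using invented package and object names rather than Spark's:

// Hypothetical packages; compile with scalac and run com.example.sql.execution.Demo.
package com.example.sql {

  package catalyst {
    // A sibling package whose members the execution package wants to use.
    object Helpers {
      def greet(): String = "hello from catalyst"
    }
  }

  package execution {
    // Relative form (what the old code used): resolves because catalyst is
    // visible from the enclosing com.example.sql package.
    //   import catalyst.Helpers
    // Fully qualified form (what this commit standardizes on):
    import com.example.sql.catalyst.Helpers

    object Demo {
      def main(args: Array[String]): Unit = println(Helpers.greet())
    }
  }
}

The fully qualified form is a little longer but survives file moves and reads unambiguously, which is presumably why the style cleanup prefers it.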

sql/core/src/main/scala/org/apache/spark/sql/execution/aggregates.scala

Lines changed: 3 additions & 6 deletions
@@ -19,13 +19,10 @@ package org.apache.spark.sql
 package execution
 
 import org.apache.spark.SparkContext
-
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical.{UnspecifiedDistribution, ClusteredDistribution, AllTuples}
-import catalyst.types._
-
 import org.apache.spark.rdd.PartitionLocalRDDFunctions._
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
 
 /**
  * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 6 additions & 7 deletions
@@ -23,18 +23,17 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution}
-import catalyst.plans.logical.LogicalPlan
-import catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{UnspecifiedDistribution, OrderedDistribution}
+import org.apache.spark.sql.catalyst.ScalaReflection
 
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
 
   def execute() = child.execute().mapPartitions { iter =>
-    @transient val resuableProjection = new MutableProjection(projectList)
-    iter.map(resuableProjection)
+    @transient val reusableProjection = new MutableProjection(projectList)
+    iter.map(reusableProjection)
   }
 }
 

sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala

Lines changed: 4 additions & 4 deletions
@@ -23,10 +23,10 @@ import scala.collection.mutable
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
 
-import catalyst.errors._
-import catalyst.expressions._
-import catalyst.plans._
-import catalyst.plans.physical.{ClusteredDistribution, Partitioning}
+import org.apache.spark.sql.catalyst.errors._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning}
 
 import org.apache.spark.rdd.PartitionLocalRDDFunctions._
 

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 19 additions & 21 deletions
@@ -19,28 +19,28 @@ package org.apache.spark.sql.parquet
 
 import java.io.{IOException, FileNotFoundException}
 
-import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, BaseRelation}
-import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.catalyst.types.ArrayType
-import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference, Attribute}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-
-import parquet.schema.{MessageTypeParser, MessageType}
+import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetFileReader}
+import parquet.io.api.{Binary, RecordConsumer}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
+import parquet.schema.Type.Repetition
+import parquet.schema.{MessageTypeParser, MessageType}
 import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
 import parquet.schema.{Type => ParquetType}
-import parquet.schema.Type.Repetition
-import parquet.io.api.{Binary, RecordConsumer}
-import parquet.hadoop.{Footer, ParquetFileWriter, ParquetFileReader}
-import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
-import parquet.hadoop.util.ContextUtil
 
-import scala.collection.JavaConversions._
+import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, BaseRelation}
+import org.apache.spark.sql.catalyst.types.ArrayType
+import org.apache.spark.sql.catalyst.types._
 
 /**
  * Relation that consists of data stored in a Parquet columnar format.
@@ -55,7 +55,7 @@ import scala.collection.JavaConversions._
  * @param tableName The name of the relation that can be used in queries.
  * @param path The path to the Parquet file.
  */
-case class ParquetRelation(val tableName: String, val path: String) extends BaseRelation {
+case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
 
   /** Schema derived from ParquetFile **/
   def parquetSchema: MessageType =
@@ -145,11 +145,10 @@ object ParquetTypesConverter {
     case ParquetPrimitiveTypeName.FLOAT => FloatType
     case ParquetPrimitiveTypeName.INT32 => IntegerType
     case ParquetPrimitiveTypeName.INT64 => LongType
-    case ParquetPrimitiveTypeName.INT96 => {
+    case ParquetPrimitiveTypeName.INT96 =>
      // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
      sys.error("Warning: potential loss of precision: converting INT96 to long")
      LongType
-    }
     case _ => sys.error(
      s"Unsupported parquet datatype $parquetType")
   }
@@ -186,11 +185,10 @@ object ParquetTypesConverter {
 
   def convertToAttributes(parquetSchema: MessageType) : Seq[Attribute] = {
     parquetSchema.getColumns.map {
-      case (desc) => {
+      case (desc) =>
        val ctype = toDataType(desc.getType)
        val name: String = desc.getPath.mkString(".")
        new AttributeReference(name, ctype, false)()
-      }
     }
   }
 
@@ -245,7 +243,7 @@ object ParquetTypesConverter {
   * Try to read Parquet metadata at the given Path. We first see if there is a summary file
   * in the parent directory. If so, this is used. Else we read the actual footer at the given
   * location.
-  * @param path The path at which we expect one (or more) Parquet files.
+  * @param origPath The path at which we expect one (or more) Parquet files.
   * @return The `ParquetMetadata` containing among other things the schema.
   */
  def readMetaData(origPath: Path): ParquetMetadata = {
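
One non-import change in this file drops the explicit val from the ParquetRelation constructor parameters: case class parameters are already public val fields, so the keyword is redundant. A tiny sketch of that equivalence, using an invented Relation class rather than Spark's:

// Constructor parameters of a case class become public val fields automatically.
case class Relation(tableName: String, path: String)

object RelationDemo {
  def main(args: Array[String]): Unit = {
    val r = Relation("people", "/tmp/people.parquet")
    // Both parameters are readable as fields even though no val was written.
    println(s"${r.tableName} -> ${r.path}")
  }
}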

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 12 additions & 12 deletions
@@ -17,24 +17,24 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.IOException
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
 import parquet.io.InvalidRecordException
 import parquet.schema.MessageType
-import parquet.hadoop.{ParquetOutputFormat, ParquetInputFormat}
-import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{TaskContext, SerializableWritable, SparkContext}
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute, Expression}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, LeafNode}
-
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-
-import java.io.IOException
-import java.text.SimpleDateFormat
-import java.util.Date
+import org.apache.spark.{TaskContext, SerializableWritable, SparkContext}
 
 /**
  * Parquet table scan operator. Imports the file that backs the given

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 7 additions & 9 deletions
@@ -19,15 +19,14 @@ package org.apache.spark.sql.parquet
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.Logging
-
+import parquet.column.ParquetProperties
+import parquet.hadoop.ParquetOutputFormat
+import parquet.hadoop.api.ReadSupport.ReadContext
+import parquet.hadoop.api.{WriteSupport, ReadSupport}
 import parquet.io.api._
 import parquet.schema.{MessageTypeParser, MessageType}
-import parquet.hadoop.api.{WriteSupport, ReadSupport}
-import parquet.hadoop.api.ReadSupport.ReadContext
-import parquet.hadoop.ParquetOutputFormat
-import parquet.column.ParquetProperties
 
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.types._
 
@@ -95,8 +94,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
   }
 
   def getSchema(configuration: Configuration): MessageType = {
-    return MessageTypeParser.parseMessageType(
-      configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA))
+    MessageTypeParser.parseMessageType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA))
   }
 
   private var schema: MessageType = null
@@ -108,7 +106,7 @@ class RowWriteSupport extends WriteSupport[Row] with Logging {
     attributes = ParquetTypesConverter.convertToAttributes(schema)
     new WriteSupport.WriteContext(
       schema,
-      new java.util.HashMap[java.lang.String, java.lang.String]());
+      new java.util.HashMap[java.lang.String, java.lang.String]())
   }
 
   override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
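
The getSchema hunk above also drops an explicit return: in Scala the last expression of a method body is its value, so return adds nothing here. A short sketch with invented method names:

object ReturnSketch {
  // Idiomatic: the last expression is the result.
  def double(x: Int): Int = x * 2

  // Also compiles, but the explicit return is redundant noise.
  def doubleWithReturn(x: Int): Int = {
    return x * 2
  }

  def main(args: Array[String]): Unit =
    println(s"${double(21)} and ${doubleWithReturn(21)} are both 42")
}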
