
[SQL] Decrease partitions when testing #2164

Closed · wants to merge 6 commits

TestSQLContext.scala
@@ -18,8 +18,13 @@
 package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLConf, SQLContext}
 
 /** A SQLContext that can be used for local testing. */
 object TestSQLContext
-  extends SQLContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
+  extends SQLContext(new SparkContext("local[2]", "TestSQLContext", new SparkConf())) {
+
+  /** Fewer partitions to speed up testing. */
+  override private[spark] def numShufflePartitions: Int =
+    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+}
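
The core of the change is visible above: the test SparkContext runs with two local threads and the shuffle-partition setting (SQLConf.SHUFFLE_PARTITIONS, i.e. spark.sql.shuffle.partitions) is dropped from its default of 200 to 5, so every shuffle in a test plan (aggregation, join, sort) schedules a handful of tasks instead of hundreds. A minimal standalone sketch of the same idea, with hypothetical names and not part of this patch:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical test harness, not from the patch: a local context that keeps
// two worker threads (so multi-partition code paths are still exercised)
// while limiting shuffles to 5 partitions instead of the default 200.
object LocalTestSQLContext {
  val sparkContext = new SparkContext("local[2]", "LocalTestSQLContext", new SparkConf())
  val sqlContext = new SQLContext(sparkContext)

  // The same setting the patch overrides via numShufflePartitions, expressed
  // as a plain configuration call; "spark.sql.shuffle.partitions" is the key
  // behind SQLConf.SHUFFLE_PARTITIONS.
  sqlContext.setConf("spark.sql.shuffle.partitions", "5")
}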

ParquetQuerySuite.scala
@@ -58,8 +58,7 @@ case class AllDataTypes(
     doubleField: Double,
     shortField: Short,
     byteField: Byte,
-    booleanField: Boolean,
-    binaryField: Array[Byte])
+    booleanField: Boolean)
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -70,13 +69,14 @@ case class AllDataTypesWithNonPrimitiveType(
     shortField: Short,
     byteField: Byte,
     booleanField: Boolean,
-    binaryField: Array[Byte],
     array: Seq[Int],
     arrayContainsNull: Seq[Option[Int]],
     map: Map[Int, Long],
     mapValueContainsNull: Map[Int, Option[Long]],
     data: Data)
 
+case class BinaryData(binaryData: Array[Byte])
+
 class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
   TestData // Load test data tables.
 
@@ -108,26 +108,26 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
test("Read/Write All Types") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)
TestSQLContext.sparkContext.parallelize(range)
.map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
(0 to x).map(_.toByte).toArray))
.saveAsParquetFile(tempDir)
val result = parquetFile(tempDir).collect()
range.foreach {
i =>
assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
assert(result(i).getInt(1) === i)
assert(result(i).getLong(2) === i.toLong)
assert(result(i).getFloat(3) === i.toFloat)
assert(result(i).getDouble(4) === i.toDouble)
assert(result(i).getShort(5) === i.toShort)
assert(result(i).getByte(6) === i.toByte)
assert(result(i).getBoolean(7) === (i % 2 == 0))
assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
}
val data = sparkContext.parallelize(range)
.map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0))

data.saveAsParquetFile(tempDir)

checkAnswer(
parquetFile(tempDir),
data.toSchemaRDD.collect().toSeq)
}

test("Treat binary as string") {
test("read/write binary data") {
// Since equality for Array[Byte] is broken we test this separately.
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).saveAsParquetFile(tempDir)
parquetFile(tempDir)
.map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8"))
.collect().toSeq == Seq("test")
}

ignore("Treat binary as string") {
val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString

// Create the test file.
@@ -142,37 +142,16 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
StructField("c2", BinaryType, false) :: Nil)
val schemaRDD1 = applySchema(rowRDD, schema)
schemaRDD1.saveAsParquetFile(path)
val resultWithBinary = parquetFile(path).collect
range.foreach {
i =>
assert(resultWithBinary(i).getInt(0) === i)
assert(resultWithBinary(i)(1) === s"val_$i".getBytes)
}

TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
// This ParquetRelation always use Parquet types to derive output.
val parquetRelation = new ParquetRelation(
path.toString,
Some(TestSQLContext.sparkContext.hadoopConfiguration),
TestSQLContext) {
override val output =
ParquetTypesConverter.convertToAttributes(
ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema,
TestSQLContext.isParquetBinaryAsString)
}
val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation)
val resultWithString = schemaRDD.collect
range.foreach {
i =>
assert(resultWithString(i).getInt(0) === i)
assert(resultWithString(i)(1) === s"val_$i")
}
checkAnswer(
parquetFile(path).select('c1, 'c2.cast(StringType)),
schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)

schemaRDD.registerTempTable("tmp")
setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true")
parquetFile(path).printSchema()
checkAnswer(
sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"),
(5, "val_5") ::
(7, "val_7") :: Nil)
parquetFile(path),
schemaRDD1.select('c1, 'c2.cast(StringType)).collect().toSeq)


// Set it back.
TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
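
A side note on the comment in the new "read/write binary data" test above: Scala's Array is a plain JVM array, so == on Array[Byte] compares references rather than contents, which is why the binary column is round-tripped through new String instead of being compared with checkAnswer. A standalone illustration, not part of the patch:

object ArrayEqualityDemo extends App {
  val a = "test".getBytes("utf8")
  val b = "test".getBytes("utf8")

  println(a == b)             // false: Array equality is reference equality
  println(a.sameElements(b))  // true:  element-wise comparison
  println(a.toSeq == b.toSeq) // true:  converting to Seq gives structural equality
}
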
@@ -275,34 +254,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
test("Read/Write All Types with non-primitive type") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)
TestSQLContext.sparkContext.parallelize(range)
val data = sparkContext.parallelize(range)
.map(x => AllDataTypesWithNonPrimitiveType(
s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0,
(0 to x).map(_.toByte).toArray,
(0 until x),
(0 until x).map(Option(_).filter(_ % 3 == 0)),
(0 until x).map(i => i -> i.toLong).toMap,
(0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None),
Data((0 until x), Nested(x, s"$x"))))
.saveAsParquetFile(tempDir)
val result = parquetFile(tempDir).collect()
range.foreach {
i =>
assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}")
assert(result(i).getInt(1) === i)
assert(result(i).getLong(2) === i.toLong)
assert(result(i).getFloat(3) === i.toFloat)
assert(result(i).getDouble(4) === i.toDouble)
assert(result(i).getShort(5) === i.toShort)
assert(result(i).getByte(6) === i.toByte)
assert(result(i).getBoolean(7) === (i % 2 == 0))
assert(result(i)(8) === (0 to i).map(_.toByte).toArray)
assert(result(i)(9) === (0 until i))
assert(result(i)(10) === (0 until i).map(i => if (i % 3 == 0) i else null))
assert(result(i)(11) === (0 until i).map(i => i -> i.toLong).toMap)
assert(result(i)(12) === (0 until i).map(i => i -> i.toLong).toMap + (i -> null))
assert(result(i)(13) === new GenericRow(Array[Any]((0 until i), new GenericRow(Array[Any](i, s"$i")))))
}
data.saveAsParquetFile(tempDir)

checkAnswer(
parquetFile(tempDir),
data.toSchemaRDD.collect().toSeq)
}

test("self-join parquet files") {
@@ -399,23 +363,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
     }
   }
 
-  test("Saving case class RDD table to file and reading it back in") {
-    val file = getTempFilePath("parquet")
-    val path = file.toString
-    val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    rdd.saveAsParquetFile(path)
-    val readFile = parquetFile(path)
-    readFile.registerTempTable("tmpx")
-    val rdd_copy = sql("SELECT * FROM tmpx").collect()
-    val rdd_orig = rdd.collect()
-    for(i <- 0 to 99) {
-      assert(rdd_copy(i).apply(0) === rdd_orig(i).key, s"key error in line $i")
-      assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value error in line $i")
-    }
-    Utils.deleteRecursively(file)
-  }
-
   test("Read a parquet file instead of a directory") {
     val file = getTempFilePath("parquet")
     val path = file.toString
@@ -448,32 +395,19 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll {
sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
val rdd_copy1 = sql("SELECT * FROM dest").collect()
assert(rdd_copy1.size === 100)
assert(rdd_copy1(0).apply(0) === 1)
assert(rdd_copy1(0).apply(1) === "val_1")
// TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
// executed twice otherwise?!

sql("INSERT INTO dest SELECT * FROM source")
val rdd_copy2 = sql("SELECT * FROM dest").collect()
val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
assert(rdd_copy2.size === 200)
assert(rdd_copy2(0).apply(0) === 1)
assert(rdd_copy2(0).apply(1) === "val_1")
assert(rdd_copy2(99).apply(0) === 100)
assert(rdd_copy2(99).apply(1) === "val_100")
assert(rdd_copy2(100).apply(0) === 1)
assert(rdd_copy2(100).apply(1) === "val_1")
Utils.deleteRecursively(dirname)
}

test("Insert (appending) to same table via Scala API") {
// TODO: why does collecting break things? It seems InsertIntoParquet::execute() is
// executed twice otherwise?!
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
assert(double_rdd != null)
assert(double_rdd.size === 30)
for(i <- (0 to 14)) {
assert(double_rdd(i) === double_rdd(i+15), s"error: lines $i and ${i+15} to not match")
}

// let's restore the original test data
Utils.deleteRecursively(ParquetTestData.testDir)
ParquetTestData.writeFile()
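
The sortBy(_.getInt(0)) added above is a consequence of the partitioning change: once queries run on local[2] with several shuffle partitions, collect() no longer returns rows in insertion order, so any test that indexes into the collected array has to impose an order first. A self-contained sketch of the effect using plain RDDs (hypothetical names, not from the patch):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical, standalone illustration: after a shuffle across multiple
// partitions, the order of collect() is not the insertion order, so
// positional assertions must sort by a key first.
object OrderingDemo extends App {
  val sc = new SparkContext("local[2]", "OrderingDemo", new SparkConf())

  val doubled = (sc.parallelize(1 to 100) ++ sc.parallelize(1 to 100))
    .map(i => (i, s"val_$i"))
    .repartition(5) // mimics a 5-partition shuffle, as in the test configuration

  // Without sorting, doubled.collect()(0) could be any of the 200 tuples.
  val sorted = doubled.collect().sortBy(_._1)
  assert(sorted(0) == ((1, "val_1")))
  assert(sorted(199) == ((100, "val_100")))

  sc.stop()
}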

TestHive.scala
@@ -35,12 +35,13 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
+import org.apache.spark.sql.SQLConf
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
 object TestHive
-  extends TestHiveContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
+  extends TestHiveContext(new SparkContext("local[2]", "TestSQLContext", new SparkConf()))
 
 /**
  * A locally running test instance of Spark's Hive execution engine.
@@ -90,6 +91,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }
 
+  /** Fewer partitions to speed up testing. */
+  override private[spark] def numShufflePartitions: Int =
+    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+
   /**
    * Returns the value of specified environmental variable as a [[java.io.File]] after checking
    * to ensure it exists