
[WIP][SPARK-2883][SQL]initial support ORC in spark sql #2576


Closed
wants to merge 45 commits into from

Commits (45)
fb14a06
initial support orc in spark sql
scwf Sep 29, 2014
ec3cdaf
add unit tests
scwf Sep 29, 2014
7126290
add orc to example of spark sql
scwf Sep 29, 2014
655b23f
merge with apache/master and fix conflict
scwf Oct 4, 2014
1505af4
fix according comments and move orc to hive sub project
scwf Oct 4, 2014
1db30b1
fix scala style
scwf Oct 4, 2014
9529d68
address comments_1
scwf Oct 14, 2014
25416c3
update with apache/spark master and fix conflicts
scwf Oct 14, 2014
89421ef
log.info -> logInfo
scwf Oct 15, 2014
40f9d68
address comments
scwf Oct 16, 2014
b2b5455
Merge branch 'master' of https://github.com/apache/spark into orc
scwf Oct 16, 2014
349bb92
reuse TableReader
scwf Oct 17, 2014
65483e8
fix InsertIntoOrcTable
scwf Oct 17, 2014
629f95e
address comments
scwf Oct 18, 2014
d1bba23
added test for read orc schema
scwf Oct 18, 2014
b46ecb1
add test files
scwf Oct 18, 2014
f68bea0
scala style fixed
scwf Oct 18, 2014
b06e335
revert no related changes
scwf Oct 18, 2014
7bdc503
Merge branch 'test' into orc
scwf Oct 18, 2014
4d0950b
more tests
scwf Oct 18, 2014
37a2192
tests suite fix
scwf Oct 18, 2014
4e1c839
minor fix
scwf Oct 19, 2014
b8495d5
add test case for reading/righting empty RDD
scwf Oct 19, 2014
f680da0
no use method
scwf Oct 19, 2014
269b15f
make wrap consistent with InsertIntoHiveTable.wrapperFor
zhzhan Oct 22, 2014
d06fe5b
Merge pull request #4 from zhzhan/orc1
scwf Oct 23, 2014
9bd3c85
First draft.
marmbrus Nov 14, 2014
5f5fda8
https -> http in pom
scwf Nov 17, 2014
2421f24
Merge branch 'mvn' into parquet
scwf Nov 17, 2014
5d7f863
logging / formatting improvements.
marmbrus Nov 18, 2014
94e0d40
Add an experimental interface to data sources that exposes catalyst e…
marmbrus Nov 18, 2014
dd78aa7
Alternative implementation of parquet based on the datasources API.
marmbrus Nov 18, 2014
244ab59
Merge branch 'newParquet' of https://github.com/marmbrus/spark into p…
scwf Nov 19, 2014
a37f6a8
wf comment
scwf Nov 20, 2014
1d6856e
Merge branch 'master' of https://github.com/apache/spark into mvn
scwf Nov 20, 2014
b8e6f84
Merge branch 'parquet' into newOrc
scwf Nov 20, 2014
e521e6a
draft for datasource api
scwf Nov 20, 2014
9ba04ac
Merge branch 'master' into newOrc
scwf Nov 20, 2014
c90ed2f
test case
scwf Nov 20, 2014
b6ae12b
fix test
scwf Nov 20, 2014
3ca68eb
no used bin file
scwf Nov 20, 2014
abf1b78
update with apache master
scwf Nov 22, 2014
1e0c1d9
Revert "https -> http in pom"
scwf Nov 22, 2014
c5236ef
ppd support
zhzhan Dec 1, 2014
601d242
Merge pull request #13 from zhzhan/orc1
scwf Dec 2, 2014
90 changes: 90 additions & 0 deletions docs/sql-programming-guide.md
@@ -605,6 +605,96 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
</tr>
</table>

## ORC Files

[ORC](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC) is a columnar format supported by Hive. It provides a highly efficient way to store data on HDFS and speeds up query performance.
Spark SQL provides support for both reading and writing ORC files that automatically preserves the schema
of the original data.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}
// Use HiveContext to read or write ORC files.
val sqlContext = new HiveContext(sc)
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using ORC.
people.registerTempTable("people")
people.saveAsOrcFile("people.orc")

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a SchemaRDD.
val orcFile = sqlContext.orcFile("people.orc")

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile")
val teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}
// Use JavaHiveContext to read or write ORC files.
JavaHiveContext sqlContext = new org.apache.spark.sql.hive.api.java.JavaHiveContext(sc);
JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc");

// Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
// The result of loading an ORC file is also a JavaSchemaRDD.
JavaSchemaRDD orcFile = sqlContext.orcFile("people.orc");

// ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();
{% endhighlight %}

</div>

<div data-lang="python" markdown="1">

{% highlight python %}
# Use HiveContext to read or write ORC files.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

schemaPeople # The SchemaRDD from the previous example.

# SchemaRDDs can be saved as ORC files, maintaining the schema information.
schemaPeople.saveAsOrcFile("people.orc")

# Read in the ORC file created above. ORC files are self-describing so the schema is preserved.
# The result of loading an ORC file is also a SchemaRDD.
orcFile = sqlContext.orcFile("people.orc")

# ORC files can also be registered as tables and then used in SQL statements.
orcFile.registerTempTable("orcFile");
teenagers = sqlContext.sql("SELECT name FROM orcFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
print teenName
{% endhighlight %}

</div>

</div>

## JSON Datasets
<div class="codetabs">

@@ -284,7 +284,7 @@ package object dsl {

object plans { // scalastyle:ignore
implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) extends LogicalPlanFunctions {
def writeToFile(path: String) = WriteToFile(path, logicalPlan)
def writeToFile(path: String) = WriteToParquetFile(path, logicalPlan)
}
}

@@ -124,7 +124,7 @@ case class CreateTableAsSelect[T](
override lazy val resolved = (databaseName != None && childrenResolved)
}

case class WriteToFile(
case class WriteToParquetFile(
path: String,
child: LogicalPlan) extends UnaryNode {
override def output = child.output
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToParquetFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
case _ =>
baseLogicalPlan
@@ -73,7 +73,7 @@ private[sql] trait SchemaRDDLike {
* @group schema
*/
def saveAsParquetFile(path: String): Unit = {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
sqlContext.executePlan(WriteToParquetFile(path, logicalPlan)).toRdd
}
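
The two hunks above show the pattern an ORC write path needs to mirror: a logical node carried by the plan, plus a save method that hands it to executePlan so the side effect happens eagerly. A minimal sketch of the ORC analogue, assuming a hypothetical WriteToOrcFile node (the PR's actual write path goes through OrcSchemaRDD.saveAsOrcFile and may be wired differently):

// Hypothetical logical node, same shape as WriteToParquetFile above.
case class WriteToOrcFile(
    path: String,
    child: LogicalPlan) extends UnaryNode {
  override def output = child.output
}

// Wired exactly like saveAsParquetFile: force execution eagerly via executePlan.
def saveAsOrcFile(path: String): Unit = {
  sqlContext.executePlan(WriteToOrcFile(path, logicalPlan)).toRdd
}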

@@ -198,8 +198,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

object ParquetOperations extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// TODO: need to support writing to other types of files. Unify the below code paths.
case logical.WriteToFile(path, child) =>
case logical.WriteToParquetFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
// Note: overwrite=false because otherwise the metadata we just created will be deleted
@@ -29,7 +29,7 @@ import scala.util.Try

import com.google.common.cache.CacheBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path}
import org.apache.hadoop.fs.{FileSystem, BlockLocation, FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat => NewFileOutputFormat}
@@ -287,7 +287,8 @@ case class InsertIntoParquetTable(
1
} else {
FileSystemHelper
.findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
.findMaxTaskId(
NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration, "parquet") + 1
}

def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
@@ -381,6 +382,26 @@ private[parquet] class FilteringParquetRowInputFormat
}
}

override def listStatus(jobContext: JobContext): JList[FileStatus] = {
  val conf = ContextUtil.getConfiguration(jobContext)
  val paths = NewFileInputFormat.getInputPaths(jobContext)
  if (paths.isEmpty) { return new ArrayList[FileStatus]() }
  val fs = paths.head.getFileSystem(conf)
  val statuses = new ArrayList[FileStatus]
  paths.foreach { p =>
    val cached = FilteringParquetRowInputFormat.statusCache.getIfPresent(p)
    if (cached == null) {
      logWarning(s"Status cache miss for $p")
      val res = fs.listStatus(p).filterNot(_.getPath.getName.startsWith("_"))
      res.foreach(statuses.add)
      FilteringParquetRowInputFormat.statusCache.put(p, res)
    } else {
      cached.foreach(statuses.add)
    }
  }
  statuses
}

override def getFooters(jobContext: JobContext): JList[Footer] = {
import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache

@@ -590,6 +611,10 @@ private[parquet] class FilteringParquetRowInputFormat
}

private[parquet] object FilteringParquetRowInputFormat {
val statusCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.build[Path, Array[FileStatus]]()

private val footerCache = CacheBuilder.newBuilder()
.maximumSize(20000)
.build[FileStatus, Footer]()
@@ -600,7 +625,7 @@ private[parquet] object FilteringParquetRowInputFormat {
.build[FileStatus, Array[BlockLocation]]()
}

private[parquet] object FileSystemHelper {
private[sql] object FileSystemHelper {
def listFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
@@ -616,19 +641,37 @@ private[parquet] object FileSystemHelper {
fs.listStatus(path).map(_.getPath)
}

/**
* Finds the maximum taskid in the output file names at the given path.
*/
def findMaxTaskId(pathStr: String, conf: Configuration): Int = {
/**
 * Lists the files under the given path that have the given extension.
 */
def listFiles(origPath: Path, conf: Configuration, extension: String): Seq[Path] = {
  val fs = origPath.getFileSystem(conf)
  if (fs == null) {
    throw new IllegalArgumentException(
      s"Path $origPath is incorrectly formatted")
  }
  val path = origPath.makeQualified(fs)
  if (fs.exists(path)) {
    fs.listStatus(path).map(_.getPath).filter(p => p.getName.endsWith(extension))
Contributor: I think FileSystem.globStatus can be convenient and more efficient here.

Contributor Author: But globStatus does not list the files under the path itself; here we need to list the *.orc or *.parquet files inside the directory.
  } else {
    Seq.empty
  }
}
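
To make the trade-off in the review exchange above concrete, here is a standalone sketch of the two listing approaches against the Hadoop FileSystem API (the directory path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val dir = new Path("/tmp/people.orc") // an ORC output directory, for illustration
val fs: FileSystem = dir.getFileSystem(conf)

// Reviewer's suggestion: let the FileSystem expand a glob pattern.
val viaGlob = fs.globStatus(new Path(dir, "*.orc")).map(_.getPath)

// The approach taken here: list the directory, then filter by extension.
val viaList = fs.listStatus(dir).map(_.getPath).filter(_.getName.endsWith(".orc"))

Both return the data files directly under the directory; globStatus simply pushes the suffix filtering into the FileSystem call.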

/**
* Finds the maximum taskid in the output file names at the given path.
*/
def findMaxTaskId(pathStr: String, conf: Configuration, extension: String): Int = {
// filename pattern is part-r-<int>.$extension
require(Seq("orc", "parquet").contains(extension), s"Unsupported extension: $extension")
val nameP = new scala.util.matching.Regex(s"""part-r-(\\d{1,}).$extension""", "taskid")
val files = FileSystemHelper.listFiles(pathStr, conf)
// filename pattern is part-r-<int>.parquet
val nameP = new scala.util.matching.Regex("""part-r-(\d{1,}).parquet""", "taskid")
val hiddenFileP = new scala.util.matching.Regex("_.*")
files.map(_.getName).map {
case nameP(taskid) => taskid.toInt
case hiddenFileP() => 0
case other: String =>
sys.error("ERROR: attempting to append to set of Parquet files and found file" +
sys.error(s"ERROR: attempting to append to set of $extension files and found file" +
s"that does not match name pattern: $other")
case _ => 0
}.reduceLeft((a, b) => if (a < b) b else a)
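
The part-r-<int>.<extension> naming convention that findMaxTaskId parses can be exercised in a small standalone example (file names made up for illustration):

object MaxTaskIdExample extends App {
  val extension = "orc"
  // Same shape of pattern the helper builds: part-r-<int>.<extension>
  val namePattern = s"part-r-(\\d+)\\.$extension".r
  val names = Seq("part-r-00000.orc", "part-r-00003.orc", "part-r-00001.orc", "_SUCCESS")
  val maxTaskId = names.map {
    case namePattern(taskid) => taskid.toInt
    case _ => 0 // hidden or metadata files such as _SUCCESS contribute 0
  }.max
  println(s"max task id = $maxTaskId") // prints 3, so the next append starts at part-r-00004.orc
}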
@@ -22,7 +22,8 @@ import java.sql.{Date, Timestamp}

import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.runtime.universe.{TypeTag, typeTag}
import scala.Some

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
@@ -33,13 +34,18 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.hive.orc.OrcSchemaRDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.catalyst.types.DecimalType
import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.execution.{ExtractPythonUdfs, QueryExecutionException, Command => PhysicalCommand}
import org.apache.spark.sql.execution.{Command => PhysicalCommand}
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.plans.logical.NativeCommand
import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand
import org.apache.spark.sql.sources.DataSourceStrategy

@@ -105,6 +111,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)

/**
* Creates a SchemaRDD from an RDD of case classes.
*
* @group userf
*/
implicit override def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
  SparkPlan.currentContext.set(self)
  val attributeSeq = ScalaReflection.attributesFor[A]
  val schema = StructType.fromAttributes(attributeSeq)
  val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
  new OrcSchemaRDD(this,
    LogicalRDD(ScalaReflection.attributesFor[A], rowRDD)(self))
}
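
With the implicit above in scope (for example via import hiveContext._), an RDD of case classes is converted to an OrcSchemaRDD, so the calls from the programming-guide example compose directly. A small usage sketch, with names and paths purely illustrative:

case class Person(name: String, age: Int)

// Assumes an existing SparkContext `sc`.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._

// createSchemaRDD converts the RDD[Person] implicitly, so it can be
// registered as a table and written out as ORC.
val people = sc.parallelize(Seq(Person("Ann", 19), Person("Bob", 25)))
people.registerTempTable("people")
people.saveAsOrcFile("people.orc")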

/**
* Creates a table using the schema of the given class.
*
@@ -116,6 +136,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
}

/**
 * Loads an ORC file, returning the result as a [[SchemaRDD]].
 *
 * @group userf
 */
def orcFile(path: String): SchemaRDD = new SchemaRDD(
  this, orc.OrcRelation(Seq.empty, path, Some(sparkContext.hadoopConfiguration), this))

/**
* Analyzes the given table in the current database to generate statistics, which will be
* used in query optimizations.
@@ -344,6 +372,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
HiveCommandStrategy(self),
TakeOrdered,
ParquetOperations,
OrcOperations,
InMemoryScans,
ParquetConversion, // Must be before HiveTableScans
HiveTableScans,
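
The planner registration above adds OrcOperations next to ParquetOperations. A sketch of what such a strategy could look like, assuming a hypothetical WriteToOrcFile logical node, an OrcRelation factory mirroring ParquetRelation.create, and an InsertIntoOrcTable physical operator shaped like its Parquet counterpart; the PR's actual OrcOperations strategy may be organised differently:

object OrcOperations extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.WriteToOrcFile(path, child) =>
      // Create the relation first, then insert with overwrite = false so the freshly
      // written metadata is not deleted, following the note in the Parquet strategy.
      // OrcRelation.create and InsertIntoOrcTable are assumed to mirror the Parquet versions.
      val relation =
        OrcRelation.create(path, child, sparkContext.hadoopConfiguration, sqlContext)
      InsertIntoOrcTable(relation, planLater(child), overwrite = false) :: Nil
    case _ => Nil
  }
}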