[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc #10307
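In brief: this patch lets PySpark's DataFrameReader.orc accept a list of input paths, matching what json and parquet already allow. A minimal usage sketch, assuming an active SparkSession named `spark` and hypothetical ORC directories under data/orc/ (neither is part of this PR):

# A single string still works exactly as before:
df_single = spark.read.orc("data/orc/day1")

# With this change, a list of paths is accepted as well:
df_multi = spark.read.orc(["data/orc/day1", "data/orc/day2"])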


Closed · wants to merge 4 commits
14 changes: 8 additions & 6 deletions python/pyspark/sql/readwriter.py
@@ -161,15 +161,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
timeZone=None, wholeFile=None):
"""
-        Loads a JSON file and returns the results as a :class:`DataFrame`.
+        Loads JSON files and returns the results as a :class:`DataFrame`.

`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
For JSON (one record per file), set the `wholeFile` parameter to ``true``.

If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.

-        :param path: string represents path to the JSON dataset,
+        :param path: string represents path to the JSON dataset, or a list of paths,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param primitivesAsString: infers all primitive values as a string type. If None is set,
@@ -252,7 +252,7 @@ def func(iterator):
jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
return self._df(self._jreader.json(jrdd))
else:
-            raise TypeError("path can be only string or RDD")
+            raise TypeError("path can be only string, list or RDD")

@since(1.4)
def table(self, tableName):
@@ -269,7 +269,7 @@ def table(self, tableName):

@since(1.4)
def parquet(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.
"""Loads Parquet files, returning the result as a :class:`DataFrame`.

You can set the following Parquet-specific option(s) for reading Parquet files:
* ``mergeSchema``: sets whether we should merge schemas collected from all \
@@ -407,15 +407,17 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non

@since(1.5)
def orc(self, path):
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
"""Loads ORC files, returning the result as a :class:`DataFrame`.
Contributor:
Maybe add a test for loading with a list of orc files.

Contributor Author:
It is in python/pyspark/sql/tests.py

.. note:: Currently ORC support is only available together with Hive support.

>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
-        return self._df(self._jreader.orc(path))
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))

@since(1.4)
def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
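For context on the mechanics: the patched orc() normalizes a single string into a one-element list, then converts it with _to_seq into a Scala Seq[String] so that py4j can hand it to the JVM-side varargs method orc(paths: String*). A self-contained sketch of that normalization idiom, written against Python 3 (str in place of the Python 2 basestring the diff targets); normalize_paths is a hypothetical helper, not code from the PR:

def normalize_paths(path):
    # Accept a single path or a list of paths, as the patched orc() does.
    if isinstance(path, str):
        path = [path]
    if not isinstance(path, (list, tuple)):
        raise TypeError("path can be only string or list")
    return list(path)

print(normalize_paths("a.orc"))             # ['a.orc']
print(normalize_paths(["a.orc", "b.orc"]))  # ['a.orc', 'b.orc']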
5 changes: 5 additions & 0 deletions python/pyspark/sql/tests.py
@@ -450,6 +450,11 @@ def test_wholefile_csv(self):
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
self.assertEqual(ages_newlines.collect(), expected)

+    def test_read_multiple_orc_file(self):
+        df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0",
+                                  "python/test_support/sql/orc_partitioned/b=1/c=1"])
+        self.assertEqual(2, df.count())

def test_udf_with_input_file_name(self):
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -262,7 +262,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

/**
-   * Loads a JSON file and returns the results as a `DataFrame`.
+   * Loads JSON files and returns the results as a `DataFrame`.
*
* <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
* default. For JSON (one record per file), set the `wholeFile` option to true.
@@ -438,7 +438,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

/**
-   * Loads a CSV file and returns the result as a `DataFrame`.
+   * Loads CSV files and returns the result as a `DataFrame`.
*
* This function will go through the input once to determine the input schema if `inferSchema`
* is enabled. To avoid going through the entire data once, disable `inferSchema` option or
@@ -549,7 +549,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

/**
-   * Loads an ORC file and returns the result as a `DataFrame`.
+   * Loads ORC files and returns the result as a `DataFrame`.
*
* @param paths input paths
* @since 2.0.0
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils

case class AllDataTypesWithNonPrimitiveType(
stringField: String,
@@ -611,4 +612,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
}

test("read from multiple orc input paths") {
val path1 = Utils.createTempDir()
val path2 = Utils.createTempDir()
makeOrcFile((1 to 10).map(Tuple1.apply), path1)
makeOrcFile((1 to 10).map(Tuple1.apply), path2)
assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
}
}
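For completeness, a hedged Python analogue of the Scala test above, assuming an active SparkSession named `spark`; the directories are temporary and the row counts mirror the Scala test:

import tempfile

p1, p2 = tempfile.mkdtemp(), tempfile.mkdtemp()
spark.range(10).write.mode("overwrite").orc(p1)  # 10 rows in each directory
spark.range(10).write.mode("overwrite").orc(p2)
# With this patch, read.orc accepts a list and unions both directories:
assert spark.read.orc([p1, p2]).count() == 20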