Skip to content

Commit 02aa499

Browse files
HyukjinKwon authored and rxin committed
[SPARK-13509][SPARK-13507][SQL] Support for writing CSV with a single function call
https://issues.apache.org/jira/browse/SPARK-13507 https://issues.apache.org/jira/browse/SPARK-13509 ## What changes were proposed in this pull request? This PR adds the support to write CSV data directly by a single call to the given path. Several unitests were added for each functionality. ## How was this patch tested? This was tested with unittests and with `dev/run_tests` for coding style Author: hyukjinkwon <gurwls223@gmail.com> Author: Hyukjin Kwon <gurwls223@gmail.com> Closes #11389 from HyukjinKwon/SPARK-13507-13509.
1 parent 916fc34 commit 02aa499

File tree

6 files changed

+80
-10
lines changed

6 files changed

+80
-10
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,23 @@ def text(self, paths):
233233
paths = [paths]
234234
return self._df(self._jreader.text(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
235235

236+
@since(2.0)
237+
def csv(self, paths):
238+
"""Loads a CSV file and returns the result as a [[DataFrame]].
239+
240+
This function goes through the input once to determine the input schema. To avoid going
241+
through the entire data once, specify the schema explicitly using [[schema]].
242+
243+
:param paths: string, or list of strings, for input path(s).
244+
245+
>>> df = sqlContext.read.csv('python/test_support/sql/ages.csv')
246+
>>> df.dtypes
247+
[('C0', 'string'), ('C1', 'string')]
248+
"""
249+
if isinstance(paths, basestring):
250+
paths = [paths]
251+
return self._df(self._jreader.csv(self._sqlContext._sc._jvm.PythonUtils.toSeq(paths)))
252+
236253
@since(1.5)
237254
def orc(self, path):
238255
"""Loads an ORC file, returning the result as a :class:`DataFrame`.
@@ -448,6 +465,11 @@ def json(self, path, mode=None):
448465
* ``ignore``: Silently ignore this operation if data already exists.
449466
* ``error`` (default case): Throw an exception if data already exists.
450467
468+
You can set the following JSON-specific option(s) for writing JSON files:
469+
* ``compression`` (default ``None``): compression codec to use when saving to file.
470+
This can be one of the known case-insensitive shorten names
471+
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
472+
451473
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
452474
"""
453475
self.mode(mode)._jwrite.json(path)
@@ -476,11 +498,39 @@ def parquet(self, path, mode=None, partitionBy=None):
476498
def text(self, path):
477499
"""Saves the content of the DataFrame in a text file at the specified path.
478500
501+
:param path: the path in any Hadoop supported file system
502+
479503
The DataFrame must have only one column that is of string type.
480504
Each row becomes a new line in the output file.
505+
506+
You can set the following option(s) for writing text files:
507+
* ``compression`` (default ``None``): compression codec to use when saving to file.
508+
This can be one of the known case-insensitive shorten names
509+
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
481510
"""
482511
self._jwrite.text(path)
483512

513+
@since(2.0)
514+
def csv(self, path, mode=None):
515+
"""Saves the content of the [[DataFrame]] in CSV format at the specified path.
516+
517+
:param path: the path in any Hadoop supported file system
518+
:param mode: specifies the behavior of the save operation when data already exists.
519+
520+
* ``append``: Append contents of this :class:`DataFrame` to existing data.
521+
* ``overwrite``: Overwrite existing data.
522+
* ``ignore``: Silently ignore this operation if data already exists.
523+
* ``error`` (default case): Throw an exception if data already exists.
524+
525+
You can set the following CSV-specific option(s) for writing CSV files:
526+
* ``compression`` (default ``None``): compression codec to use when saving to file.
527+
This can be one of the known case-insensitive shorten names
528+
(``bzip2``, ``gzip``, ``lz4``, and ``snappy``).
529+
530+
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
531+
"""
532+
self.mode(mode)._jwrite.csv(path)
533+
484534
@since(1.5)
485535
def orc(self, path, mode=None, partitionBy=None):
486536
"""Saves the content of the :class:`DataFrame` in ORC format at the specified path.

python/test_support/sql/ages.csv

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Joe,20
2+
Tom,30
3+
Hyukjin,25
4+

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,10 @@ final class DataFrameWriter private[sql](df: DataFrame) {
453453
* format("json").save(path)
454454
* }}}
455455
*
456+
* You can set the following JSON-specific option(s) for writing JSON files:
457+
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
458+
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
459+
*
456460
* @since 1.4.0
457461
*/
458462
def json(path: String): Unit = format("json").save(path)
@@ -492,10 +496,29 @@ final class DataFrameWriter private[sql](df: DataFrame) {
492496
* df.write().text("/path/to/output")
493497
* }}}
494498
*
499+
* You can set the following option(s) for writing text files:
500+
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
501+
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
502+
*
495503
* @since 1.6.0
496504
*/
497505
def text(path: String): Unit = format("text").save(path)
498506

507+
/**
508+
* Saves the content of the [[DataFrame]] in CSV format at the specified path.
509+
* This is equivalent to:
510+
* {{{
511+
* format("csv").save(path)
512+
* }}}
513+
*
514+
* You can set the following CSV-specific option(s) for writing CSV files:
515+
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
516+
* one of the known case-insensitive shorten names (`bzip2`, `gzip`, `lz4`, and `snappy`). </li>
517+
*
518+
* @since 2.0.0
519+
*/
520+
def csv(path: String): Unit = format("csv").save(path)
521+
499522
///////////////////////////////////////////////////////////////////////////////////////
500523
// Builder pattern config options
501524
///////////////////////////////////////////////////////////////////////////////////////

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ private[sql] class JSONOptions(
4848
parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
4949
val allowBackslashEscapingAnyCharacter =
5050
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
51-
val compressionCodec = {
52-
val name = parameters.get("compression").orElse(parameters.get("codec"))
53-
name.map(CompressionCodecs.getCodecClassName)
54-
}
51+
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
5552

5653
/** Sets config options on a Jackson [[JsonFactory]]. */
5754
def setJacksonOptions(factory: JsonFactory): Unit = {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,7 @@ private[sql] class TextRelation(
115115
/** Write path. */
116116
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
117117
val conf = job.getConfiguration
118-
val compressionCodec = {
119-
val name = parameters.get("compression").orElse(parameters.get("codec"))
120-
name.map(CompressionCodecs.getCodecClassName)
121-
}
118+
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
122119
compressionCodec.foreach { codec =>
123120
CompressionCodecs.setCodecConfiguration(conf, codec)
124121
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,9 +268,8 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
268268
.load(testFile(carsFile))
269269

270270
cars.coalesce(1).write
271-
.format("csv")
272271
.option("header", "true")
273-
.save(csvDir)
272+
.csv(csvDir)
274273

275274
val carsCopy = sqlContext.read
276275
.format("csv")

0 commit comments

Comments (0)