Skip to content

Commit 7ded39c

Browse files
ueshin authored and gatorsmile committed
[SPARK-19817][SQL] Make it clear that timeZone option is a general option in DataFrameReader/Writer.
## What changes were proposed in this pull request? As timezone setting can also affect partition values, it works for all formats, we should make it clear. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #17281 from ueshin/issues/SPARK-19817.
1 parent 6eac968 commit 7ded39c

File tree

17 files changed

+101
-48
lines changed

17 files changed

+101
-48
lines changed

python/pyspark/sql/readwriter.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,23 @@ def schema(self, schema):
109109
@since(1.5)
110110
def option(self, key, value):
111111
"""Adds an input option for the underlying data source.
112+
113+
You can set the following option(s) for reading files:
114+
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
115+
in the JSON/CSV datasources or partition values.
116+
If it isn't set, it uses the default value, session local timezone.
112117
"""
113118
self._jreader = self._jreader.option(key, to_str(value))
114119
return self
115120

116121
@since(1.4)
117122
def options(self, **options):
118123
"""Adds input options for the underlying data source.
124+
125+
You can set the following option(s) for reading files:
126+
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
127+
in the JSON/CSV datasources or partition values.
128+
If it isn't set, it uses the default value, session local timezone.
119129
"""
120130
for k in options:
121131
self._jreader = self._jreader.option(k, to_str(options[k]))
@@ -159,7 +169,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
159169
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
160170
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
161171
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
162-
timeZone=None, wholeFile=None):
172+
wholeFile=None):
163173
"""
164174
Loads JSON files and returns the results as a :class:`DataFrame`.
165175
@@ -214,8 +224,6 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
214224
formats follow the formats at ``java.text.SimpleDateFormat``.
215225
This applies to timestamp type. If None is set, it uses the
216226
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
217-
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
218-
If None is set, it uses the default value, session local timezone.
219227
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
220228
set, it uses the default value, ``false``.
221229
@@ -234,7 +242,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
234242
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
235243
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
236244
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
237-
timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile)
245+
timestampFormat=timestampFormat, wholeFile=wholeFile)
238246
if isinstance(path, basestring):
239247
path = [path]
240248
if type(path) == list:
@@ -307,7 +315,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
307315
comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
308316
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
309317
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
310-
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
318+
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
311319
columnNameOfCorruptRecord=None, wholeFile=None):
312320
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
313321
@@ -367,8 +375,6 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
367375
uses the default value, ``10``.
368376
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
369377
set, it uses the default value, ``PERMISSIVE``.
370-
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
371-
If None is set, it uses the default value, session local timezone.
372378
373379
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
374380
record, and puts the malformed string into a field configured by \
@@ -399,7 +405,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
399405
nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf,
400406
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
401407
maxCharsPerColumn=maxCharsPerColumn,
402-
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone,
408+
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
403409
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
404410
if isinstance(path, basestring):
405411
path = [path]
@@ -521,13 +527,23 @@ def format(self, source):
521527
@since(1.5)
522528
def option(self, key, value):
523529
"""Adds an output option for the underlying data source.
530+
531+
You can set the following option(s) for writing files:
532+
* ``timeZone``: sets the string that indicates a timezone to be used to format
533+
timestamps in the JSON/CSV datasources or partition values.
534+
If it isn't set, it uses the default value, session local timezone.
524535
"""
525536
self._jwrite = self._jwrite.option(key, to_str(value))
526537
return self
527538

528539
@since(1.4)
529540
def options(self, **options):
530541
"""Adds output options for the underlying data source.
542+
543+
You can set the following option(s) for writing files:
544+
* ``timeZone``: sets the string that indicates a timezone to be used to format
545+
timestamps in the JSON/CSV datasources or partition values.
546+
If it isn't set, it uses the default value, session local timezone.
531547
"""
532548
for k in options:
533549
self._jwrite = self._jwrite.option(k, to_str(options[k]))
@@ -619,8 +635,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
619635
self._jwrite.saveAsTable(name)
620636

621637
@since(1.4)
622-
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
623-
timeZone=None):
638+
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
624639
"""Saves the content of the :class:`DataFrame` in JSON format at the specified path.
625640
626641
:param path: the path in any Hadoop supported file system
@@ -641,15 +656,12 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
641656
formats follow the formats at ``java.text.SimpleDateFormat``.
642657
This applies to timestamp type. If None is set, it uses the
643658
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
644-
:param timeZone: sets the string that indicates a timezone to be used to format timestamps.
645-
If None is set, it uses the default value, session local timezone.
646659
647660
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
648661
"""
649662
self.mode(mode)
650663
self._set_opts(
651-
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
652-
timeZone=timeZone)
664+
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
653665
self._jwrite.json(path)
654666

655667
@since(1.4)
@@ -696,7 +708,7 @@ def text(self, path, compression=None):
696708
@since(2.0)
697709
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
698710
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
699-
timestampFormat=None, timeZone=None):
711+
timestampFormat=None):
700712
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
701713
702714
:param path: the path in any Hadoop supported file system
@@ -736,15 +748,13 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
736748
formats follow the formats at ``java.text.SimpleDateFormat``.
737749
This applies to timestamp type. If None is set, it uses the
738750
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
739-
:param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
740-
If None is set, it uses the default value, session local timezone.
741751
742752
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
743753
"""
744754
self.mode(mode)
745755
self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header,
746756
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
747-
dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone)
757+
dateFormat=dateFormat, timestampFormat=timestampFormat)
748758
self._jwrite.csv(path)
749759

750760
@since(1.5)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ case class CatalogTablePartition(
113113
*/
114114
def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
115115
val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
116-
val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZondId)
116+
val timeZoneId = caseInsensitiveProperties.getOrElse(
117+
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
117118
InternalRow.fromSeq(partitionSchema.map { field =>
118119
Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
119120
})

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
2323
import org.apache.commons.lang3.time.FastDateFormat
2424

2525
import org.apache.spark.internal.Logging
26-
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
26+
import org.apache.spark.sql.catalyst.util._
2727

2828
/**
2929
* Options for parsing JSON data into Spark SQL rows.
@@ -69,7 +69,8 @@ private[sql] class JSONOptions(
6969
val columnNameOfCorruptRecord =
7070
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
7171

72-
val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
72+
val timeZone: TimeZone = TimeZone.getTimeZone(
73+
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))
7374

7475
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
7576
val dateFormat: FastDateFormat =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ object DateTimeUtils {
6060
final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
6161
final val MonthOf31Days = Set(1, 3, 5, 7, 8, 10, 12)
6262

63+
val TIMEZONE_OPTION = "timeZone"
64+
6365
def defaultTimeZone(): TimeZone = TimeZone.getDefault()
6466

6567
// Reuse the Calendar object in each thread as it is expensive to create in each method call.

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
471471
checkEvaluation(
472472
JsonToStruct(
473473
schema,
474-
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID),
474+
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
475+
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
475476
Literal(jsonData2),
476477
gmtId),
477478
InternalRow(c.getTimeInMillis * 1000L)
@@ -523,14 +524,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
523524

524525
checkEvaluation(
525526
StructToJson(
526-
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get),
527+
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
528+
DateTimeUtils.TIMEZONE_OPTION -> gmtId.get),
527529
struct,
528530
gmtId),
529531
"""{"t":"2016-01-01T00:00:00"}"""
530532
)
531533
checkEvaluation(
532534
StructToJson(
533-
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"),
535+
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
536+
DateTimeUtils.TIMEZONE_OPTION -> "PST"),
534537
struct,
535538
gmtId),
536539
"""{"t":"2015-12-31T16:00:00"}"""

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
7070
/**
7171
* Adds an input option for the underlying data source.
7272
*
73+
* You can set the following option(s):
74+
* <ul>
75+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
76+
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
77+
* </ul>
78+
*
7379
* @since 1.4.0
7480
*/
7581
def option(key: String, value: String): DataFrameReader = {
@@ -101,6 +107,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
101107
/**
102108
* (Scala-specific) Adds input options for the underlying data source.
103109
*
110+
* You can set the following option(s):
111+
* <ul>
112+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
113+
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
114+
* </ul>
115+
*
104116
* @since 1.4.0
105117
*/
106118
def options(options: scala.collection.Map[String, String]): DataFrameReader = {
@@ -111,6 +123,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
111123
/**
112124
* Adds input options for the underlying data source.
113125
*
126+
* You can set the following option(s):
127+
* <ul>
128+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
129+
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
130+
* </ul>
131+
*
114132
* @since 1.4.0
115133
*/
116134
def options(options: java.util.Map[String, String]): DataFrameReader = {
@@ -305,8 +323,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
305323
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
306324
* indicates a timestamp format. Custom date formats follow the formats at
307325
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
308-
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
309-
* to be used to parse timestamps.</li>
310326
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
311327
* per file</li>
312328
* </ul>
@@ -478,8 +494,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
478494
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
479495
* indicates a timestamp format. Custom date formats follow the formats at
480496
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
481-
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
482-
* to be used to parse timestamps.</li>
483497
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
484498
* a record can have.</li>
485499
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
9090
/**
9191
* Adds an output option for the underlying data source.
9292
*
93+
* You can set the following option(s):
94+
* <ul>
95+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
96+
* to be used to format timestamps in the JSON/CSV datasources or partition values.</li>
97+
* </ul>
98+
*
9399
* @since 1.4.0
94100
*/
95101
def option(key: String, value: String): DataFrameWriter[T] = {
@@ -121,6 +127,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
121127
/**
122128
* (Scala-specific) Adds output options for the underlying data source.
123129
*
130+
* You can set the following option(s):
131+
* <ul>
132+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
133+
* to be used to format timestamps in the JSON/CSV datasources or partition values.</li>
134+
* </ul>
135+
*
124136
* @since 1.4.0
125137
*/
126138
def options(options: scala.collection.Map[String, String]): DataFrameWriter[T] = {
@@ -131,6 +143,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
131143
/**
132144
* Adds output options for the underlying data source.
133145
*
146+
* You can set the following option(s):
147+
* <ul>
148+
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
149+
* to be used to format timestamps in the JSON/CSV datasources or partition values.</li>
150+
* </ul>
151+
*
134152
* @since 1.4.0
135153
*/
136154
def options(options: java.util.Map[String, String]): DataFrameWriter[T] = {
@@ -457,8 +475,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
457475
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
458476
* indicates a timestamp format. Custom date formats follow the formats at
459477
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
460-
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
461-
* to be used to format timestamps.</li>
462478
* </ul>
463479
*
464480
* @since 1.4.0
@@ -565,8 +581,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
565581
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
566582
* indicates a timestamp format. Custom date formats follow the formats at
567583
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
568-
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
569-
* to be used to format timestamps.</li>
570584
* </ul>
571585
*
572586
* @since 2.0.0

sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ case class OptimizeMetadataOnlyQuery(
105105
val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
106106
val caseInsensitiveProperties =
107107
CaseInsensitiveMap(relation.tableMeta.storage.properties)
108-
val timeZoneId = caseInsensitiveProperties.get("timeZone")
108+
val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION)
109109
.getOrElse(conf.sessionLocalTimeZone)
110110
val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p =>
111111
InternalRow.fromSeq(partAttrs.map { attr =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ object FileFormatWriter extends Logging {
141141
customPartitionLocations = outputSpec.customPartitionLocations,
142142
maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
143143
.getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
144-
timeZoneId = caseInsensitiveOptions.get("timeZone")
144+
timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
145145
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
146146
)
147147

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ abstract class PartitioningAwareFileIndex(
127127
}.keys.toSeq
128128

129129
val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
130-
val timeZoneId = caseInsensitiveOptions.get("timeZone")
130+
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
131131
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
132132

133133
userPartitionSchema match {

0 commit comments

Comments
 (0)