
Commit bd14da6

MaxGekk authored and HyukjinKwon committed
[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files
## What changes were proposed in this pull request?

I propose a new option for the JSON datasource which allows specifying the encoding (charset) of input and output files. Here is an example of using the option:

```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```

If the option is not specified, the charset auto-detection mechanism is used by default.

The option can also be used when saving datasets to JSON. Currently Spark is able to save datasets into JSON files only in the `UTF-8` charset; the changes allow saving data in any supported charset. Here is the approximate list of charsets supported by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html. A user can specify the charset of output JSON via the charset option, for example `.option("charset", "UTF-16BE")`. By default the output charset is still `UTF-8`, to keep backward compatibility.

The solution has the following restrictions in per-line mode (`multiline = false`):

- If the charset is different from UTF-8, the lineSep option must be specified. The option is required because the Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving that issue: https://issues.apache.org/jira/browse/SPARK-23725
- Encodings with a [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) are not supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted. The problem can be solved by MaxGekk#2

## How was this patch tested?

I added the following tests:

- reading a JSON file in the `UTF-16LE` encoding with a BOM in `multiline` mode
- reading a JSON file using charset auto-detection (`UTF-32BE` with a BOM)
- reading a JSON file using a user-specified charset (`UTF-16LE`)
- saving in `UTF-32BE` and reading the result with the standard library (not with Spark)
- checking that the default charset is `UTF-8`
- handling a wrong (unsupported) charset

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20937 from MaxGekk/json-encoding-line-sep.
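The read-side example above has a write-side counterpart. Below is a minimal Scala sketch of saving and re-reading a dataset with a non-default charset; the `df` variable, the SparkSession `spark`, and the output path are illustrative assumptions rather than part of this patch:

```scala
// Minimal sketch; assumes an existing SparkSession `spark`, a DataFrame `df`,
// and an illustrative output path.
// Write JSON in UTF-16BE; a non-UTF-8 charset in per-line mode also needs an
// explicit lineSep, per the restriction described above.
df.write
  .option("encoding", "UTF-16BE")   // "charset" is accepted as an alias
  .option("lineSep", "\n")
  .json("/tmp/people-utf16be-json")

// Read it back with the same settings.
val readBack = spark.read
  .schema(df.schema)
  .option("encoding", "UTF-16BE")
  .option("lineSep", "\n")
  .json("/tmp/people-utf16be-json")
```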
1 parent 4df5136 commit bd14da6


16 files changed (+599 lines, -44 lines)


python/pyspark/sql/readwriter.py

Lines changed: 11 additions & 4 deletions
@@ -176,7 +176,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None):
+             multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
+             encoding=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.
@@ -237,6 +238,10 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                           characters (ASCII characters with value less than 32,
                                           including tab and line feed characters) or not.
+        :param encoding: allows to forcibly set one of standard basic or extended encoding for
+                         the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
+                         the encoding of input JSON will be detected automatically
+                         when the multiLine option is set to ``true``.
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
         :param samplingRatio: defines fraction of input JSON objects used for schema inferring.
@@ -259,7 +264,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
-            samplingRatio=samplingRatio)
+            samplingRatio=samplingRatio, encoding=encoding)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:
@@ -752,7 +757,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
 
     @since(1.4)
     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
-             lineSep=None):
+             lineSep=None, encoding=None):
         """Saves the content of the :class:`DataFrame` in JSON format
         (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
         specified path.
@@ -776,6 +781,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+        :param encoding: specifies encoding (charset) of saved json files. If None is set,
+                         the default UTF-8 charset will be used.
         :param lineSep: defines the line separator that should be used for writing. If None is
                         set, it uses the default value, ``\\n``.
@@ -784,7 +791,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
         self.mode(mode)
         self._set_opts(
             compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
-            lineSep=lineSep)
+            lineSep=lineSep, encoding=encoding)
         self._jwrite.json(path)
 
     @since(1.4)

python/pyspark/sql/tests.py

Lines changed: 7 additions & 0 deletions
@@ -685,6 +685,13 @@ def test_multiline_json(self):
                                             multiLine=True)
         self.assertEqual(people1.collect(), people_array.collect())
 
+    def test_encoding_json(self):
+        people_array = self.spark.read\
+            .json("python/test_support/sql/people_array_utf16le.json",
+                  multiLine=True, encoding="UTF-16LE")
+        expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
+        self.assertEqual(people_array.collect(), expected)
+
     def test_linesep_json(self):
         df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
         expected = [Row(_corrupt_record=None, name=u'Michael'),
python/test_support/sql/people_array_utf16le.json

182 Bytes (binary file not shown)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala

Lines changed: 47 additions & 2 deletions
@@ -18,10 +18,14 @@
 package org.apache.spark.sql.catalyst.json
 
 import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
+import java.nio.channels.Channels
+import java.nio.charset.Charset
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.hadoop.io.Text
+import sun.nio.cs.StreamDecoder
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object CreateJacksonParser extends Serializable {
@@ -43,7 +47,48 @@ private[sql] object CreateJacksonParser extends Serializable {
     jsonFactory.createParser(record.getBytes, 0, record.getLength)
   }
 
-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+  // Jackson parsers can be ranked according to their performance:
+  // 1. Array based with actual encoding UTF-8 in the array. This is the fastest parser
+  // but it doesn't allow to set encoding explicitly. Actual encoding is detected automatically
+  // by checking leading bytes of the array.
+  // 2. InputStream based with actual encoding UTF-8 in the stream. Encoding is detected
+  // automatically by analyzing first bytes of the input stream.
+  // 3. Reader based parser. This is the slowest parser used here but it allows to create
+  // a reader with specific encoding.
+  // The method creates a reader for an array with given encoding and sets size of internal
+  // decoding buffer according to size of input array.
+  private def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
+    val bais = new ByteArrayInputStream(in, 0, length)
+    val byteChannel = Channels.newChannel(bais)
+    val decodingBufferSize = Math.min(length, 8192)
+    val decoder = Charset.forName(enc).newDecoder()
+
+    StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
+  }
+
+  def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
+    val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
+    jsonFactory.createParser(sd)
+  }
+
+  def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(is)
+  }
+
+  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(new InputStreamReader(is, enc))
+  }
+
+  def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+    val ba = row.getBinary(0)
+
+    jsonFactory.createParser(ba, 0, ba.length)
+  }
+
+  def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+    val binary = row.getBinary(0)
+    val sd = getStreamDecoder(enc, binary, binary.length)
+
+    jsonFactory.createParser(sd)
   }
 }
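The comment block above ranks the three ways a Jackson parser can be constructed. The following self-contained sketch (not part of the patch) contrasts the array-based parser, which detects the charset from leading bytes, with a Reader-based parser that pins the charset explicitly, which is the approach the new `encoding` option relies on:

```scala
import java.io.{ByteArrayInputStream, InputStreamReader}
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object JacksonEncodingSketch {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()

    // Array-based parser: Jackson detects UTF-8/16/32 from the leading bytes,
    // but the charset cannot be forced explicitly.
    val utf8Bytes = """{"name":"Andy"}""".getBytes("UTF-8")
    val autoDetected = factory.createParser(utf8Bytes, 0, utf8Bytes.length)

    // Reader-based parser: slower, but the charset is set explicitly.
    val utf16Bytes = """{"name":"Andy"}""".getBytes("UTF-16LE")
    val explicit = factory.createParser(
      new InputStreamReader(new ByteArrayInputStream(utf16Bytes), "UTF-16LE"))

    // Both parsers see the same token stream.
    assert(autoDetected.nextToken() == JsonToken.START_OBJECT)
    assert(explicit.nextToken() == JsonToken.START_OBJECT)
  }
}
```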

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 34 additions & 5 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.nio.charset.StandardCharsets
+import java.nio.charset.{Charset, StandardCharsets}
 import java.util.{Locale, TimeZone}
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -86,14 +86,43 @@ private[sql] class JSONOptions(
 
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
+  /**
+   * A string between two consecutive JSON records.
+   */
   val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
     require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
     sep
   }
-  // Note that the option 'lineSep' uses a different default value in read and write.
-  val lineSeparatorInRead: Option[Array[Byte]] =
-    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
-  // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+
+  /**
+   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
+   * If the encoding is not specified (None), it will be detected automatically
+   * when the multiLine option is set to `true`.
+   */
+  val encoding: Option[String] = parameters.get("encoding")
+    .orElse(parameters.get("charset")).map { enc =>
+      // The following encodings are not supported in per-line mode (multiline is false)
+      // because they cause some problems in reading files with BOM which is supposed to
+      // present in the files with such encodings. After splitting input files by lines,
+      // only the first lines will have the BOM which leads to impossibility for reading
+      // the rest lines. Besides of that, the lineSep option must have the BOM in such
+      // encodings which can never present between lines.
+      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
+      val isBlacklisted = blacklist.contains(Charset.forName(enc))
+      require(multiLine || !isBlacklisted,
+        s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
+           | ${blacklist.mkString(", ")}""".stripMargin)
+
+      val isLineSepRequired = !(multiLine == false &&
+        Charset.forName(enc) != StandardCharsets.UTF_8 && lineSeparator.isEmpty)
+      require(isLineSepRequired, s"The lineSep option must be specified for the $enc encoding")
+
+      enc
+    }
+
+  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
+    lineSep.getBytes(encoding.getOrElse("UTF-8"))
+  }
   val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
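To make the new validation rules concrete, here is a hedged usage sketch (assuming a SparkSession `spark`; the path is illustrative, not from the patch) of which option combinations pass the checks above:

```scala
// Hedged usage sketch; assumes a SparkSession `spark`. The path is illustrative.
val p = "/tmp/people-utf16le.json"

// OK: multiLine mode with an explicitly forced charset.
spark.read.option("multiLine", "true").option("encoding", "UTF-16LE").json(p)

// OK: per-line mode with a non-UTF-8 charset, because lineSep is given explicitly.
spark.read.option("encoding", "UTF-16LE").option("lineSep", "\n").json(p)

// Fails the second require above:
// "The lineSep option must be specified for the UTF-16LE encoding".
spark.read.option("encoding", "UTF-16LE").json(p)

// Fails the first require above: UTF-16 and UTF-32 are blacklisted in per-line mode
// because their BOM cannot appear on every line after splitting.
spark.read.option("encoding", "UTF-16").option("lineSep", "\n").json(p)
```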

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 9 additions & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayOutputStream, CharConversionException}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
@@ -361,6 +361,14 @@ class JacksonParser(
       // For such records, all fields other than the field configured by
       // `columnNameOfCorruptRecord` are set to `null`.
       throw BadRecordException(() => recordLiteral(record), () => None, e)
+    case e: CharConversionException if options.encoding.isEmpty =>
+      val msg =
+        """JSON parser cannot handle a character in its input.
+          |Specifying encoding as an input option explicitly might help to resolve the issue.
+          |""".stripMargin + e.getMessage
+      val wrappedCharException = new CharConversionException(msg)
+      wrappedCharException.initCause(e)
+      throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
     }
   }
 }
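A short, hedged illustration of the failure mode this new catch clause improves; the path is illustrative and a SparkSession `spark` is assumed:

```scala
// Reading a UTF-16LE file without an explicit encoding can make Jackson throw a
// CharConversionException; with this change the exception is wrapped, so the error
// reported in FAILFAST mode suggests setting the encoding option explicitly.
val broken = spark.read
  .option("mode", "FAILFAST")
  .json("/tmp/people-utf16le.json")   // may fail with the hinted message

// Supplying the encoding (plus lineSep in per-line mode) avoids the conversion error.
val fixed = spark.read
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/people-utf16le.json")
```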

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 0 deletions
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
+  * <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic
+  * or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE. If the encoding
+  * is not specified and `multiLine` is set to `true`, it will be detected automatically.</li>
   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * <li>`samplingRatio` (default is 1.0): defines fraction of input JSON objects used

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

Lines changed: 4 additions & 4 deletions
@@ -518,8 +518,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
   * indicates a timestamp format. Custom date formats follow the formats at
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
-  * <li>`lineSep` (default `\n`): defines the line separator that should
-  * be used for writing.</li>
+  * <li>`encoding` (by default it is not set): specifies encoding (charset) of saved json
+  * files. If it is not set, the UTF-8 charset will be used. </li>
+  * <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
   * </ul>
   *
   * @since 1.4.0
@@ -589,8 +590,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * <li>`compression` (default `null`): compression codec to use when saving to file. This can be
   * one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
   * `snappy` and `deflate`). </li>
-  * <li>`lineSep` (default `\n`): defines the line separator that should
-  * be used for writing.</li>
+  * <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
   * </ul>
   *
   * @since 1.6.0

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 41 additions & 19 deletions
@@ -31,11 +31,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -92,26 +92,30 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(
-      sparkSession, inputPaths, parsedOptions.lineSeparator)
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
+
     inferFromDataset(json, parsedOptions)
   }
 
   def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
     val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
-    val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
-    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+    val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+    val rowParser = parsedOptions.encoding.map { enc =>
+      CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
+    }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))
+
+    JsonInferSchema.infer(rdd, parsedOptions, rowParser)
   }
 
   private def createBaseDataset(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      lineSeparator: Option[String]): Dataset[String] = {
-    val textOptions = lineSeparator.map { lineSep =>
-      Map(TextOptions.LINE_SEPARATOR -> lineSep)
-    }.getOrElse(Map.empty[String, String])
-
+      parsedOptions: JSONOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val textOptions = Map.empty[String, String] ++
+      parsedOptions.encoding.map("encoding" -> _) ++
+      parsedOptions.lineSeparator.map("lineSep" -> _)
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
@@ -129,8 +133,12 @@ object TextInputJsonDataSource extends JsonDataSource {
       schema: StructType): Iterator[InternalRow] = {
     val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+    val textParser = parser.options.encoding
+      .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
+      .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))
+
     val safeParser = new FailureSafeParser[Text](
-      input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
+      input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)
@@ -153,7 +161,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
      parsedOptions: JSONOptions): StructType = {
     val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
-    JsonInferSchema.infer(sampled, parsedOptions, createParser)
+    val parser = parsedOptions.encoding
+      .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
+      .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
+
+    JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
   }
 
   private def createBaseRdd(
@@ -175,11 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
       .values
   }
 
-  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
-    val path = new Path(record.getPath())
-    CreateJacksonParser.inputStream(
-      jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
+    val path = new Path(dataStream.getPath())
+    CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
+  }
+
+  private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
+    CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
+  }
+
+  private def createParser(enc: String, jsonFactory: JsonFactory,
+      stream: PortableDataStream): JsonParser = {
+    CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
   }
 
   override def readFile(
@@ -194,9 +213,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
     }
+    val streamParser = parser.options.encoding
+      .map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
+      .getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))
 
     val safeParser = new FailureSafeParser[InputStream](
-      input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
+      input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)
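The `parsedOptions.encoding.map(...).getOrElse(...)` construction above appears several times: an encoding-aware overload is partially applied when an encoding is configured, otherwise the auto-detecting one is used. Here is a generic, self-contained sketch of that pattern with illustrative names that are not from the patch:

```scala
object ParserSelectionSketch {
  // Two overloads standing in for the auto-detecting and encoding-aware parser factories.
  def parse(factory: String, record: String): String =
    s"auto-detect: $factory/$record"

  def parse(enc: String, factory: String, record: String): String =
    s"forced $enc: $factory/$record"

  def main(args: Array[String]): Unit = {
    val encoding: Option[String] = Some("UTF-16LE")

    // The underscores turn each overload into a (String, String) => String value,
    // so downstream code is unaware of whether an encoding was supplied.
    val parser: (String, String) => String = encoding
      .map(enc => parse(enc, _: String, _: String))
      .getOrElse(parse(_: String, _: String))

    println(parser("jsonFactory", "record"))  // prints "forced UTF-16LE: jsonFactory/record"
  }
}
```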

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 9 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
+import java.nio.charset.{Charset, StandardCharsets}
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -151,7 +153,13 @@ private[json] class JsonOutputWriter(
     context: TaskAttemptContext)
   extends OutputWriter with Logging {
 
-  private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
+  private val encoding = options.encoding match {
+    case Some(charsetName) => Charset.forName(charsetName)
+    case None => StandardCharsets.UTF_8
+  }
+
+  private val writer = CodecStreams.createOutputStreamWriter(
+    context, new Path(path), encoding)
 
   // create the Generator without separator inserted between 2 records
   private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
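What the writer change boils down to, as a self-contained sketch with illustrative names and paths (not the actual `JsonOutputWriter` code): resolve the optional charset, defaulting to UTF-8, and hand it to an output stream writer so that records and line separators are encoded consistently:

```scala
import java.io.{FileOutputStream, OutputStreamWriter}
import java.nio.charset.{Charset, StandardCharsets}

object JsonWriterEncodingSketch {
  def main(args: Array[String]): Unit = {
    // Resolve the configured charset name, falling back to UTF-8 as the writer does.
    val configured: Option[String] = Some("UTF-16BE")
    val charset: Charset = configured.map(Charset.forName).getOrElse(StandardCharsets.UTF_8)

    val writer = new OutputStreamWriter(new FileOutputStream("/tmp/sample.json"), charset)
    try {
      Seq("""{"name":"Andy","age":30}""", """{"name":"Justin","age":19}""").foreach { record =>
        writer.write(record)
        writer.write("\n")   // the line separator is encoded with the same charset
      }
    } finally {
      writer.close()
    }
  }
}
```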
