
[SPARK-25393][SQL] Adding new function from_csv() #22379

Closed
wants to merge 64 commits
Commits (64)
344c2ab
Initial implementation of CsvToStruct
MaxGekk Sep 9, 2018
5905019
Added CSV Expression Test Suite
MaxGekk Sep 9, 2018
c5ac432
Register from_csv functions and add tests
MaxGekk Sep 9, 2018
bd2124c
Fix imports
MaxGekk Sep 9, 2018
b9bb081
Adding SQL tests
MaxGekk Sep 9, 2018
14ae619
from_csv for PySpark
MaxGekk Sep 9, 2018
cfb2ac3
from_csv for SparkR
MaxGekk Sep 10, 2018
d2bfd94
Making Python style checker happy
MaxGekk Sep 10, 2018
b3a8666
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 11, 2018
d19242d
Addressing review comments
MaxGekk Sep 11, 2018
147c978
Updating comments
MaxGekk Sep 11, 2018
42b8227
added new line at the end of file
MaxGekk Sep 11, 2018
2a0b65b
Moving from_csv closer to from_json and fixing warnings
MaxGekk Sep 11, 2018
1ccca30
Deduplication of schema description
MaxGekk Sep 12, 2018
e7175ec
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 13, 2018
7063af0
Enable tests which was failed because parseLine returned null
MaxGekk Sep 13, 2018
aee472c
Addressing Felix Cheung's review comments
MaxGekk Sep 16, 2018
f13f64a
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 20, 2018
5e3787b
Re-targeting to Spark 2.5.0
MaxGekk Sep 20, 2018
81ae688
Removing unnecessary trim and exception handling.
MaxGekk Sep 20, 2018
4bba75e
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 21, 2018
65197f1
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 22, 2018
f5465ca
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 24, 2018
27e162b
Removing unnecessary sql tests
MaxGekk Sep 24, 2018
783559a
Removing unnecessary import in python's example
MaxGekk Sep 24, 2018
4507351
Merge branch 'from_csv' of github.com:MaxGekk/spark-1 into from_csv
MaxGekk Sep 24, 2018
211a1aa
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 25, 2018
5945322
Moving tests from csvFunctionsSuite to csvExpressionsSuite
MaxGekk Sep 25, 2018
5590855
Merge branch 'master' into from_csv
MaxGekk Sep 26, 2018
4c2fcea
Merge branch 'master' into from_csv
MaxGekk Sep 27, 2018
9102135
Fix a mistake after merge
MaxGekk Sep 27, 2018
b36d96a
Revert the datails tag back
MaxGekk Sep 27, 2018
e75aefc
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 29, 2018
2169b86
An example of from_csv in R
MaxGekk Sep 29, 2018
cb77e6e
Merge remote-tracking branch 'fork/from_csv' into from_csv
MaxGekk Sep 29, 2018
826be4e
Missed }
MaxGekk Sep 29, 2018
2b7c268
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Sep 29, 2018
a5b6f69
Fix wrong reference
MaxGekk Sep 29, 2018
1d31954
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Oct 3, 2018
0ee4ec1
2.5 -> 3.0
MaxGekk Oct 3, 2018
e52b71f
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Oct 4, 2018
3af7bfa
Added an example with options
MaxGekk Oct 4, 2018
24481c2
Passing a parameter by name
MaxGekk Oct 4, 2018
70149d8
Adding {}
MaxGekk Oct 4, 2018
d700896
Test for unsupported mode
MaxGekk Oct 4, 2018
643aaea
Test for corrupted records
MaxGekk Oct 4, 2018
3f76ffb
Speed up test by taking only 50 timezones out of 620
MaxGekk Oct 5, 2018
7d33783
Making the comment clear
MaxGekk Oct 5, 2018
5091625
Addressing Wenchen's review comments
MaxGekk Oct 5, 2018
b318239
Improving the error message
MaxGekk Oct 5, 2018
a95e266
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Oct 11, 2018
3d8a893
Using outstanding timezones instead of all timezones
MaxGekk Oct 11, 2018
9908e2e
Merge branch 'from_csv' of github.com:MaxGekk/spark-1 into from_csv
MaxGekk Oct 11, 2018
8d297b2
Moving back to DataTimeTestUtils
MaxGekk Oct 11, 2018
a6c60e9
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Oct 12, 2018
c3a31d4
Moving CSVHeaderChecker to sql/catalyst
MaxGekk Oct 12, 2018
c479973
Merge remote-tracking branch 'origin/master' into from_csv
MaxGekk Oct 12, 2018
88e3b10
Moving toChar to sql/catalyst
MaxGekk Oct 13, 2018
2ffed5f
Address comments at 22379
HyukjinKwon Oct 15, 2018
a32bbcb
nit
HyukjinKwon Oct 15, 2018
b26e49e
Merge pull request #10 from HyukjinKwon/address-from_csv
HyukjinKwon Oct 15, 2018
93d094f
name nits (#11)
HyukjinKwon Oct 15, 2018
cb23bd7
Address comments (#12)
HyukjinKwon Oct 16, 2018
205e4a4
Fix some nits (#13)
HyukjinKwon Oct 16, 2018
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -274,6 +274,7 @@ exportMethods("%<=>%",
"floor",
"format_number",
"format_string",
"from_csv",
"from_json",
"from_unixtime",
"from_utc_timestamp",
40 changes: 38 additions & 2 deletions R/pkg/R/functions.R
@@ -188,6 +188,7 @@ NULL
#' \item \code{to_json}: it is the column containing the struct, array of the structs,
#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' \item \code{from_csv}: it is the column containing the CSV string.
#' }
#' @param y Column to compute on.
#' @param value A value to compute on.
@@ -196,6 +197,13 @@ NULL
#' \item \code{array_position}: a value to locate in the given array.
#' \item \code{array_remove}: a value to remove in the given array.
#' }
#' @param schema
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema to use
#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
#' additional named properties to control how it is converted, accepts the same
#' options as the JSON data source. Additionally \code{to_json} supports the "pretty"
@@ -2165,8 +2173,6 @@ setMethod("date_format", signature(y = "Column", x = "character"),
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
#' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @examples
Expand Down Expand Up @@ -2203,6 +2209,36 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})

#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @examples
#'
#' \dontrun{
#' df <- sql("SELECT 'Amsterdam,2018' as csv")
#' schema <- "city STRING, year INT"
#' head(select(df, from_csv(df$csv, schema)))}
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
function(x, schema, ...) {
if (class(schema) == "Column") {
jschema <- schema@jc
} else if (is.character(schema)) {
jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
} else {
stop("schema argument should be a column or character")
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"from_csv",
x@jc, jschema, options)
column(jc)
})

#' @details
#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -984,6 +984,10 @@ setGeneric("format_string", function(format, x, ...) { standardGeneric("format_s
#' @name NULL
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("from_csv", function(x, schema, ...) { standardGeneric("from_csv") })

#' @rdname column_datetime_functions
#' @name NULL
setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") })
7 changes: 7 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1647,6 +1647,13 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test from_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)

# Test to_json(), from_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
37 changes: 36 additions & 1 deletion python/pyspark/sql/functions.py
@@ -25,9 +25,12 @@
if sys.version < "3":
from itertools import imap as map

if sys.version >= '3':
basestring = str

from pyspark import since, SparkContext
from pyspark.rdd import ignore_unicode_prefix, PythonEvalType
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.column import Column, _to_java_column, _to_seq, _create_column_from_literal
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StringType, DataType
# Keep UserDefinedFunction import for backwards compatible import; moved in SPARK-22409
@@ -2678,6 +2681,38 @@ def sequence(start, stop, step=None):
_to_java_column(start), _to_java_column(stop), _to_java_column(step)))


@ignore_unicode_prefix
@since(3.0)
def from_csv(col, schema, options={}):
"""
Parses a column containing a CSV string to a row with the specified schema.
Returns `null`, in the case of an unparseable string.

:param col: string column in CSV format
:param schema: a string with schema in DDL format to use when parsing the CSV column.
:param options: options to control parsing. accepts the same options as the CSV datasource

>>> data = [(1, '1')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_csv(df.value, "a INT").alias("csv")).collect()
[Row(csv=Row(a=1))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_csv(df.value, lit("a INT")).alias("csv")).collect()
[Row(csv=Row(a=1))]
"""

sc = SparkContext._active_spark_context
if isinstance(schema, basestring):
schema = _create_column_from_literal(schema)
elif isinstance(schema, Column):
schema = _to_java_column(schema)
else:
raise TypeError("schema argument should be a column or string")

jc = sc._jvm.functions.from_csv(_to_java_column(col), schema, options)
return Column(jc)

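The Python wrapper above just forwards to the JVM-side `functions.from_csv` via Py4J. Below is a hedged Scala sketch of the equivalent DataFrame call; the exact Scala overload (a Column schema plus a `java.util.Map` of options) is assumed here, since that part of the diff is not shown in this section.

```scala
// Sketch of the assumed Scala-side counterpart of the PySpark wrapper above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_csv, lit}

object FromCsvScalaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_csv-scala").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "1")).toDF("key", "value")
    // Assumed overload: from_csv(e: Column, schema: Column, options: java.util.Map[String, String])
    df.select(from_csv($"value", lit("a INT"), new java.util.HashMap[String, String]()).as("csv"))
      .show()

    spark.stop()
  }
}
```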

# ---------------------------- User Defined Function ----------------------------------

class PandasUDFType(object):
6 changes: 6 additions & 0 deletions sql/catalyst/pom.xml
@@ -103,6 +103,12 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>2.7.3</version>
<type>jar</type>
</dependency>
Member
Hi, @MaxGekk, @gatorsmile, and @cloud-fan.

I know this is the same approach as from_json, but I'm wondering whether this is the right evolution direction, sql -> catalyst. Recently we made Avro an external module, and that development went in the opposite direction. If we put this into catalyst, it becomes harder to separate it from the sql module.

Ideally, we should separate Parquet, ORC, and the other built-in data sources from the sql module.

Member Author
@MaxGekk MaxGekk Sep 10, 2018
I added the dependency only because I had to move UnivocityParser from sql/core to sql/catalyst, since it wasn't accessible from sql/catalyst. Please tell me what the right approach is here.

Member
Ideally we should make the CSV functionality a separate external module; that would be the right way given the discussion.

The current change isn't necessarily blocked, but I can see the point that moving the dependency makes further refactoring potentially harder, as pointed out. It looks like many people agreed on separating them.

The concern here is that it sounds like we are stepping back from the ideal approach.

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -520,7 +520,10 @@ object FunctionRegistry {
castAlias("date", DateType),
castAlias("timestamp", TimestampType),
castAlias("binary", BinaryType),
castAlias("string", StringType)
castAlias("string", StringType),

// csv
expression[CsvToStructs]("from_csv")
Contributor
@cloud-fan cloud-fan Sep 26, 2018
So the only reason to move the CSV functionality to catalyst is to register it as a built-in function?

Member
@HyukjinKwon HyukjinKwon Sep 26, 2018
Yea, it looks so. That's actually the concern raised in #22379 (comment), and here's my opinion on that: #22379 (comment) and #22379 (comment)

Member
This sounds like a reasonable change. cc @rxin

)

val builtin: SimpleFunctionRegistry = {
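Registering `CsvToStructs` in the FunctionRegistry is what makes `from_csv` callable directly from SQL. A minimal usage sketch, not part of this diff, with illustrative literal values; the options `map()` is handled by the `ExprUtils` helpers later in this diff:

```scala
// Sketch only: shows what registering from_csv as a built-in function enables.
import org.apache.spark.sql.SparkSession

object FromCsvSqlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_csv-sql").getOrCreate()

    // The schema is passed as a DDL-formatted string literal.
    spark.sql("SELECT from_csv('Amsterdam,2018', 'city STRING, year INT') AS csv").show(truncate = false)

    // Options are passed as a map() of string keys to string values.
    spark.sql(
      "SELECT from_csv('26/08/2015', 'time TIMESTAMP', map('timestampFormat', 'dd/MM/yyyy')) AS csv"
    ).show(truncate = false)

    spark.stop()
  }
}
```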
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

object CSVExprUtils {
/**
* Filter ignorable rows for CSV iterator (lines empty and starting with `comment`).
* This is currently being used in CSV reading path and CSV schema inference.
*/
def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
iter.filter { line =>
line.trim.nonEmpty && !line.startsWith(options.comment.toString)
}
}

def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.dropWhile { line =>
line.trim.isEmpty || line.trim.startsWith(commentPrefix)
}
} else {
iter.dropWhile(_.trim.isEmpty)
}
}

/**
* Extracts header and moves iterator forward so that only data remains in it
*/
def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
val nonEmptyLines = skipComments(iter, options)
if (nonEmptyLines.hasNext) {
Some(nonEmptyLines.next())
} else {
None
}
}

/**
* Helper method that converts string representation of a character to actual character.
* It handles some Java escaped strings and throws exception if given string is longer than one
* character.
*/
@throws[IllegalArgumentException]
def toChar(str: String): Char = {
(str: Seq[Char]) match {
case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." +
" It has special meaning as beginning of an escape sequence." +
" To get the backslash character, pass a string with two backslashes as the delimiter.")
case Seq(c) => c
case Seq('\\', 't') => '\t'
case Seq('\\', 'r') => '\r'
case Seq('\\', 'b') => '\b'
case Seq('\\', 'f') => '\f'
// In case user changes quote char and uses \" as delimiter in options
case Seq('\\', '\"') => '\"'
case Seq('\\', '\'') => '\''
case Seq('\\', '\\') => '\\'
case _ if str == """\u0000""" => '\u0000'
case Seq('\\', _) =>
throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
case _ =>
throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
}
}
}
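For illustration only, a small sketch of how `toChar` above resolves delimiter strings; the delimiter values are made-up examples, not taken from this PR's tests:

```scala
// Sketch: exercises CSVExprUtils.toChar from the object defined above.
import org.apache.spark.sql.catalyst.csv.CSVExprUtils

object ToCharSketch {
  def main(args: Array[String]): Unit = {
    // A plain one-character delimiter passes through unchanged.
    assert(CSVExprUtils.toChar(",") == ',')
    // An escaped sequence ("\\t" in source, i.e. backslash + 't') maps to the control character.
    assert(CSVExprUtils.toChar("\\t") == '\t')
    // Multi-character delimiters are rejected with IllegalArgumentException.
    try {
      CSVExprUtils.toChar("||")
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}
```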
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import com.univocity.parsers.csv.CsvParser

@@ -123,7 +123,7 @@ class CSVHeaderChecker(
// Note: if there are only comments in the first block, the header would probably
// be not extracted.
if (options.headerFlag && isStartOfFile) {
CSVUtils.extractHeader(lines, options).foreach { header =>
CSVExprUtils.extractHeader(lines, options).foreach { header =>
checkHeaderColumnNames(tokenizer.parseLine(header))
}
}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
@@ -83,7 +83,7 @@ class CSVOptions(
}
}

val delimiter = CSVUtils.toChar(
val delimiter = CSVExprUtils.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.csv
package org.apache.spark.sql.catalyst.csv

import java.io.InputStream
import java.math.BigDecimal
@@ -28,8 +28,7 @@ import com.univocity.parsers.csv.CsvParser
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -264,7 +263,7 @@
}
}

private[csv] object UnivocityParser {
private[sql] object UnivocityParser {

/**
* Parses a stream that contains CSV strings and turns it into an iterator of tokens.
@@ -339,7 +338,7 @@

val options = parser.options

val filteredLines: Iterator[String] = CSVUtils.filterCommentAndEmpty(lines, options)
val filteredLines: Iterator[String] = CSVExprUtils.filterCommentAndEmpty(lines, options)

val safeParser = new FailureSafeParser[String](
input => Seq(parser.parse(input)),
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.types.{MapType, StringType, StructType}

object ExprUtils {
Member
The same comment in this object


def evalSchemaExpr(exp: Expression): StructType = exp match {
Member
Can we make JsonExprUtils.evalSchemaExpr and ExprUtils.evalSchemaExpr consistent?

Member Author
The difference between the two functions appeared when I modified evalSchemaExpr in JsonExprUtils to support schema_of_json. When we rebase and prepare schema_of_csv in PR #22666, we will merge those two functions.

case Literal(s, StringType) => StructType.fromDDL(s.toString)
case e => throw new AnalysisException(
s"Schema should be specified in DDL format as a string literal instead of ${e.sql}")
}

def convertToMapData(exp: Expression): Map[String, String] = exp match {
case m: CreateMap
if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
key.toString -> value.toString
}
case m: CreateMap =>
throw new AnalysisException(
s"A type of keys and values in map() must be string, but got ${m.dataType.catalogString}")
case _ =>
throw new AnalysisException("Must use a map() function for options")
}
}
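As a rough sketch of how these helpers behave when `CsvToStructs` evaluates its schema and options arguments (catalyst-internal APIs; the literal values are illustrative only):

```scala
// Sketch: feeds ExprUtils the literal shapes CsvToStructs would receive.
import org.apache.spark.sql.catalyst.expressions.{CreateMap, ExprUtils, Literal}
import org.apache.spark.sql.types.StructType

object ExprUtilsSketch {
  def main(args: Array[String]): Unit = {
    // The schema must be a DDL-formatted string literal; anything else raises AnalysisException.
    val schema: StructType = ExprUtils.evalSchemaExpr(Literal("city STRING, year INT"))
    println(schema.treeString)

    // Options arrive as a map() expression with string keys and values.
    val options: Map[String, String] =
      ExprUtils.convertToMapData(CreateMap(Seq(Literal("delimiter"), Literal(";"))))
    println(options)  // Map(delimiter -> ;)
  }
}
```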