Description
What kind of an issue is this?
- Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below. The easier it is to track down the bug, the faster it is solved.
- Feature Request. Start by telling us what problem you’re trying to solve. Often a solution already exists! Don’t send pull requests to implement new features without first getting our support. Sometimes we leave features out on purpose to keep the project small.
Issue description
The target index contains a field of type ip. When using Elasticsearch Spark to read the data, the returned Spark schema reports the IP field as String, while the value is actually parsed as an integer (Long) when the data is retrieved from Elasticsearch. Thus the real datatype and the Spark schema do not match, which results in scala.MatchError: 2887535728 (of class java.lang.Long).
Steps to reproduce
The Elasticsearch data source contains some fields of IP type, for example:
"collect_ip": "172.28.76.112",
and the corresponding mapping definition is:
"collect_ip": {
    "type": "ip",
    "include_in_all": false
},
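For completeness, a minimal sketch of how such an index could be set up with the official elasticsearch Python client (the index name, the mapping type name doc, and the local node address are illustrative assumptions, not taken from my cluster):

# Reproduction setup sketch: create an index with an ip-typed field and index one
# document. Assumes a local ES 5.x node and the `elasticsearch` Python client.
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])
es.indices.create(index="index_name", body={
    "mappings": {
        "doc": {
            "properties": {
                "collect_ip": {"type": "ip", "include_in_all": False}
            }
        }
    }
})
es.index(index="index_name", doc_type="doc", body={"collect_ip": "172.28.76.112"})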
I try to read the index from pyspark:
df = spark.read.format("org.elasticsearch.spark.sql") \
    .options(nodes="es_nodes", port=9200) \
    .load("index_name")
The Spark DataFrame is loaded successfully, and we can inspect its schema:
root
......
|-- collect_ip: string (nullable = true)
|-- end_time: timestamp (nullable = true)
|-- response_type: integer (nullable = true)
......
Then I try to preview the data by calling df.show(), and the following exception (scala.MatchError) is triggered.
Stack trace:
Caused by: scala.MatchError: 2887535728 (of class java.lang.Long)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:277)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:104)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:386)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
In particular, the long integer in the error message, 2887535728, is actually the integer representation of the IP address 172.28.76.112 shown above.
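This correspondence can be verified directly in plain Python, independent of ES-Hadoop:

import ipaddress

# 172.28.76.112 interpreted as a big-endian 32-bit integer:
# 172*2**24 + 28*2**16 + 76*2**8 + 112 == 2887535728
print(int(ipaddress.ip_address("172.28.76.112")))  # 2887535728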
Version Info
OS: CentOS Linux 7 (Core), Linux 3.10.0-693.el7.x86_64
JVM: 1.8.0_40
Hadoop/Spark: 2.7.2.3/2.3.1
ES-Hadoop: 5.6.0
ES: 5.1.1
Feature description
The requested feature would also address the issue described above.
Before reading data from Elasticsearch, one could add a group of options that specify the desired datatype for particular fields, for cases where there is no corresponding data type in Java or the automatic type-guessing procedure has flaws.
This option should work like the existing es.read.field.as.array.include, which marks a field to be read as an array type; a usage sketch of that existing option follows.
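For reference, a minimal pyspark sketch of how the existing array option is supplied today (the field name nested_tags is only an illustration, not from this index):

# Existing behaviour: es.read.field.as.array.include marks fields that should be
# read back as Spark arrays. The field name below is illustrative only.
df = (spark.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "es_nodes")
      .option("es.port", "9200")
      .option("es.read.field.as.array.include", "nested_tags")
      .load("index_name"))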
For example, assume there is an option
es.read.field.as.string.include = collect_ip
Then the obtained schema should set the type of collect_ip to String, as there is no IP type in Java. When parsing data from Elasticsearch, this option specifies that the collect_ip field should be converted to String from its original type (which may be a long integer). In this way, the returned data will match the Spark schema.
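A minimal sketch of how the proposed option might be used from pyspark (es.read.field.as.string.include is the proposed setting, not an existing ES-Hadoop option):

# Hypothetical usage of the *proposed* option: force the listed fields to be read
# as strings so the returned values match the reported schema.
df = (spark.read.format("org.elasticsearch.spark.sql")
      .option("es.nodes", "es_nodes")
      .option("es.port", "9200")
      .option("es.read.field.as.string.include", "collect_ip")
      .load("index_name"))
df.show()  # collect_ip would then come back as "172.28.76.112" instead of 2887535728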