
Elasticsearch-Spark read data contains fields of IP type, triggering type cast error. #1737

Closed
@garylavayou

Description


What kind of issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

The target index's data contains an IP field. When Elasticsearch Spark reads the data,
the returned Spark schema reports the IP field as String, but the value is actually parsed as a Long when
the data is retrieved from Elasticsearch. The real datatype and the Spark schema therefore do not match,
which results in scala.MatchError: 2887535728 (of class java.lang.Long).

Steps to reproduce

The Elasticsearch data source contains some fields of IP type, like

"collect_ip": "172.28.76.112",

and the corresponding mapping definition is:

"dev_ip": {
    "type": "ip",
    "include_in_all": false
},
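For completeness, an index like this can be recreated and populated with the elasticsearch-py 5.x client. This is only a repro sketch; the host, index, and type names below are placeholders, not the real ones:

from elasticsearch import Elasticsearch

# Placeholder host; any ES 5.x cluster will do.
es = Elasticsearch(["http://es_nodes:9200"])

# ES 5.x still requires a mapping type; "events" is a placeholder name.
es.indices.create(index="index_name", body={
    "mappings": {
        "events": {
            "properties": {
                "collect_ip": {"type": "ip", "include_in_all": False}
            }
        }
    }
})

# Index one document holding an IP value.
es.index(index="index_name", doc_type="events",
         body={"collect_ip": "172.28.76.112"})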

I try to read the index from pyspark:

df = (spark.read.format("org.elasticsearch.spark.sql")
           .options(nodes="es_nodes", port=9200)
           .load("index_name"))

The Spark DataFrame is loaded successfully, and we can inspect its schema:

root
  ......
  |-- collect_ip: string (nullable = true)
  |-- end_time: timestamp (nullable = true)
  |-- response_type: integer (nullable = true)
  ......

Then I try to preview the data by calling df.show(), and the following exception (scala.MatchError) is triggered.

Stack trace:

Caused by: scala.MatchError: 2887535728 (of class java.lang.Long)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:277)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:276)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:104)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:386)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
        at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        ... 1 more

In particular, the long integer 2887535728 in the error message is the integer value of the IP address 172.28.76.112 listed above.
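As a quick sanity check, the correspondence between the IP string and the long value can be verified with plain Python (no Elasticsearch involved):

import ipaddress

# 172.28.76.112 interpreted as an unsigned 32-bit integer
print(int(ipaddress.IPv4Address("172.28.76.112")))  # prints 2887535728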

Version Info

OS: CentOS Linux 7 (Core), Linux 3.10.0-693.el7.x86_64
JVM : 1.8.0_40
Hadoop/Spark: 2.7.2.3/2.3.1
ES-Hadoop : 5.6.0
ES : 5.1.1

Feature description

The feature should also address the aforementioned issue.
Before reading data from Elasticsearch, one could add a group of options that specify the desired datatype for particular fields, for cases where there is no corresponding data type in Java or the automatic data type detection has flaws.

This option should work like es.read.field.as.array.include, which converts a field to an array type.
For example, assume there is an option

es.read.field.as.string.include    collect_ip

Then the obtained schema should set the type of collect_ip to String, since there is no IP type in Java. When parsing data from Elasticsearch, this option specifies that the collect_ip field should be converted to String from its original type (which may be a long integer). In this way, the returned data will match the Spark schema.
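With such an option, the read in the reproduction above might look like the following sketch (note that es.read.field.as.string.include is the proposed option and does not exist yet):

df = (spark.read.format("org.elasticsearch.spark.sql")
           .option("es.nodes", "es_nodes")
           .option("es.port", "9200")
           # proposed option: force collect_ip to be read back as a string
           .option("es.read.field.as.string.include", "collect_ip")
           .load("index_name"))

df.show()  # collect_ip would now come back as "172.28.76.112" instead of a Long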
