Commit 13cfa8c

Improving documentation about es.read.fields.include, and fixing a related bug (#1822) (#1833)
The documentation for es.read.fields.include left room for confusion. The behavior also differed between Spark 1, Spark 2, and Spark 3: setting "es.read.fields.include" to part of a hierarchy caused a NullPointerException in Spark 1 and Spark 2. Closes #1784
1 parent 0b7cefa commit 13cfa8c

File tree

7 files changed: +139 −3 lines changed

docs/src/reference/asciidoc/core/spark.adoc

Lines changed: 3 additions & 0 deletions

@@ -1578,6 +1578,9 @@ val smiths = sqlContext.esDF("spark/people","?q=Smith") <1>
 In some cases, especially when the index in {es} contains a lot of fields, it is desirable to create a +DataFrame+ that contains only a _subset_ of them. While one can modify the +DataFrame+ (by working on its backing +RDD+) through the official Spark API or through dedicated queries, {eh} allows the user to specify what fields to include and exclude from {es} when creating the +DataFrame+.
 
 Through +es.read.field.include+ and +es.read.field.exclude+ properties, one can indicate what fields to include or exclude from the index mapping. The syntax is similar to that of {es} {ref}/search-request-body.html#request-body-search-source-filtering[include/exclude]. Multiple values can be specified by using a comma. By default, no value is specified, meaning all properties/fields are included and no properties/fields are excluded.
+Note that these properties can include leading and trailing wildcards. Including part of a hierarchy of fields without a trailing wildcard
+does not imply that the entire hierarchy is included. However, in most cases it does not make sense to include only part of a hierarchy, so a
+trailing wildcard should be included.
 
 For example:
 [source,ini]
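To make the wildcard distinction concrete, here is a minimal Scala sketch of reading with and without a trailing wildcard. The index name read_field_include_test and the features.hashtags hierarchy are borrowed from the integration tests below; the SparkSession setup is an assumption for illustration, not part of this commit.

// Minimal sketch, assuming a local SparkSession and the same
// "features.hashtags" hierarchy that the integration tests below create.
import org.apache.spark.sql.SparkSession

object FieldIncludeSketch extends App {
  val spark = SparkSession.builder()
    .appName("field-include-sketch")
    .master("local[*]")
    .getOrCreate()

  // Trailing wildcard: the hierarchy and all of its leaf fields come back.
  val allLeaves = spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.read.field.as.array.include", "features.hashtags")
    .option("es.read.field.include", "features.hashtags.*")
    .load("read_field_include_test")

  // No trailing wildcard: only the "features.hashtags" node itself matches,
  // so its leaf fields ("text", "count") are not returned.
  val noLeaves = spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.read.field.as.array.include", "features.hashtags")
    .option("es.read.field.include", "features.hashtags")
    .load("read_field_include_test")
}

This mirrors the cases exercised by testReadFieldInclude in the test files below.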

spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 44 additions & 0 deletions

@@ -2242,6 +2242,50 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }
 
+  @Test
+  def testReadFieldInclude(): Unit = {
+    val data = Seq(
+      Row(Row(List(Row("hello","2"), Row("world","1"))))
+    )
+    val rdd: RDD[Row] = sc.parallelize(data)
+    val schema = new StructType()
+      .add("features", new StructType()
+        .add("hashtags", new ArrayType(new StructType()
+          .add("text", StringType)
+          .add("count", StringType), true)))
+
+    val inputDf = sqc.createDataFrame(rdd, schema)
+    inputDf.write
+      .format("org.elasticsearch.spark.sql")
+      .save("read_field_include_test")
+    val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")
+
+    // No "es.read.field.include", so everything is included:
+    var df = reader.load("read_field_include_test")
+    var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" has a trailing wildcard, so everything is included:
+    df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" includes text but not count:
+    df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(1, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+
+    // "es.read.field.include" does not include the leaves of the hierarchy, so they are not returned:
+    df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(0, result(0).size)
+  }
+
   /**
    * Take advantage of the fixed method order and clear out all created indices.
    * The indices will last in Elasticsearch for all parameters of this test suite.

spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/ScalaEsRowValueReader.scala

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
   var metadataMap = true
   var rootLevel = true
   var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]
 
   override def readValue(parser: Parser, value: String, esType: FieldType) = {
     sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName
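The one-line change above is the bug fix itself. A plausible reading of the failure, shown as a standalone sketch (this is not the project's code): with a partial hierarchy in es.read.field.include, the reader ended up consulting a row order that was still null, and any method call on a null Seq throws the NullPointerException reported in #1784.

// Standalone sketch, not the project's code: invoking any method on a null
// Seq reference throws a NullPointerException, while an empty Seq is safe.
object NullSeqSketch extends App {
  val nullOrder: Seq[String] = null
  val emptyOrder: Seq[String] = Seq.empty[String]

  println(emptyOrder.contains("text"))   // false -- empty, but safe to query
  try {
    nullOrder.contains("text")           // method call on a null reference
  } catch {
    case _: NullPointerException => println("NPE, as before the fix")
  }
}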

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 44 additions & 0 deletions

@@ -2324,6 +2324,50 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }
 
+  @Test
+  def testReadFieldInclude(): Unit = {
+    val data = Seq(
+      Row(Row(List(Row("hello","2"), Row("world","1"))))
+    )
+    val rdd: RDD[Row] = sc.parallelize(data)
+    val schema = new StructType()
+      .add("features", new StructType()
+        .add("hashtags", new ArrayType(new StructType()
+          .add("text", StringType)
+          .add("count", StringType), true)))
+
+    val inputDf = sqc.createDataFrame(rdd, schema)
+    inputDf.write
+      .format("org.elasticsearch.spark.sql")
+      .save("read_field_include_test")
+    val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")
+
+    // No "es.read.field.include", so everything is included:
+    var df = reader.load("read_field_include_test")
+    var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" has a trailing wildcard, so everything is included:
+    df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" includes text but not count:
+    df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(1, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+
+    // "es.read.field.include" does not include the leaves of the hierarchy, so they are not returned:
+    df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(0, result(0).size)
+  }
+
   /**
    * Take advantage of the fixed method order and clear out all created indices.
    * The indices will last in Elasticsearch for all parameters of this test suite.

spark/sql-20/src/main/scala/org/elasticsearch/spark/sql/ScalaEsRowValueReader.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
   var metadataMap = true
  var rootLevel = true
   var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]
 
   override def readValue(parser: Parser, value: String, esType: FieldType) = {
     sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 45 additions & 0 deletions

@@ -2325,6 +2325,51 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals(2, df.count())
   }
 
+  @Test
+  def testReadFieldInclude(): Unit = {
+    val data = Seq(
+      Row(Row(List(Row("hello","2"), Row("world","1"))))
+    )
+    val rdd: RDD[Row] = sc.parallelize(data)
+    val schema = new StructType()
+      .add("features", new StructType()
+        .add("hashtags", new ArrayType(new StructType()
+          .add("text", StringType)
+          .add("count", StringType), true)))
+
+    val inputDf = sqc.createDataFrame(rdd, schema)
+    inputDf.write
+      .format("org.elasticsearch.spark.sql")
+      .save("read_field_include_test")
+    val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")
+
+    // No "es.read.field.include", so everything is included:
+    var df = reader.load("read_field_include_test")
+    var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" has a trailing wildcard, so everything is included:
+    df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(2, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+    assertEquals("2", result(0).getAs("count"))
+
+    // "es.read.field.include" includes text but not count:
+    df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(1, result(0).size)
+    assertEquals("hello", result(0).getAs("text"))
+
+    // "es.read.field.include" does not include the leaves of the hierarchy, so they are not returned:
+    df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
+    result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    assertEquals(0, result(0).size)
+  }
+
+
   /**
    * Take advantage of the fixed method order and clear out all created indices.
    * The indices will last in Elasticsearch for all parameters of this test suite.

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/ScalaEsRowValueReader.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
   var metadataMap = true
   var rootLevel = true
   var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]
 
   override def readValue(parser: Parser, value: String, esType: FieldType) = {
     sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName
