Improving documentation about es.read.fields.include, and fixing a related bug #1822

Merged: 1 commit, Dec 14, 2021
docs/src/reference/asciidoc/core/spark.adoc (3 additions, 0 deletions)
@@ -1578,6 +1578,9 @@ val smiths = sqlContext.esDF("spark/people","?q=Smith") <1>
In some cases, especially when the index in {es} contains a lot of fields, it is desirable to create a +DataFrame+ that contains only a _subset_ of them. While one can modify the +DataFrame+ (by working on its backing +RDD+) through the official Spark API or through dedicated queries, {eh} allows the user to specify which fields to include and exclude from {es} when creating the +DataFrame+.

Through the +es.read.field.include+ and +es.read.field.exclude+ properties, one can indicate which fields to include or exclude from the index mapping. The syntax is similar to that of {es} {ref}/search-request-body.html#request-body-search-source-filtering[include/exclude]. Multiple values can be specified by using a comma. By default, no value is specified, meaning all properties/fields are included and none are excluded.
Note that these properties can include leading and trailing wildcards. Including part of a hierarchy of fields without a trailing wildcard
does not imply that the entire hierarchy is included. However, in most cases it does not make sense to include only part of a hierarchy, so a
trailing wildcard should be included.
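
To make the trailing-wildcard rule concrete, here is a hedged Scala sketch that mirrors the test added in this PR. The `org.elasticsearch.spark.sql` format, the option names, and the `read_field_include_test` index name come from this PR's test code; the SparkSession setup is an assumption for illustration only.

[source,scala]
----
import org.apache.spark.sql.SparkSession

// Hedged sketch: the SparkSession setup is assumed; the format, options, and
// index name mirror the test added in this PR.
val spark = SparkSession.builder()
  .appName("field-include-sketch")
  .master("local[*]")
  .getOrCreate()

val reader = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.read.field.as.array.include", "features.hashtags")

// Trailing wildcard: the whole hierarchy under features.hashtags is read back,
// so each nested row still carries both "text" and "count".
val full = reader
  .option("es.read.field.include", "features.hashtags.*")
  .load("read_field_include_test")

// No trailing wildcard and no leaf fields listed: only the container matches,
// so the nested rows come back with no fields at all.
val bare = reader
  .option("es.read.field.include", "features.hashtags")
  .load("read_field_include_test")
----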

For example:
[source,ini]
@@ -2242,6 +2242,50 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals(2, df.count())
}

@Test
def testReadFieldInclude(): Unit = {
val data = Seq(
Row(Row(List(Row("hello","2"), Row("world","1"))))
)
val rdd: RDD[Row] = sc.parallelize(data)
val schema = new StructType()
.add("features", new StructType()
.add("hashtags", new ArrayType(new StructType()
.add("text", StringType)
.add("count", StringType), true)))

val inputDf = sqc.createDataFrame(rdd, schema)
inputDf.write
.format("org.elasticsearch.spark.sql")
.save("read_field_include_test")
val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")

// No "es.read.field.include", so everything is included:
var df = reader.load("read_field_include_test")
var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" has trailing wildcard, so everything included:
df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" includes text but not count
df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(1, result(0).size)
assertEquals("hello", result(0).getAs("text"))

// "es.read.field.include" does not include the leaves in the hierarchy so they won't be returned
df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(0, result(0).size)
}

/**
* Take advantage of the fixed method order and clear out all created indices.
* The indices will last in Elasticsearch for all parameters of this test suite.
@@ -33,7 +33,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
var metadataMap = true
var rootLevel = true
var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]

override def readValue(parser: Parser, value: String, esType: FieldType) = {
sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName
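
The one-line change above (initializing `currentArrayRowOrder` to an empty `Seq` instead of `null`) appears to be the bug fix the PR title refers to. As a hedged, hypothetical illustration of why the empty default is more forgiving, the names below are invented and are not the project's code:

[source,scala]
----
// Hypothetical sketch, not the project's code: a positional lookup of the kind
// a row reader performs on the current field order.
object NullVsEmptyOrder {
  def positionOf(order: Seq[String], field: String): Int = order.indexOf(field)

  def main(args: Array[String]): Unit = {
    // With an empty default the field is simply reported as absent (-1).
    println(positionOf(Seq.empty[String], "text"))
    // With a null default the same lookup would throw a NullPointerException:
    // positionOf(null, "text")
  }
}
----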
@@ -2324,6 +2324,50 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals(2, df.count())
}

@Test
def testReadFieldInclude(): Unit = {
val data = Seq(
Row(Row(List(Row("hello","2"), Row("world","1"))))
)
val rdd: RDD[Row] = sc.parallelize(data)
val schema = new StructType()
.add("features", new StructType()
.add("hashtags", new ArrayType(new StructType()
.add("text", StringType)
.add("count", StringType), true)))

val inputDf = sqc.createDataFrame(rdd, schema)
inputDf.write
.format("org.elasticsearch.spark.sql")
.save("read_field_include_test")
val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")

// No "es.read.field.include", so everything is included:
var df = reader.load("read_field_include_test")
var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" has trailing wildcard, so everything included:
df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" includes text but not count
df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(1, result(0).size)
assertEquals("hello", result(0).getAs("text"))

// "es.read.field.include" does not include the leaves in the hierarchy so they won't be returned
df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(0, result(0).size)
}

/**
* Take advantage of the fixed method order and clear out all created indices.
* The indices will last in Elasticsearch for all parameters of this test suite.
@@ -34,7 +34,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
var metadataMap = true
var rootLevel = true
var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]

override def readValue(parser: Parser, value: String, esType: FieldType) = {
sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName
@@ -2325,6 +2325,51 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals(2, df.count())
}

@Test
def testReadFieldInclude(): Unit = {
val data = Seq(
Row(Row(List(Row("hello","2"), Row("world","1"))))
)
val rdd: RDD[Row] = sc.parallelize(data)
val schema = new StructType()
.add("features", new StructType()
.add("hashtags", new ArrayType(new StructType()
.add("text", StringType)
.add("count", StringType), true)))

val inputDf = sqc.createDataFrame(rdd, schema)
inputDf.write
.format("org.elasticsearch.spark.sql")
.save("read_field_include_test")
val reader = sqc.read.format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","features.hashtags")

// No "es.read.field.include", so everything is included:
var df = reader.load("read_field_include_test")
var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" has trailing wildcard, so everything included:
df = reader.option("es.read.field.include","features.hashtags.*").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))

// "es.read.field.include" includes text but not count
df = reader.option("es.read.field.include","features.hashtags.text").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(1, result(0).size)
assertEquals("hello", result(0).getAs("text"))

// "es.read.field.include" does not include the leaves in the hierarchy so they won't be returned
df = reader.option("es.read.field.include","features.hashtags").load("read_field_include_test")
result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
assertEquals(0, result(0).size)
}


/**
* Take advantage of the fixed method order and clear out all created indices.
* The indices will last in Elasticsearch for all parameters of this test suite.
@@ -34,7 +34,7 @@ class ScalaRowValueReader extends ScalaValueReader with RowValueReader with Valu
var metadataMap = true
var rootLevel = true
var inArray = false
-  var currentArrayRowOrder:Seq[String] = null
+  var currentArrayRowOrder:Seq[String] = Seq.empty[String]

override def readValue(parser: Parser, value: String, esType: FieldType) = {
sparkRowField = if (getCurrentField == null) null else getCurrentField.getFieldName