Cosmos Spark: Changing inferSchema.forceNullableProperties default to true #22049

Merged · 7 commits · Jun 7, 2021 · Changes from 4 commits
@@ -48,7 +48,7 @@ When doing read operations, users can specify a custom schema or allow the conne
| `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
| `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
| `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. |
- | `spark.cosmos.read.inferSchema.forceNullableProperties` | `false` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default whether inferred columns are treated as nullable or not will depend on whether any record in the sample set has null-values within a column. If set to `true` all columns will be treated as nullable even if all rows within the sample set have non-null values. |
+ | `spark.cosmos.read.inferSchema.forceNullableProperties` | `true` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns are treated as nullable even if all rows within the sample set have non-null values. When disabled, inferred columns are treated as nullable or not depending on whether any record in the sample set has null values in that column. |
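Readers who depend on the previous semantics can restore them explicitly by setting the option to `false`. A minimal sketch; the endpoint, key, database, and container values are placeholders:

```scala
// Sketch only: read with schema inference while opting back into the
// previous behavior (nullability derived from the sample set).
val df = spark.read
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  .option("spark.cosmos.read.inferSchema.enabled", "true")
  .option("spark.cosmos.read.inferSchema.forceNullableProperties", "false")
  .load()
```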

#### Json conversion configuration

@@ -533,7 +533,7 @@ private object CosmosSchemaInferenceConfig {
  private val inferSchemaForceNullableProperties = CosmosConfigEntry[Boolean](
    key = CosmosConfigNames.ReadInferSchemaForceNullableProperties,
    mandatory = false,
-   defaultValue = Some(false),
+   defaultValue = Some(true),
    parseFromStringFunction = include => include.toBoolean,
    helpMessage = "Whether schema inference should enforce inferred properties to be nullable - even when no null-values are contained in the sample set")
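Conceptually, forcing nullability amounts to rewriting every inferred top-level field as nullable. A sketch only, not the connector's actual implementation (nested structs would additionally need recursion):

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// Sketch: mark every top-level field of an inferred schema as nullable,
// regardless of whether the sample set contained null values for it.
def forceNullable(schema: StructType): StructType =
  StructType(schema.fields.map(field => field.copy(nullable = true)))
```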

@@ -460,6 +460,57 @@ class SparkE2EQueryITest
    fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false
  }

"spark query" can "when forceNullableProperties is false and rows have different schema" in {
    val cosmosEndpoint = TestConfigurations.HOST
    val cosmosMasterKey = TestConfigurations.MASTER_KEY
    val samplingSize = 100
    val expectedResults = samplingSize * 2
    val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)

    // Insert documents whose nested object carries both properties "A" and "B"
    for (_ <- 1 to expectedResults) {
      val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
      val arr = objectNode.putArray("object_array")
      val nested = Utils.getSimpleObjectMapper.createObjectNode()
      nested.put("A", "test")
      nested.put("B", "test")
      arr.add(nested)
      objectNode.put("id", UUID.randomUUID().toString)
      container.createItem(objectNode).block()
    }

    // Insert documents with a slightly different schema: the nested object
    // is missing property "B"
    for (_ <- 1 to samplingSize) {
      val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode()
      val arr = objectNode2.putArray("object_array")
      val nested = Utils.getSimpleObjectMapper.createObjectNode()
      nested.put("A", "test")
      arr.add(nested)
      objectNode2.put("id", UUID.randomUUID().toString)
      container.createItem(objectNode2).block()
    }

    val cfgWithInference = Map(
      "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
      "spark.cosmos.accountKey" -> cosmosMasterKey,
      "spark.cosmos.database" -> cosmosDatabase,
      "spark.cosmos.container" -> cosmosContainer,
      "spark.cosmos.read.inferSchema.enabled" -> "true",
      "spark.cosmos.read.inferSchema.forceNullableProperties" -> "false",
      "spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString,
      "spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts",
      "spark.cosmos.read.partitioning.strategy" -> "Restrictive"
    )

    // The sampling query returns the documents containing "B" first, so "B" is
    // inferred as non-nullable; collecting the documents without "B" must fail.
    val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load()
    try {
      dfWithInference.collect()
      fail("Should have thrown an exception")
    } catch {
      case inner: Exception =>
        // Spark's message really does read "1th" for the field at index 1
        inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true
    }
  }
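For contrast, a sketch (not part of this PR's test code) of the same read relying on the new default, under which `B` is inferred as a nullable column and the collect is expected to succeed:

```scala
// Hypothetical follow-up assuming the new default (forceNullableProperties
// = true): dropping the explicit override makes "B" nullable in the
// inferred schema, so the documents with mixed shapes load without error.
val cfgWithDefault = cfgWithInference - "spark.cosmos.read.inferSchema.forceNullableProperties"
spark.read.format("cosmos.oltp").options(cfgWithDefault).load().collect()
```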

"spark query" can "use custom sampling size" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -580,6 +631,7 @@ class SparkE2EQueryITest
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "false",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

@@ -609,6 +661,62 @@ class SparkE2EQueryITest
item.getAs[String]("id") shouldEqual id
}

"spark query" can "return proper Cosmos specific query plan on explain with nullable properties" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val id = UUID.randomUUID().toString

val rawItem = s"""
| {
| "id" : "${id}",
| "nestedObject" : {
| "prop1" : 5,
| "prop2" : "6"
| }
| }
|""".stripMargin

val objectNode = objectMapper.readValue(rawItem, classOf[ObjectNode])

val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
container.createItem(objectNode).block()

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "true",
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
)

val df = spark.read.format("cosmos.oltp").options(cfg).load()
val rowsArray = df.where("nestedObject.prop2 = '6'").collect()
rowsArray should have size 1

var output = new java.io.ByteArrayOutputStream()
Console.withOut(output) {
df.explain()
}
var queryPlan = output.toString.replaceAll("#\\d+", "#x")
logInfo(s"Query Plan: $queryPlan")
queryPlan.contains("Cosmos Query: SELECT * FROM r") shouldEqual true

output = new java.io.ByteArrayOutputStream()
Console.withOut(output) {
df.where("nestedObject.prop2 = '6'").explain()
}
queryPlan = output.toString.replaceAll("#\\d+", "#x")
logInfo(s"Query Plan: $queryPlan")
val expected = s"Cosmos Query: SELECT * FROM r WHERE NOT(IS_NULL(r['nestedObject'])) " +
s"AND r['nestedObject']['prop2']=" +
s"@param0${System.getProperty("line.separator")} > param: @param0 = 6"
queryPlan.contains(expected) shouldEqual true

val item = rowsArray(0)
item.getAs[String]("id") shouldEqual id
}
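The `NOT(IS_NULL(r['nestedObject']))` clause in the expected query is a consequence of the nullable inferred schema: before dereferencing a nullable struct, Spark's optimizer inserts an `IsNotNull` constraint, and the connector translates both predicates into the pushed-down Cosmos query. A rough, illustrative sketch of the filter Spark effectively plans (not code from this PR):

```scala
import org.apache.spark.sql.functions.col

// Illustrative only: the logical filter behind df.where("nestedObject.prop2 = '6'")
// once the IsNotNull guard on the nullable struct has been inferred.
val filtered = df.filter(col("nestedObject").isNotNull && col("nestedObject.prop2") === "6")
```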

  //scalastyle:on magic.number
  //scalastyle:on multiple.string.literals
}