Skip to content

Commit

Permalink
Cosmos Spark update dataSource cosmos.changeFeed to cosmos.oltp.chang…
Browse files Browse the repository at this point in the history
…eFeed (#21184)

update dataSource cosmos.changeFeed to cosmos.oltp.changeFeed
  • Loading branch information
moderakh authored May 6, 2021
1 parent 23cea75 commit ef21fc6
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 11 deletions.
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
## Release History

### 4.0.0-beta.4 (Unreleased)
#### Configuration Renames
* Renamed data source name `cosmos.changeFeed` to `cosmos.oltp.changeFeed`, See [PR](https://github.com/Azure/azure-sdk-for-java/pull/21184).

### 4.0.0-beta.3 (2021-05-05)
* Cosmos DB Spark 3.1.1 Connector Preview `4.0.0-beta.3` Release.
#### Configuration Renames
* Renamed data source name `cosmos.items` to `cosmos.oltp`, See [PR](https://github.com/Azure/azure-sdk-for-java/pull/21121).
* Renamed data source name `cosmos.changeFeed` to `cosmos.oltp.changeFeed`, see [PR](https://github.com/Azure/azure-sdk-for-java/pull/21121).
* Configuration renamed. See [PR](https://github.com/Azure/azure-sdk-for-java/pull/21004) for list of changes. See [Configuration-Reference](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md) for more details.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
" \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n",
"}\n",
"changeFeed_df = spark.read.format(\"cosmos.changeFeed\").options(**changeFeedCfg).load()\n",
"changeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\n",
"count_changeFeed = changeFeed_df.count()\n",
"print(\"Number of records retrieved via change feed: \", count_changeFeed) \n",
"print(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
"\n",
" changeFeedDF = spark \\\n",
" .readStream \\\n",
" .format(\"cosmos.changeFeed\") \\\n",
" .format(\"cosmos.oltp.changeFeed\") \\\n",
" .options(**changeFeedCfg) \\\n",
" .load()\n",
" \n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private object CosmosConstants {

object Names {
val ItemsDataSourceShortName = "cosmos.oltp"
val ChangeFeedDataSourceShortName = "cosmos.changeFeed"
val ChangeFeedDataSourceShortName = "cosmos.oltp.changeFeed"
}

object Properties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SparkE2EChangeFeedITest
"spark.cosmos.read.inferSchema.enabled" -> "false"
)

val df = spark.read.format("cosmos.changeFeed").options(cfg).load()
val df = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray = df.collect()
rowsArray should have size 2
df.schema.equals(
Expand All @@ -57,7 +57,7 @@ class SparkE2EChangeFeedITest
"spark.cosmos.changeFeed.mode" -> "Incremental"
)

val dfExplicit = spark.read.format("cosmos.changeFeed").options(cfgExplicit).load()
val dfExplicit = spark.read.format("cosmos.oltp.changeFeed").options(cfgExplicit).load()
val rowsArrayExplicit = dfExplicit.collect()
rowsArrayExplicit should have size 2
dfExplicit.schema.equals(
Expand Down Expand Up @@ -94,7 +94,7 @@ class SparkE2EChangeFeedITest
StructField("isAlive", BooleanType)
))

val df = spark.read.schema(customSchema).format("cosmos.changeFeed").options(cfg).load()
val df = spark.read.schema(customSchema).format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray = df.collect()
rowsArray should have size 2
df.schema.equals(customSchema) shouldEqual true
Expand Down Expand Up @@ -124,7 +124,7 @@ class SparkE2EChangeFeedITest
"spark.cosmos.changeFeed.startFrom" -> "NOW"
)

val df = spark.read.format("cosmos.changeFeed").options(cfg).load()
val df = spark.read.format("cosmos.oltp.changeFeed").options(cfg).load()
val rowsArray = df.collect()
rowsArray should have size 0
df.schema.equals(
Expand Down Expand Up @@ -177,7 +177,7 @@ class SparkE2EChangeFeedITest

val changeFeedDF = spark
.readStream
.format("cosmos.changeFeed")
.format("cosmos.oltp.changeFeed")
.options(readCfg)
.load()
val microBatchQuery = changeFeedDF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class SparkE2EStructuredStreamingITest

val changeFeedDF = spark
.readStream
.format("cosmos.changeFeed")
.format("cosmos.oltp.changeFeed")
.options(changeFeedCfg)
.load()

Expand Down Expand Up @@ -107,7 +107,7 @@ class SparkE2EStructuredStreamingITest

val secondChangeFeedDF = spark
.readStream
.format("cosmos.changeFeed")
.format("cosmos.oltp.changeFeed")
.options(changeFeedCfg)
.load()

Expand Down

0 comments on commit ef21fc6

Please sign in to comment.