Skip to content

Commit 2d68169

Browse files
Added missing RequestOptions getters and allowing Spark ClientConfig to enable warm-up (#38858)
* Refactoring to make CosmosReadManyRequestOptions not have a base class * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update CosmosReadManyRequestOptions.java * Update module-info.java * Update module-info.java * Added missing RequestOpitons getters and allowing Spark ClientConfig to enable warm-up * Update CosmosClientCache.scala * Update configuration-reference.md * Reacting to code review feeedback * Removing SparkUtil.safeOpenConnectionInitCaches * Update basicScenario.scala * Update basicScenario.scala --------- Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com>
1 parent 70d8722 commit 2d68169

File tree

26 files changed

+281
-59
lines changed

26 files changed

+281
-59
lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ private class ChangeFeedMicroBatchStream
5959
containerConfig,
6060
clientCacheItem,
6161
throughputControlClientCacheItemOpt)
62-
SparkUtils.safeOpenConnectionInitCaches(container, log)
6362

6463
private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
6564

sdk/cosmos/azure-cosmos-spark_3-2_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ private class ChangeFeedMicroBatchStream
5454
containerConfig,
5555
clientCacheItem,
5656
throughputControlClientCacheItemOpt)
57-
SparkUtils.safeOpenConnectionInitCaches(container, log)
5857

5958
private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
6059

sdk/cosmos/azure-cosmos-spark_3-3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ private class ChangeFeedMicroBatchStream
5454
containerConfig,
5555
clientCacheItem,
5656
throughputControlClientCacheItemOpt)
57-
SparkUtils.safeOpenConnectionInitCaches(container, log)
5857

5958
private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
6059

sdk/cosmos/azure-cosmos-spark_3-4_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedMicroBatchStream.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ private class ChangeFeedMicroBatchStream
5656
containerConfig,
5757
clientCacheItem,
5858
throughputControlClientCacheItemOpt)
59-
SparkUtils.safeOpenConnectionInitCaches(container, log)
6059

6160
private var latestOffsetSnapshot: Option[ChangeFeedOffset] = None
6261

sdk/cosmos/azure-cosmos-spark_3_2-12/docs/configuration-reference.md

Lines changed: 12 additions & 10 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ private case class ChangeFeedPartitionReader
6868
containerTargetConfig,
6969
clientCacheItem,
7070
throughputControlClientCacheItemOpt)
71-
SparkUtils.safeOpenConnectionInitCaches(cosmosAsyncContainer, log)
7271

7372
private val partitionKeyDefinition: Option[PartitionKeyDefinition] =
7473
if (diagnosticsConfig.mode.isDefined &&

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosClientCache.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import com.azure.cosmos.models.{CosmosClientTelemetryConfig, CosmosMetricCategor
99
import com.azure.cosmos.spark.CosmosPredicates.isOnSparkDriver
1010
import com.azure.cosmos.spark.catalog.{CosmosCatalogClient, CosmosCatalogCosmosSDKClient, CosmosCatalogManagementSDKClient}
1111
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
12-
import com.azure.cosmos.{ConsistencyLevel, CosmosAsyncClient, CosmosClientBuilder, DirectConnectionConfig, GatewayConnectionConfig, ThrottlingRetryOptions}
12+
import com.azure.cosmos.{ConsistencyLevel, CosmosAsyncClient, CosmosClientBuilder, CosmosContainerProactiveInitConfigBuilder, DirectConnectionConfig, GatewayConnectionConfig, ThrottlingRetryOptions}
1313
import com.azure.identity.ClientSecretCredentialBuilder
1414
import com.azure.resourcemanager.cosmos.CosmosManager
1515
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
@@ -275,6 +275,19 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
275275
.setIoThreadPriority(directConfig, Thread.MAX_PRIORITY)
276276

277277
builder = builder.directMode(directConfig)
278+
279+
if (cosmosClientConfiguration.proactiveConnectionInitialization.isDefined &&
280+
!cosmosClientConfiguration.proactiveConnectionInitialization.get.isEmpty) {
281+
val containerIdentities = CosmosAccountConfig.parseProactiveConnectionInitConfigs(
282+
cosmosClientConfiguration.proactiveConnectionInitialization.get)
283+
284+
val initConfig = new CosmosContainerProactiveInitConfigBuilder(containerIdentities)
285+
.setAggressiveWarmupDuration(
286+
Duration.ofSeconds(cosmosClientConfiguration.proactiveConnectionInitializationDurationInSeconds))
287+
.setProactiveConnectionRegionsCount(1)
288+
.build
289+
builder.openConnectionsAndInitCaches(initConfig)
290+
}
278291
}
279292

280293
if (cosmosClientConfiguration.preferredRegionsList.isDefined) {
@@ -441,6 +454,9 @@ private[spark] object CosmosClientCache extends BasicLoggingTrait {
441454
authConfig: CosmosAuthConfig,
442455
applicationName: String,
443456
useGatewayMode: Boolean,
457+
// Intentionally not looking at proactive connection
458+
// initialization to distinguish cache key
459+
// You would never want to clients just for the diffs
444460
httpConnectionPoolSize: Int,
445461
useEventualConsistency: Boolean,
446462
preferredRegionsList: String)

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosClientConfiguration.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ private[spark] case class CosmosClientConfiguration (
1414
customApplicationNameSuffix: Option[String],
1515
applicationName: String,
1616
useGatewayMode: Boolean,
17+
proactiveConnectionInitialization: Option[String],
18+
proactiveConnectionInitializationDurationInSeconds: Int,
1719
httpConnectionPoolSize: Int,
1820
useEventualConsistency: Boolean,
1921
enableClientTelemetry: Boolean,
@@ -67,6 +69,8 @@ private[spark] object CosmosClientConfiguration {
6769
customApplicationNameSuffix,
6870
applicationName,
6971
cosmosAccountConfig.useGatewayMode,
72+
cosmosAccountConfig.proactiveConnectionInitialization,
73+
cosmosAccountConfig.proactiveConnectionInitializationDurationInSeconds,
7074
cosmosAccountConfig.httpConnectionPoolSize,
7175
useEventualConsistency,
7276
enableClientTelemetry = diagnosticsConfig.isClientTelemetryEnabled,

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import com.azure.core.management.AzureEnvironment
77
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
88
import com.azure.cosmos.implementation.routing.LocationHelper
99
import com.azure.cosmos.implementation.{Configs, SparkBridgeImplementationInternal, Strings}
10-
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange, PartitionKeyDefinition}
10+
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosContainerIdentity, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange, PartitionKeyDefinition}
1111
import com.azure.cosmos.spark.ChangeFeedModes.ChangeFeedMode
1212
import com.azure.cosmos.spark.ChangeFeedStartFromModes.{ChangeFeedStartFromMode, PointInTime}
1313
import com.azure.cosmos.spark.CosmosAuthType.CosmosAuthType
@@ -59,6 +59,8 @@ private[spark] object CosmosConfigNames {
5959
val DisableTcpConnectionEndpointRediscovery = "spark.cosmos.disableTcpConnectionEndpointRediscovery"
6060
val ApplicationName = "spark.cosmos.applicationName"
6161
val UseGatewayMode = "spark.cosmos.useGatewayMode"
62+
val ProactiveConnectionInitialization = "spark.cosmos.proactiveConnectionInitialization"
63+
val ProactiveConnectionInitializationDurationInSeconds = "spark.cosmos.proactiveConnectionInitializationDurationInSeconds"
6264
val GatewayConnectionPoolSize = "spark.cosmos.http.connectionPoolSize"
6365
val AllowInvalidJsonWithDuplicateJsonProperties = "spark.cosmos.read.allowInvalidJsonWithDuplicateJsonProperties"
6466
val ReadCustomQuery = "spark.cosmos.read.customQuery"
@@ -149,6 +151,8 @@ private[spark] object CosmosConfigNames {
149151
DisableTcpConnectionEndpointRediscovery,
150152
ApplicationName,
151153
UseGatewayMode,
154+
ProactiveConnectionInitialization,
155+
ProactiveConnectionInitializationDurationInSeconds,
152156
GatewayConnectionPoolSize,
153157
AllowInvalidJsonWithDuplicateJsonProperties,
154158
ReadCustomQuery,
@@ -324,6 +328,8 @@ private case class CosmosAccountConfig(endpoint: String,
324328
accountName: String,
325329
applicationName: Option[String],
326330
useGatewayMode: Boolean,
331+
proactiveConnectionInitialization: Option[String],
332+
proactiveConnectionInitializationDurationInSeconds: Int,
327333
httpConnectionPoolSize: Int,
328334
disableTcpConnectionEndpointRediscovery: Boolean,
329335
preferredRegionsList: Option[Array[String]],
@@ -402,6 +408,27 @@ private object CosmosAccountConfig {
402408
parseFromStringFunction = useGatewayMode => useGatewayMode.toBoolean,
403409
helpMessage = "Use gateway mode for the client operations")
404410

411+
private val ProactiveConnectionInitialization = CosmosConfigEntry[String](key = CosmosConfigNames.ProactiveConnectionInitialization,
412+
mandatory = false,
413+
defaultValue = None,
414+
parseFromStringFunction = proactiveConnectionInitializationText => {
415+
// force parsing and validation of config string. CosmosContainerIdentity is not serializable
416+
// so delaying the actual conversion
417+
parseProactiveConnectionInitConfigs(proactiveConnectionInitializationText)
418+
proactiveConnectionInitializationText
419+
},
420+
helpMessage = "Enable proactive connection initialization. This will result in keeping warmed-up connections "
421+
+ "to each replica. Config should be formatted like "
422+
+ "`DBName1/ContainerName1;DBName2/ContainerName2;DBName1/ContainerName3`")
423+
424+
private val ProactiveConnectionInitializationDurationInSeconds = CosmosConfigEntry[Int](key = CosmosConfigNames.ProactiveConnectionInitializationDurationInSeconds,
425+
mandatory = false,
426+
defaultValue = Some(120),
427+
parseFromStringFunction = secondsText => secondsText.toInt,
428+
helpMessage = "The duration in seconds that Cosmos client initialization should wait and allow connections "
429+
+ "being warmed-up aggressively. After this duration the remaining connections will be slowly opened "
430+
+ "on a single thread in the background.")
431+
405432
private val HttpConnectionPoolSize = CosmosConfigEntry[Integer](key = CosmosConfigNames.GatewayConnectionPoolSize,
406433
mandatory = false,
407434
defaultValue = Some(Configs.getDefaultHttpPoolSize),
@@ -451,12 +478,32 @@ private object CosmosAccountConfig {
451478
},
452479
helpMessage = "The azure environment of the CosmosDB account: `Azure`, `AzureChina`, `AzureUsGovernment`, `AzureGermany`.")
453480

481+
private[spark] def parseProactiveConnectionInitConfigs(config: String): java.util.List[CosmosContainerIdentity] = {
482+
val result = new java.util.ArrayList[CosmosContainerIdentity]
483+
try {
484+
val identities = config.split(";")
485+
for (identity: String <- identities) {
486+
val parts = identity.split("/")
487+
result.add(new CosmosContainerIdentity(parts.apply(0).trim, parts.apply(1).trim))
488+
}
489+
490+
result
491+
}
492+
catch {
493+
case e: Exception => throw new IllegalArgumentException(
494+
s"Invalid proactive connection initialization config $config. The string must be a list of containers to "
495+
+ "be warmed-up in the format of `DBName1/ContainerName1;DBName2/ContainerName2;DBName1/ContainerName3`")
496+
}
497+
}
498+
454499
def parseCosmosAccountConfig(cfg: Map[String, String]): CosmosAccountConfig = {
455500
val endpointOpt = CosmosConfigEntry.parse(cfg, CosmosAccountEndpointUri)
456501
val authConfig = CosmosAuthConfig.parseCosmosAuthConfig(cfg)
457502
val accountName = CosmosConfigEntry.parse(cfg, CosmosAccountName)
458503
val applicationName = CosmosConfigEntry.parse(cfg, ApplicationName)
459504
val useGatewayMode = CosmosConfigEntry.parse(cfg, UseGatewayMode)
505+
val proactiveConnectionInitialization = CosmosConfigEntry.parse(cfg, ProactiveConnectionInitialization)
506+
val proactiveConnectionInitializationDurationInSeconds = CosmosConfigEntry.parse(cfg, ProactiveConnectionInitializationDurationInSeconds)
460507
val httpConnectionPoolSize = CosmosConfigEntry.parse(cfg, HttpConnectionPoolSize)
461508
val subscriptionIdOpt = CosmosConfigEntry.parse(cfg, SubscriptionId)
462509
val resourceGroupNameOpt = CosmosConfigEntry.parse(cfg, ResourceGroupName)
@@ -511,6 +558,8 @@ private object CosmosAccountConfig {
511558
accountName.get,
512559
applicationName,
513560
useGatewayMode.get,
561+
proactiveConnectionInitialization,
562+
proactiveConnectionInitializationDurationInSeconds.get,
514563
httpConnectionPoolSize.get,
515564
disableTcpConnectionEndpointRediscovery.get,
516565
preferredRegionsListOpt,

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosPartitionPlanner.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,6 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {
656656
cosmosContainerConfig,
657657
clientCacheItems(0).get,
658658
clientCacheItems(1))
659-
SparkUtils.safeOpenConnectionInitCaches(container, (msg, e) => logWarning(msg, e))
660659

661660
ContainerFeedRangesCache
662661
.getFeedRanges(container)

0 commit comments

Comments
 (0)