Skip to content

Commit b03098e

Browse files
Refactoring to make CosmosReadManyRequestOptions not have a base class (#38849)
* Refactoring to make CosmosReadManyRequestOptions not have a base class * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosReadManyRequestOptions.java Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com> * Update CosmosReadManyRequestOptions.java --------- Co-authored-by: Annie Liang <64233642+xinlian12@users.noreply.github.com>
1 parent 1c5ab25 commit b03098e

26 files changed

+772
-609
lines changed

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ public final class CosmosEncryptionAsyncContainer {
8282
private final CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient;
8383
private final static ImplementationBridgeHelpers.CosmosItemResponseHelper.CosmosItemResponseBuilderAccessor cosmosItemResponseBuilderAccessor = ImplementationBridgeHelpers.CosmosItemResponseHelper.getCosmosItemResponseBuilderAccessor();
8484
private final static ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.CosmosItemRequestOptionsAccessor cosmosItemRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosItemRequestOptionsHelper.getCosmosItemRequestOptionsAccessor();
85-
private final static ImplementationBridgeHelpers.CosmosQueryRequestOptionsBaseHelper.CosmosQueryRequestOptionsBaseAccessor cosmosQueryRequestOptionsBaseAccessor = ImplementationBridgeHelpers.CosmosQueryRequestOptionsBaseHelper.getCosmosQueryRequestOptionsBaseAccessor();
8685
private final static ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.CosmosQueryRequestOptionsAccessor cosmosQueryRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosQueryRequestOptionsHelper.getCosmosQueryRequestOptionsAccessor();
8786
private final static ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.CosmosChangeFeedRequestOptionsAccessor cosmosChangeFeedRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor();
8887
private final static ImplementationBridgeHelpers.CosmosAsyncContainerHelper.CosmosAsyncContainerAccessor cosmosAsyncContainerAccessor = ImplementationBridgeHelpers.CosmosAsyncContainerHelper.getCosmosAsyncContainerAccessor();
@@ -1496,21 +1495,22 @@ private <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperat
14961495
});
14971496
}
14981497

1499-
15001498
private void setRequestHeaders(CosmosItemRequestOptions requestOptions) {
15011499
cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
15021500
cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
15031501
}
15041502

15051503
private CosmosQueryRequestOptions setRequestHeaders(CosmosQueryRequestOptions requestOptions) {
1506-
cosmosQueryRequestOptionsBaseAccessor.setHeader(
1507-
cosmosQueryRequestOptionsAccessor.getImpl(requestOptions),
1508-
Constants.IS_CLIENT_ENCRYPTED_HEADER,
1509-
"true");
1510-
cosmosQueryRequestOptionsBaseAccessor.setHeader(
1511-
cosmosQueryRequestOptionsAccessor.getImpl(requestOptions),
1512-
Constants.INTENDED_COLLECTION_RID_HEADER,
1513-
this.encryptionProcessor.getContainerRid());
1504+
cosmosQueryRequestOptionsAccessor
1505+
.getImpl(requestOptions)
1506+
.setHeader(
1507+
Constants.IS_CLIENT_ENCRYPTED_HEADER,
1508+
"true");
1509+
cosmosQueryRequestOptionsAccessor
1510+
.getImpl(requestOptions)
1511+
.setHeader(
1512+
Constants.INTENDED_COLLECTION_RID_HEADER,
1513+
this.encryptionProcessor.getContainerRid());
15141514
return requestOptions;
15151515
}
15161516

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,12 @@ private class BulkWriter(container: CosmosAsyncContainer,
324324

325325
// for each batch, use readMany to read items from cosmosdb
326326
val requestOptions = new CosmosReadManyRequestOptions()
327-
ThroughputControlHelper.populateThroughputControlGroupName(requestOptions, writeConfig.throughputControlConfig)
327+
val requestOptionsImpl = ImplementationBridgeHelpers
328+
.CosmosReadManyRequestOptionsHelper
329+
.getCosmosReadManyRequestOptionsAccessor.getImpl(requestOptions)
330+
ThroughputControlHelper.populateThroughputControlGroupName(
331+
requestOptionsImpl,
332+
writeConfig.throughputControlConfig)
328333
ImplementationBridgeHelpers
329334
.CosmosAsyncContainerHelper
330335
.getCosmosAsyncContainerAccessor

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,11 @@ private case class ItemsPartitionReader
6767

6868
val ctxAndListener = new OperationContextAndListenerTuple(operationContext, listener)
6969

70-
ImplementationBridgeHelpers.CosmosQueryRequestOptionsBaseHelper
71-
.getCosmosQueryRequestOptionsBaseAccessor
72-
.setOperationContext(
73-
ImplementationBridgeHelpers
74-
.CosmosQueryRequestOptionsHelper
75-
.getCosmosQueryRequestOptionsAccessor
76-
.getImpl(queryOptions),
77-
ctxAndListener)
70+
ImplementationBridgeHelpers
71+
.CosmosQueryRequestOptionsHelper
72+
.getCosmosQueryRequestOptionsAccessor
73+
.getImpl(queryOptions)
74+
.setOperationContextAndListenerTuple(ctxAndListener)
7875

7976
Some(ctxAndListener)
8077
} else {
@@ -156,14 +153,11 @@ private case class ItemsPartitionReader
156153
DiagnosticsLoader.getDiagnosticsProvider(diagnosticsConfig).getLogger(this.getClass)
157154

158155
val operationContextAndListenerTuple = new OperationContextAndListenerTuple(taskDiagnosticsContext, listener)
159-
ImplementationBridgeHelpers.CosmosQueryRequestOptionsBaseHelper
160-
.getCosmosQueryRequestOptionsBaseAccessor
161-
.setOperationContext(
162-
ImplementationBridgeHelpers
163-
.CosmosQueryRequestOptionsHelper
164-
.getCosmosQueryRequestOptionsAccessor
165-
.getImpl(queryOptions),
166-
operationContextAndListenerTuple)
156+
ImplementationBridgeHelpers
157+
.CosmosQueryRequestOptionsHelper
158+
.getCosmosQueryRequestOptionsAccessor
159+
.getImpl(queryOptions)
160+
.setOperationContextAndListenerTuple(operationContextAndListenerTuple)
167161

168162
taskDiagnosticsContext
169163
} else{
@@ -178,13 +172,10 @@ private case class ItemsPartitionReader
178172
queryOptions.setFeedRange(SparkBridgeImplementationInternal.toFeedRange(feedRange))
179173

180174
ImplementationBridgeHelpers
181-
.CosmosQueryRequestOptionsBaseHelper
182-
.getCosmosQueryRequestOptionsBaseAccessor
175+
.CosmosQueryRequestOptionsHelper
176+
.getCosmosQueryRequestOptionsAccessor
177+
.getImpl(queryOptions)
183178
.setItemFactoryMethod(
184-
ImplementationBridgeHelpers
185-
.CosmosQueryRequestOptionsHelper
186-
.getCosmosQueryRequestOptionsAccessor
187-
.getImpl(queryOptions),
188179
jsonNode => {
189180
val objectNode = cosmosRowConverter.ensureObjectNode(jsonNode)
190181

@@ -251,13 +242,10 @@ private case class ItemsPartitionReader
251242
queryOptions.setDedicatedGatewayRequestOptions(readConfig.dedicatedGatewayRequestOptions)
252243

253244
ImplementationBridgeHelpers
254-
.CosmosQueryRequestOptionsBaseHelper
255-
.getCosmosQueryRequestOptionsBaseAccessor
245+
.CosmosQueryRequestOptionsHelper
246+
.getCosmosQueryRequestOptionsAccessor
247+
.getImpl(queryOptions)
256248
.setCorrelationActivityId(
257-
ImplementationBridgeHelpers
258-
.CosmosQueryRequestOptionsHelper
259-
.getCosmosQueryRequestOptionsAccessor
260-
.getImpl(queryOptions),
261249
diagnosticsContext.correlationActivityId)
262250

263251
cosmosAsyncContainer.queryItems(cosmosQuery.toSqlQuerySpec, queryOptions, classOf[SparkRowItem])

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReaderWithReadMany.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@ private[spark] case class ItemsPartitionReaderWithReadMany
3535
private lazy val log = LoggerHelper.getLogger(diagnosticsConfig, this.getClass)
3636

3737
private val readManyOptions = new CosmosReadManyRequestOptions()
38+
private val readManyOptionsImpl = ImplementationBridgeHelpers
39+
.CosmosReadManyRequestOptionsHelper
40+
.getCosmosReadManyRequestOptionsAccessor
41+
.getImpl(readManyOptions)
3842

3943
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
40-
ThroughputControlHelper.populateThroughputControlGroupName(readManyOptions, readConfig.throughputControlConfig)
44+
ThroughputControlHelper.populateThroughputControlGroupName(readManyOptionsImpl, readConfig.throughputControlConfig)
4145

4246
private val operationContext = {
4347
assert(taskContext != null)
@@ -56,9 +60,8 @@ private[spark] case class ItemsPartitionReaderWithReadMany
5660

5761
val ctxAndListener = new OperationContextAndListenerTuple(operationContext, listener)
5862

59-
ImplementationBridgeHelpers.CosmosQueryRequestOptionsBaseHelper
60-
.getCosmosQueryRequestOptionsBaseAccessor
61-
.setOperationContext(readManyOptions, ctxAndListener)
63+
readManyOptionsImpl
64+
.setOperationContextAndListenerTuple(ctxAndListener)
6265

6366
Some(ctxAndListener)
6467
} else {
@@ -116,11 +119,8 @@ private[spark] case class ItemsPartitionReaderWithReadMany
116119
readConfig.readManyFilteringConfig,
117120
partitionKeyDefinition)
118121

119-
ImplementationBridgeHelpers
120-
.CosmosQueryRequestOptionsBaseHelper
121-
.getCosmosQueryRequestOptionsBaseAccessor
122+
readManyOptionsImpl
122123
.setItemFactoryMethod(
123-
readManyOptions,
124124
jsonNode => {
125125
val objectNode = cosmosRowConverter.ensureObjectNode(jsonNode)
126126
val idValue = objectNode.get(IdAttributeName).asText()

sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/ThroughputControlHelper.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
package com.azure.cosmos.spark
55

6-
import com.azure.cosmos.implementation.ImplementationBridgeHelpers
7-
import com.azure.cosmos.models.{CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosItemRequestOptions, CosmosQueryRequestOptionsBase, PriorityLevel}
6+
import com.azure.cosmos.implementation.{CosmosQueryRequestOptionsBase, ImplementationBridgeHelpers}
7+
import com.azure.cosmos.models.{CosmosBulkExecutionOptions, CosmosChangeFeedRequestOptions, CosmosItemRequestOptions, PriorityLevel}
88
import com.azure.cosmos.spark.diagnostics.BasicLoggingTrait
99
import com.azure.cosmos.{CosmosAsyncContainer, ThroughputControlGroupConfigBuilder}
1010
import org.apache.spark.broadcast.Broadcast

sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorITest.scala

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,10 @@ class TransientIOErrorsRetryingIteratorITest
5555
val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig)
5656
val queryOptions = new CosmosQueryRequestOptions()
5757
ImplementationBridgeHelpers
58-
.CosmosQueryRequestOptionsBaseHelper
59-
.getCosmosQueryRequestOptionsBaseAccessor
58+
.CosmosQueryRequestOptionsHelper
59+
.getCosmosQueryRequestOptionsAccessor
60+
.getImpl(queryOptions)
6061
.setItemFactoryMethod(
61-
ImplementationBridgeHelpers
62-
.CosmosQueryRequestOptionsHelper
63-
.getCosmosQueryRequestOptionsAccessor
64-
.getImpl(queryOptions),
6562
jsonNode => {
6663
val row = cosmosRowConverter.fromObjectNodeToRow(
6764
ItemsTable.defaultSchemaForInferenceDisabled,
@@ -184,13 +181,10 @@ class TransientIOErrorsRetryingIteratorITest
184181
val cosmosRowConverter = CosmosRowConverter.get(cosmosSerializationConfig)
185182
val queryOptions = new CosmosQueryRequestOptions()
186183
ImplementationBridgeHelpers
187-
.CosmosQueryRequestOptionsBaseHelper
188-
.getCosmosQueryRequestOptionsBaseAccessor
184+
.CosmosQueryRequestOptionsHelper
185+
.getCosmosQueryRequestOptionsAccessor
186+
.getImpl(queryOptions)
189187
.setItemFactoryMethod(
190-
ImplementationBridgeHelpers
191-
.CosmosQueryRequestOptionsHelper
192-
.getCosmosQueryRequestOptionsAccessor
193-
.getImpl(queryOptions),
194188
jsonNode => {
195189
val row = cosmosRowConverter.fromObjectNodeToRow(
196190
ItemsTable.defaultSchemaForInferenceDisabled,

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import reactor.core.scheduler.Schedulers;
5252

5353
import java.nio.charset.StandardCharsets;
54-
import java.rmi.ConnectIOException;
5554
import java.time.Duration;
5655
import java.util.ArrayList;
5756
import java.util.Arrays;
@@ -925,14 +924,10 @@ public void queryItemsWithCustomCorrelationActivityId() throws Exception{
925924

926925
UUID correlationId = UUID.randomUUID();
927926
ImplementationBridgeHelpers
928-
.CosmosQueryRequestOptionsBaseHelper
929-
.getCosmosQueryRequestOptionsBaseAccessor()
930-
.setCorrelationActivityId(
931-
ImplementationBridgeHelpers
932-
.CosmosQueryRequestOptionsHelper
933-
.getCosmosQueryRequestOptionsAccessor()
934-
.getImpl(cosmosQueryRequestOptions),
935-
correlationId);
927+
.CosmosQueryRequestOptionsHelper
928+
.getCosmosQueryRequestOptionsAccessor()
929+
.getImpl(cosmosQueryRequestOptions)
930+
.setCorrelationActivityId(correlationId);
936931

937932
CosmosPagedIterable<InternalObjectNode> feedResponseIterator1 =
938933
container.queryItems(query, cosmosQueryRequestOptions, InternalObjectNode.class);

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/FetcherTest.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,11 @@ public void query(CosmosQueryRequestOptions options, int top) {
9898
ServerSideOnlyContinuationFetcherImpl<Document> fetcher =
9999
new ServerSideOnlyContinuationFetcherImpl<>(createRequestFunc, executeFunc, ModelBridgeInternal.getRequestContinuationFromQueryRequestOptions(options), false, top,
100100
ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(options),
101-
ImplementationBridgeHelpers
102-
.CosmosQueryRequestOptionsBaseHelper
103-
.getCosmosQueryRequestOptionsBaseAccessor()
104-
.getOperationContext(
105-
ImplementationBridgeHelpers
106-
.CosmosQueryRequestOptionsHelper
107-
.getCosmosQueryRequestOptionsAccessor()
108-
.getImpl(options)
109-
),
101+
ImplementationBridgeHelpers
102+
.CosmosQueryRequestOptionsHelper
103+
.getCosmosQueryRequestOptionsAccessor()
104+
.getImpl(options)
105+
.getOperationContextAndListenerTuple(),
110106
ImplementationBridgeHelpers
111107
.CosmosQueryRequestOptionsHelper
112108
.getCosmosQueryRequestOptionsAccessor()

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,19 +196,16 @@ public void queryOrderByWithValueAndCustomFactoryMethod(String sortOrder) throws
196196
String query = String.format("SELECT value r.propInt FROM r where r.propInt != null ORDER BY r.propInt %s", sortOrder);
197197
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
198198
ImplementationBridgeHelpers
199-
.CosmosQueryRequestOptionsBaseHelper
200-
.getCosmosQueryRequestOptionsBaseAccessor()
199+
.CosmosQueryRequestOptionsHelper
200+
.getCosmosQueryRequestOptionsAccessor()
201+
.getImpl(options)
201202
// Custom Factory Method will always get the ObjectNode - so if VALUE function is used
202203
// the value needs to be extracted manually. This is intentional right now
203204
// to allow late-binding the decision whether we really want to surface JsonNode or ObjectNode to
204205
// customers if we ever make the custom factory method public
205206
// For now in Spark don't need to worry about extracting values - we would need a wrapper to
206207
// allow inferring schema anyway.
207208
.setItemFactoryMethod(
208-
ImplementationBridgeHelpers
209-
.CosmosQueryRequestOptionsHelper
210-
.getCosmosQueryRequestOptionsAccessor()
211-
.getImpl(options),
212209
(node) -> node.get("_value").intValue());
213210

214211
int pageSize = 3;

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -748,9 +748,10 @@ public void readManyWithFactoryMethod() {
748748
Function<JsonNode, String> factoryMethod = (objectNode) -> objectNode.get("id").asText();
749749
CosmosReadManyRequestOptions queryRequestOptions = new CosmosReadManyRequestOptions();
750750
ImplementationBridgeHelpers
751-
.CosmosQueryRequestOptionsBaseHelper
752-
.getCosmosQueryRequestOptionsBaseAccessor()
753-
.setItemFactoryMethod(queryRequestOptions, factoryMethod);
751+
.CosmosReadManyRequestOptionsHelper
752+
.getCosmosReadManyRequestOptionsAccessor()
753+
.getImpl(queryRequestOptions)
754+
.setItemFactoryMethod(factoryMethod);
754755

755756
FeedResponse<String> documentFeedResponse =
756757
ImplementationBridgeHelpers

0 commit comments

Comments
 (0)