Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'change_feed_mode' to 'query_items_change_feed' API #38105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
165 commits
Select commit Hold shift + click to select a range
d82870e
Add 'change_feed_mode' to 'query_items_change_feed' API
allenkim0129 Oct 25, 2024
ced92d3
remove unnecessary import
allenkim0129 Oct 25, 2024
06a8e2f
Fix lint
allenkim0129 Oct 28, 2024
c1bd9bb
Updated CHANGELOG.md
allenkim0129 Oct 28, 2024
b663998
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Oct 28, 2024
cf9e4b0
Removed _feed_range.py
allenkim0129 Oct 28, 2024
e0a1b18
Addressed comments
allenkim0129 Oct 29, 2024
357b4f8
Fixed lint
allenkim0129 Oct 29, 2024
3592bac
Add kwargs back to 'call'QueryItemsChangeFeed'
allenkim0129 Oct 29, 2024
e36392c
Fixed syntax error with f-string
allenkim0129 Oct 29, 2024
e57aef3
Removed StrEnum to support earlier Python versions
allenkim0129 Oct 29, 2024
0071042
Fixed f-string error
allenkim0129 Oct 31, 2024
6d65a5d
addressed comments
allenkim0129 Nov 1, 2024
c7d51af
Addressed comments
allenkim0129 Nov 5, 2024
c87ec71
Removed unnecessary tests
allenkim0129 Nov 5, 2024
cf6f518
Fix tests for emulator
allenkim0129 Nov 5, 2024
8f462ce
Generating SDK with model renames (#38108)
hamshavathimunibyraiah Oct 28, 2024
92d01cb
[Identity][Monitor] Update live test setup (#37943)
pvaneck Oct 28, 2024
c4bc225
Clean-up cosmos test pipeline (#38126)
weshaggard Oct 28, 2024
2e57e49
Multi modal eval fix (#38134)
w-javed Oct 28, 2024
e6f9bc1
azure-ai-evaluation release 1.0.0b5 2024-10-28 (#38138)
nagkumar91 Oct 29, 2024
c328607
open apiview for mgmt sdk (#38143)
msyyc Oct 29, 2024
bf4db51
[AutoRelease] t2-appplatform-2024-10-25-72111(can only be merged by S…
azure-sdk Oct 29, 2024
1e5f7e9
AzurePipelinesCredential | adding mlflow uri func (#36580)
kshitij-microsoft Oct 29, 2024
71aa1a3
Update changelog (#38133)
sanchez-alex Oct 29, 2024
faf53ba
Increment package version after release of azure-ai-evaluation (#38142)
azure-sdk Oct 29, 2024
984f89d
Remove psycopg2-binary from dev_requirements.txt (#38103)
lzchen Oct 29, 2024
80eb904
[Evaluation] Error improve for service-based evaluator/simulator (#38…
ninghu Oct 29, 2024
5741d7b
AzMon exporter: Serialize complex log bodies to json and set dependen…
lmolkova Oct 29, 2024
cd55049
Experimental tags on ADV scenarios (#38166)
nagkumar91 Oct 29, 2024
1381ce3
Sync eng/common directory with azure-sdk-tools for PR 9259 (#38160)
azure-sdk Oct 29, 2024
bb075f5
Re-generated REST client after re-copying Swagger folder for `2024-10…
u9009 Oct 29, 2024
5ca4f33
[Evaluation] Change RougeType to Enum (#38131)
ninghu Oct 29, 2024
2063360
Auto-enable Azure AI Inference instrumentation in Azure Monitor, upda…
lmolkova Oct 30, 2024
6c83494
[AutoRelease] t2-redhatopenshift-2024-10-30-81004(can only be merged …
azure-sdk Oct 30, 2024
69fc484
[AutoRelease] t2-resourcehealth-2024-10-30-72592(can only be merged b…
azure-sdk Oct 30, 2024
19ca69b
[AutoRelease] t2-appconfiguration-2024-10-30-38914(can only be merged…
azure-sdk Oct 30, 2024
c2cdda1
[AutoRelease] t2-databox-2024-10-30-61405(can only be merged by SDK o…
azure-sdk Oct 30, 2024
af19113
[AutoRelease] t2-edgeorder-2024-10-30-57522(can only be merged by SDK…
azure-sdk Oct 30, 2024
142a578
[AutoRelease] t2-extendedlocation-2024-10-30-79235(can only be merged…
azure-sdk Oct 30, 2024
b214335
[AutoRelease] t2-digitaltwins-2024-10-30-74766(can only be merged by …
azure-sdk Oct 30, 2024
e3852f6
Added get_arm_info (#38018)
xiangyan99 Oct 30, 2024
a3c3f4d
Update CHANGELOG.md (#38170)
changliu2 Oct 30, 2024
20a251f
Minor Readme fix (#38191)
nagkumar91 Oct 30, 2024
6603ca3
Minor fixes in vanilla OTel tracing sample (#38194)
lmolkova Oct 30, 2024
1c3ffe2
Add test for get_arm_endpoints (#38196)
xiangyan99 Oct 30, 2024
0619608
Add overloads for __call__ methods that accept query/response and con…
needuv Oct 30, 2024
0c48225
[Monitor] Apply black formatting (#38129)
pvaneck Oct 30, 2024
9ec0264
[CI] Update autorest CI to use Python 3.9 (#38175)
pvaneck Oct 30, 2024
3f3ab5e
Eval qr json lines now has context from both turns and category if it…
nagkumar91 Oct 30, 2024
83dfe39
Fix doc issues (#38204)
YalinLi0312 Oct 30, 2024
2c3ad69
Evaluation: Remove `parallel` from composite evaluators (#38168)
ninghu Oct 30, 2024
052acb8
[Core] Allow operation-level tracing attributes (#38164)
pvaneck Oct 30, 2024
e44cb7c
Sync eng/common directory with azure-sdk-tools for PR 9281 (#38213)
azure-sdk Oct 30, 2024
070285e
Sync eng/common directory with azure-sdk-tools for PR 9290 (#38223)
azure-sdk Oct 31, 2024
7ab343b
update (#38220)
msyyc Oct 31, 2024
596b0f5
[AutoRelease] t2-containerservicefleet-2024-10-31-68497(can only be m…
azure-sdk Oct 31, 2024
090851a
[BatchAI] deprecate azure-mgmt-batchai (#38226)
swathipil Oct 31, 2024
a2dfe14
[ModelsRepository] deprecating azure-iot-modelrepository (#38225)
swathipil Oct 31, 2024
20e67ee
[ServerManager] deprecating azure-mgmt-servermanager (#38229)
swathipil Oct 31, 2024
2d4b381
[DocumentDB] deprecate azure-mgmt-documentdb (#38227)
swathipil Oct 31, 2024
499c99c
[EH/SB] ran black (#38210)
l0lawrence Oct 31, 2024
62ff0b7
Update randomization pattern for Adversarial simulation (#38211)
slister1001 Oct 31, 2024
e6c2c82
amqp msg (#38122)
l0lawrence Oct 31, 2024
25d4b5d
Implement live metrics filtering for charts (part 1) (#37998)
lzchen Oct 31, 2024
88ddb5d
Update CODEOWNERS for graphrbac owner (#38236)
swathipil Oct 31, 2024
75f9093
Multi modal docstring improvements (#38193)
w-javed Oct 31, 2024
23b5424
Increment package version after release of azure-core (#38240)
azure-sdk Oct 31, 2024
17ecb3c
kwarg type hints (#38214)
mrm9084 Oct 31, 2024
f9b29ee
[Evaluation] add environment variable for API token refresh rate (#38…
slister1001 Oct 31, 2024
e73ed33
[Evaluation] Default to non-randomized order of template parameters (…
slister1001 Oct 31, 2024
deaabd2
resolve issue with language-settings handling additional service chan…
scbedd Oct 31, 2024
20d8d9d
Reduce unnecessary delete calls to ARM for storage accounts (#38246)
azure-sdk Oct 31, 2024
9c380c9
clean up unused python script (#38128)
scbedd Oct 31, 2024
e3f68e1
Sync eng/common directory with azure-sdk-tools for PR 9288 (#38243)
azure-sdk Oct 31, 2024
127741d
[Scheduler] deprecate azure-mgmt-scheduler (#38228)
swathipil Oct 31, 2024
5f9f6f4
[ServiceManagement] deprecate azure-servicemanagement-legacy (#38230)
swathipil Oct 31, 2024
b678cd1
[GraphRBAC] deprecating package (#38224)
swathipil Oct 31, 2024
f1e8d6a
Enable py2docfx docs gen tool, remove the dockerimage docs validation…
JimSuplizio Oct 31, 2024
0655653
Sync eng/common directory with azure-sdk-tools for PR 9294 (#38251)
azure-sdk Oct 31, 2024
c9ef152
[core] add servicemanagement legacy to ci for release (#38253)
swathipil Oct 31, 2024
4fecbe3
Session Token Management APIs (#36971)
allenkim0129 Nov 5, 2024
e92a163
[AutoRelease] t2-network-2024-10-31-29845(can only be merged by SDK o…
azure-sdk Nov 1, 2024
7acf41b
[ServiceBus/EventHub] add service specific message annotations to rec…
swathipil Nov 1, 2024
9d2c7fc
Updating CODEOWNERS for Synapse (#38255)
swathipil Nov 1, 2024
d9ea7a8
Evaluation: Fix the `output_path` parameter of `evaluate` API doesn't…
ninghu Nov 1, 2024
0009679
[synapse] deprecate azure-synapse (#38262)
swathipil Nov 1, 2024
4c2ad17
[DocumentDB] update deprecation release date (#38265)
swathipil Nov 1, 2024
08107df
[CognitiveServices] deprecate vision packages (#38206)
swathipil Nov 1, 2024
677832b
RAI service input sanitization (#38247)
MilesHolland Nov 1, 2024
d9324d5
pass params from ci.yml to cosmos-sdk-client appropriately (#38272)
scbedd Nov 1, 2024
7d09bd3
Fix __call__ Overload Types (#38238)
needuv Nov 1, 2024
cc5c394
Update deprecation_process.md (#38270)
swathipil Nov 1, 2024
d49ac47
[DocumentDB] add changelog to manifest.ini (#38273)
swathipil Nov 1, 2024
e0dac67
[evaluation] Add support for using evaluate() with evaluators that ha…
diondrapeck Nov 2, 2024
d12aece
disabled black in pyproject.toml for all packages (#38271)
weirongw23-msft Nov 2, 2024
c1ef0e8
[AutoRelease] t2-postgresqlflexibleservers-2024-10-30-49242(can only …
azure-sdk Nov 4, 2024
b056604
[AutoRelease] t2-devtestlabs-2024-11-04-17468(can only be merged by S…
azure-sdk Nov 4, 2024
f1c1a0a
[AutoRelease] t2-sql-2024-10-03-42323(can only be merged by SDK owner…
azure-sdk Nov 4, 2024
a7d5ca0
[EventHub] add ssl_context kwarg to clients (#37702)
swathipil Nov 4, 2024
5eedf8b
Update CHANGELOG.md (#38301)
lzchen Nov 4, 2024
96ba2ea
download_file is fully annotated (#38284)
weirongw23-msft Nov 4, 2024
620fd8c
Release azure-monitor-opentelemetry-exporter (#38310)
lzchen Nov 4, 2024
d9d8ca8
Increment package version after release of azure-monitor-opentelemetr…
azure-sdk Nov 4, 2024
3560c15
Eval/bugfix/content safety parallel (#38307)
MilesHolland Nov 4, 2024
9a453d6
target newly released proxy version (#38282)
azure-sdk Nov 4, 2024
63dd783
[Storage] Added connection pool note to `max_concurrency` kwarg for u…
weirongw23-msft Nov 4, 2024
cab8727
Sync eng/common directory with azure-sdk-tools for PR 9308 (#38311)
azure-sdk Nov 4, 2024
4b85898
Version/location updates for stress script usage (#38281)
azure-sdk Nov 5, 2024
e6855c2
[AutoRelease] t2-loganalytics-2024-11-04-45063(can only be merged by …
azure-sdk Nov 5, 2024
e70e2b4
[AutoRelease] t2-automation-2024-11-04-74277(can only be merged by SD…
azure-sdk Nov 5, 2024
c3ce7dc
Broker on mac support (#38274)
xiangyan99 Nov 5, 2024
02c72b7
Add firewallsku as ManagedNetwork property (#37885)
Nethracs Nov 5, 2024
4f1a889
[AutoRelease] t2-managementgroups-2024-11-04-45946(can only be merged…
azure-sdk Nov 5, 2024
61228e6
[AutoRelease] t2-managedservices-2024-11-04-44075(can only be merged …
azure-sdk Nov 5, 2024
7c8af04
[AutoRelease] t2-marketplaceordering-2024-11-04-08673(can only be mer…
azure-sdk Nov 5, 2024
b5b8d8c
[AutoRelease] t2-servicebus-2024-11-04-58886(can only be merged by SD…
azure-sdk Nov 5, 2024
53b79b0
[Synapse] azure-synapse post deprecation (#38315)
swathipil Nov 5, 2024
3a539de
[Cognitive Services] vision post-deprecation (#38304)
swathipil Nov 5, 2024
5a3a914
[Cosmos] documentdb post deprecation (#38314)
swathipil Nov 5, 2024
a726059
[sdk generation pipeline] fix logic to extract swagger file (#38334)
msyyc Nov 5, 2024
c78ea56
Update deprecation_process.md for Verify Readmes failure (#38333)
swathipil Nov 5, 2024
5d20ca2
Increment package version after release of azure-identity-broker (#38…
azure-sdk Nov 5, 2024
f7926ae
Edit pass on Azure Identity Broker README (#38339)
scottaddie Nov 5, 2024
eca23e6
[Core] Deprecate OpenCensus tracing plugin (#37975)
pvaneck Nov 5, 2024
98822e2
[Core] servicemanagement-legacy post deprecation (#38319)
swathipil Nov 5, 2024
a0ea744
Prompt support for Inference SDK (#37917)
YusakuNo1 Nov 5, 2024
6077526
Remove a defunct variable from docindex.yml (#38342)
JimSuplizio Nov 5, 2024
c34c723
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 5, 2024
b20c6dc
Fix errors from sphinx and mypy
allenkim0129 Nov 6, 2024
db24ede
Changed parameter to `mode`
allenkim0129 Nov 6, 2024
394d7b3
Fixed typo
allenkim0129 Nov 6, 2024
8e54e8f
Changed 'mode' to be string type
allenkim0129 Nov 6, 2024
cff439f
Reverted necessary type def
allenkim0129 Nov 7, 2024
e43b025
Addressed comments
allenkim0129 Nov 7, 2024
fcf4c04
Added samples for change_feed_mode
allenkim0129 Nov 8, 2024
e8b7fcd
Addressed comments
allenkim0129 Nov 11, 2024
f67326b
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 11, 2024
a5f8e46
Removed unnecessary docstring
allenkim0129 Nov 13, 2024
b2c0a5e
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 14, 2024
b3126fb
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 15, 2024
5b49d0f
Remove mode if 'continuation' was in override definition
allenkim0129 Nov 15, 2024
8b29b72
add test samples tracking (#38502)
kristapratico Nov 15, 2024
ae91385
Add OpenTelemetry LoggingHandler conditionally (#38549)
lzchen Nov 15, 2024
faf6625
Add helpers to log a GitHub "notice" (#38574)
azure-sdk Nov 16, 2024
1fed8a5
[AutoRelease] t2-cosmosdb-2024-11-14-60943(can only be merged by SDK …
azure-sdk Nov 18, 2024
dfe0768
[AutoRelease] t2-mysqlflexibleservers-2024-11-05-47456(can only be me…
azure-sdk Nov 18, 2024
1154089
[AutoRelease] t2-netapp-2024-11-08-58381(can only be merged by SDK ow…
azure-sdk Nov 18, 2024
d3700a9
Shrike (#38560)
achauhan-scc Nov 18, 2024
808fd67
Datastore auth bug (#38586)
achauhan-scc Nov 18, 2024
839ee96
Increment package version after release of azure-search-documents (#3…
azure-sdk Nov 18, 2024
ea1c691
Merge App Config Provider Beta to Main (#38579)
mrm9084 Nov 18, 2024
e716f4b
batching adjustments for create-prjobmatrix (#38597)
azure-sdk Nov 18, 2024
228fbcd
[EG] resource notification event (#38100)
l0lawrence Nov 18, 2024
9549143
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 18, 2024
c6e2f1b
Updated doc strings
allenkim0129 Nov 18, 2024
217dabc
Merged main
allenkim0129 Nov 21, 2024
4334dc1
Addressed comments
allenkim0129 Nov 21, 2024
19ebb4e
Revert "Merged main"
allenkim0129 Nov 22, 2024
4ffd502
Merge branch 'main' into users/allekim/feature/feedRangeAllVersionsAn…
allenkim0129 Nov 22, 2024
8a3d3dd
Added comment why it is safe to raise exception if mode was missing
allenkim0129 Nov 22, 2024
ba938f6
Added comment why it is safe to raise exception if mode was missing
allenkim0129 Nov 22, 2024
3674c6d
Merge remote-tracking branch 'origin/users/allekim/feature/feedRangeA…
allenkim0129 Nov 22, 2024
4231057
Moved the feature update log under unreleased features
allenkim0129 Nov 22, 2024
eb093a4
Add missing period in changelog
allenkim0129 Nov 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 4.9.1 (Unreleased)

#### Features Added
* Added change feed mode support in `query_items_change_feed`. See [PR 38105](https://github.com/Azure/azure-sdk-for-python/pull/38105)

#### Breaking Changes

Expand Down
4 changes: 3 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@
'supported_query_features': 'supportedQueryFeatures',
'query_version': 'queryVersion',
'priority': 'priorityLevel',
'no_response': 'responsePayloadOnWriteDisabled'
'no_response': 'responsePayloadOnWriteDisabled',
'max_item_count': 'maxItemCount',
}

# Cosmos resource ID validation regex breakdown:
Expand Down Expand Up @@ -170,6 +171,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
# set consistency level. check if set via options, this will override the default
if options.get("consistencyLevel"):
consistency_level = options["consistencyLevel"]
# TODO: move this line outside of if-else cause to remove the code duplication
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
elif default_client_consistency_level is not None:
consistency_level = default_client_consistency_level
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Union, List, Any, Dict, Deque
import logging
from typing_extensions import Literal
allenkim0129 marked this conversation as resolved.
Show resolved Hide resolved

from azure.cosmos import http_constants
from azure.cosmos._change_feed.change_feed_start_from import ChangeFeedStartFromInternal, \
Expand Down Expand Up @@ -176,18 +178,18 @@ def apply_server_response_continuation(self, continuation: str, has_modified_res

class ChangeFeedStateV2(ChangeFeedState):
container_rid_property_name = "containerRid"
change_feed_mode_property_name = "mode"
mode_property_name = "mode"
change_feed_start_from_property_name = "startFrom"
continuation_property_name = "continuation"

# TODO: adding change feed mode
def __init__(
self,
container_link: str,
container_rid: str,
feed_range: FeedRangeInternal,
change_feed_start_from: ChangeFeedStartFromInternal,
continuation: Optional[FeedRangeCompositeContinuation]
continuation: Optional[FeedRangeCompositeContinuation],
mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]]
) -> None:

self._container_link = container_link
Expand All @@ -208,6 +210,8 @@ def __init__(
else:
self._continuation = continuation

self._mode = "LatestVersion" if mode is None else mode

super(ChangeFeedStateV2, self).__init__(ChangeFeedStateVersion.V2)

@property
Expand All @@ -218,17 +222,14 @@ def to_dict(self) -> Dict[str, Any]:
return {
self.version_property_name: ChangeFeedStateVersion.V2.value,
self.container_rid_property_name: self._container_rid,
self.change_feed_mode_property_name: "LatestVersion",
self.mode_property_name: self._mode,
self.change_feed_start_from_property_name: self._change_feed_start_from.to_dict(),
self.continuation_property_name: self._continuation.to_dict() if self._continuation is not None else None
}

def populate_request_headers(
def set_start_from_request_headers(
self,
routing_provider: SmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential.
# So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
Expand All @@ -243,11 +244,10 @@ def populate_request_headers(
self._continuation.current_token.feed_range)
change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
over_lapping_ranges =\
routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])
def set_pk_range_id_request_headers(
self,
over_lapping_ranges,
request_headers: Dict[str, Any]) -> None:

if len(over_lapping_ranges) > 1:
raise self.get_feed_range_gone_error(over_lapping_ranges)
Expand All @@ -260,53 +260,51 @@ def populate_request_headers(
# the current token feed range spans less than single physical partition
# for this case, need to set both the partition key range id and epk filter headers
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"]
request_headers[
http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min
request_headers[
http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max
request_headers[http_constants.HttpHeaders.StartEpkString] = self._continuation.current_token.feed_range.min
request_headers[http_constants.HttpHeaders.EndEpkString] = self._continuation.current_token.feed_range.max

async def populate_request_headers_async(
def set_mode_request_headers(
self,
async_routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
if self._mode == "AllVersionsAndDeletes":
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.FullFidelityFeedHeaderValue
request_headers[http_constants.HttpHeaders.ChangeFeedWireFormatVersion] = \
http_constants.HttpHeaders.SeparateMetaWithCrts
else:
request_headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue

# When a merge happens, the child partition will contain documents ordered by LSN but the _ts/creation time
# of the documents may not be sequential.
# So when reading the changeFeed by LSN, it is possible to encounter documents with lower _ts.
# In order to guarantee we always get the documents after customer's point start time,
# we will need to always pass the start time in the header.
self._change_feed_start_from.populate_request_headers(request_headers)
def populate_request_headers(
self,
routing_provider: SmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
self.set_start_from_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
over_lapping_ranges = \
routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])

self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers)

self.set_mode_request_headers(request_headers)

if self._continuation.current_token is not None and self._continuation.current_token.token is not None:
change_feed_start_from_feed_range_and_etag = \
ChangeFeedStartFromETagAndFeedRange(
self._continuation.current_token.token,
self._continuation.current_token.feed_range)
change_feed_start_from_feed_range_and_etag.populate_request_headers(request_headers)

async def populate_request_headers_async(
self,
async_routing_provider: AsyncSmartRoutingMapProvider,
request_headers: Dict[str, Any]) -> None:
self.set_start_from_request_headers(request_headers)

# based on the feed range to find the overlapping partition key range id
over_lapping_ranges = \
await async_routing_provider.get_overlapping_ranges(
self._container_link,
[self._continuation.current_token.feed_range])

if len(over_lapping_ranges) > 1:
raise self.get_feed_range_gone_error(over_lapping_ranges)
self.set_pk_range_id_request_headers(over_lapping_ranges, request_headers)

overlapping_feed_range = Range.PartitionKeyRangeToRange(over_lapping_ranges[0])
if overlapping_feed_range == self._continuation.current_token.feed_range:
# exactly mapping to one physical partition, only need to set the partitionKeyRangeId
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_ranges[0]["id"]
else:
# the current token feed range spans less than single physical partition
# for this case, need to set both the partition key range id and epk filter headers
request_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = \
over_lapping_ranges[0]["id"]
request_headers[http_constants.HttpHeaders.StartEpkString] = \
self._continuation.current_token.feed_range.min
request_headers[http_constants.HttpHeaders.EndEpkString] = \
self._continuation.current_token.feed_range.max
self.set_mode_request_headers(request_headers)

def populate_feed_options(self, feed_options: Dict[str, Any]) -> None:
pass
Expand Down Expand Up @@ -367,12 +365,20 @@ def from_continuation(
if continuation_data is None:
raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.continuation_property_name}]")
continuation = FeedRangeCompositeContinuation.from_json(continuation_data)
return ChangeFeedStateV2(

mode = continuation_json.get(ChangeFeedStateV2.mode_property_name)
# All 'continuation_json' from ChangeFeedStateV2 must contain 'mode' property. For the 'continuation_json'
# from older ChangeFeedState versions won't even hit this point, since their version is not 'v2'.
if mode is None:
raise ValueError(f"Invalid continuation: [Missing {ChangeFeedStateV2.mode_property_name}]")
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved

return cls(
container_link=container_link,
container_rid=container_rid,
feed_range=continuation.feed_range,
change_feed_start_from=change_feed_start_from,
continuation=continuation)
continuation=continuation,
mode=mode)

@classmethod
def from_initial_state(
Expand All @@ -394,6 +400,7 @@ def from_initial_state(
raise ValueError("partitionKey is in the changeFeedStateContext, but missing partitionKeyFeedRange")
else:
# default to full range
logging.info("'feed_range' empty. Using full range by default.")
feed_range = FeedRangeInternalEpk(
Range(
"",
Expand All @@ -405,11 +412,12 @@ def from_initial_state(
change_feed_start_from = (
ChangeFeedStartFromInternal.from_start_time(change_feed_state_context.get("startTime")))

if feed_range is not None:
return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None)
raise ValueError("feed_range is empty")
mode = change_feed_state_context.get("mode")

return cls(
container_link=container_link,
container_rid=collection_rid,
feed_range=feed_range,
change_feed_start_from=change_feed_start_from,
continuation=None,
mode=mode)
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal Helper functions in the Azure Cosmos database change_feed service.
"""

import warnings
from datetime import datetime
from typing import Any, Dict, Tuple

CHANGE_FEED_MODES = ["LatestVersion", "AllVersionsAndDeletes"]

def add_args_to_kwargs(
args: Tuple[Any, ...],
kwargs: Dict[str, Any]
) -> None:
"""Add positional arguments(args) to keyword argument dictionary(kwargs).
Since 'query_items_change_feed' method only allows the following 4 positional arguments in the exact order
and types, if the order and types don't match, errors will be raised.
If the positional arguments are in the correct orders and types, the arguments will be added to keyword arguments.

4 positional arguments:
- str 'partition_key_range_id': [Deprecated] ChangeFeed requests can be executed against specific partition
key ranges. This is used to process the change feed in parallel across multiple consumers.
- bool 'is_start_from_beginning': [Deprecated] Get whether change feed should start from
beginning (true) or from current (false). By default, it's start from current (false).
- str 'continuation': e_tag value to be used as continuation for reading change feed.
- int 'max_item_count': Max number of items to be returned in the enumeration operation.

:param args: Positional arguments. Arguments must be in the following order:
1. partition_key_range_id
2. is_start_from_beginning
3. continuation
4. max_item_count
:type args: Tuple[Any, ...]
:param kwargs: Keyword arguments
:type kwargs: dict[str, Any]
"""
if len(args) > 4:
raise TypeError(f"'query_items_change_feed()' takes 4 positional arguments but {len(args)} were given.")

if len(args) > 0:
keys = [
'partition_key_range_id',
'is_start_from_beginning',
'continuation',
'max_item_count',
]
for i, value in enumerate(args):
key = keys[i]

if key in kwargs:
raise TypeError(f"'query_items_change_feed()' got multiple values for argument '{key}'.")

kwargs[key] = value

def validate_kwargs(
kwargs: Dict[str, Any]
) -> None:
"""Validate keyword arguments(kwargs).
The values of keyword arguments must match the expect type and conditions. If the conditions do not match,
errors will be raised with the error messages and possible ways to correct the errors.

:param kwargs: Keyword arguments to verify for query_items_change_feed API
:keyword mode: Must be one of the values in the Enum, 'ChangeFeedMode'.
If the value is 'ALL_VERSIONS_AND_DELETES', the following keywords must be in the right condition:
- 'partition_key_range_id': Cannot be used at any time
- 'is_start_from_beginning': Must be 'False'
- 'start_time': Must be "Now"
:paramtype mode: Optional[Literal["LatestVersion", "AllVersionsAndDeletes"]]
:keyword partition_key_range_id: Deprecated Warning.
:paramtype partition_key_range_id: str
:keyword is_start_from_beginning: Deprecated Warning. Cannot be used with 'start_time'.
:paramtype is_start_from_beginning: bool
:keyword start_time: Must be in supported types.
:paramtype start_time: Union[~datetime.datetime, Literal["Now", "Beginning"]]
:type kwargs: dict[str, Any]
"""
# Filter items with value None
kwargs = {key: value for key, value in kwargs.items() if value is not None}

# Validate the keyword arguments
if "mode" in kwargs:
mode = kwargs["mode"]
if mode not in CHANGE_FEED_MODES:
raise ValueError(
f"Invalid mode was used: '{kwargs['mode']}'."
f" Supported modes are {CHANGE_FEED_MODES}.")

if mode == 'AllVersionsAndDeletes':
if "partition_key_range_id" in kwargs:
raise ValueError(
"'AllVersionsAndDeletes' mode is not supported if 'partition_key_range_id'"
" was used. Please use 'feed_range' instead.")
if "is_start_from_beginning" in kwargs and kwargs["is_start_from_beginning"] is not False:
raise ValueError(
"'AllVersionsAndDeletes' mode is only supported if 'is_start_from_beginning'"
" is 'False'. Please use 'is_start_from_beginning=False' or 'continuation' instead.")
if "start_time" in kwargs and kwargs["start_time"] != "Now":
raise ValueError(
"'AllVersionsAndDeletes' mode is only supported if 'start_time' is 'Now'."
" Please use 'start_time=\"Now\"' or 'continuation' instead.")

if "partition_key_range_id" in kwargs:
warnings.warn(
"'partition_key_range_id' is deprecated. Please pass in 'feed_range' instead.",
DeprecationWarning
)

if "is_start_from_beginning" in kwargs:
warnings.warn(
"'is_start_from_beginning' is deprecated. Please pass in 'start_time' instead.",
DeprecationWarning
)

if not isinstance(kwargs["is_start_from_beginning"], bool):
raise TypeError(
f"'is_start_from_beginning' must be 'bool' type,"
f" but given '{type(kwargs['is_start_from_beginning']).__name__}'.")

if kwargs["is_start_from_beginning"] is True and "start_time" in kwargs:
raise ValueError("'is_start_from_beginning' and 'start_time' are exclusive, please only set one of them.")

if "start_time" in kwargs:
if not isinstance(kwargs['start_time'], datetime):
if kwargs['start_time'].lower() not in ["now", "beginning"]:
raise ValueError(
f"'start_time' must be either 'Now' or 'Beginning', but given '{kwargs['start_time']}'.")
Loading