Cosmos DB - query_items_change_feed only processes first partition then stops #34922

Open

Description

  • Package Name: azure-cosmos
  • Package Version: 4.6.0
  • Operating System: Windows
  • Python Version: 3.12.2

Describe the bug
The current implementation of the query_items_change_feed method returns all documents from the first partition and then stops. It does not automatically advance to the next partition.

To Reproduce

  1. Provide the account URL and key.
  2. Run the snippet below. It fails on the last assertion statement.
import asyncio
import uuid
from azure.cosmos.aio import CosmosClient
from azure.cosmos.partition_key import PartitionKey
from azure.cosmos import ThroughputProperties

async def main(account_url, account_key):
    
    client = CosmosClient(account_url, account_key)
    async with client:

        # Create random db and container with 2 partitions. Names can be fixed for subsequent tests.
        db = await client.create_database_if_not_exists(id=str(uuid.uuid4())) 
        coll = await db.create_container_if_not_exists(
            id=str(uuid.uuid4()),
            partition_key=PartitionKey(path='/pk', kind='Hash'), 
            offer_throughput=ThroughputProperties(auto_scale_max_throughput=20000, auto_scale_increment_percent=0)
        )

        # Ensure we have 2 partitions.
        pks = []
        async for pk in coll.client_connection._ReadPartitionKeyRanges(coll.container_link):
            pks.append(pk)

        assert len(pks) == 2, f'Expected 2 partitions, found: {len(pks)}.'
        
        # Define docs to insert. 2 distinct logical partition keys.
        item1 = {'pk': 'pk1', 'id': 'id1'}
        item2 = {'pk': 'pk1', 'id': 'id2'}
        item3 = {'pk': 'pk1', 'id': 'id3'}
        item4 = {'pk': 'pk2', 'id': 'id1'}
        
        # Insert docs to db.
        await asyncio.gather(
            coll.create_item(item1), 
            coll.create_item(item2), 
            coll.create_item(item3),
            coll.create_item(item4),
        )

        # We've created 4 items with 2 distinct partition keys, let's confirm they landed on different physical partitions.
        items_partition0, items_partition1 = [], []
        async for item in coll.query_items(query='SELECT * FROM c', partition_key_range_id=pks[0]['id']):
            items_partition0.append(item)
        async for item in coll.query_items(query='SELECT * FROM c', partition_key_range_id=pks[1]['id']):
            items_partition1.append(item)
        assert len(items_partition0) == 3 and len(items_partition1) == 1, (
            f'Expected 3 documents on partition 0 and 1 document on partition 1, '
            f'found: {len(items_partition0)} documents on partition 0 '
            f'and {len(items_partition1)} on partition 1.'
        )
        
        # Now that we've confirmed the data layout, let's read change feed for both partitions together and for each separately.
        items_cf_partition0, items_cf_partition1, items_cf_all = [], [], []
        async for item in coll.query_items_change_feed(is_start_from_beginning=True, partition_key_range_id=pks[0]['id']):
            items_cf_partition0.append(item)
        async for item in coll.query_items_change_feed(is_start_from_beginning=True, partition_key_range_id=pks[1]['id']):
            items_cf_partition1.append(item)
        async for item in coll.query_items_change_feed(is_start_from_beginning=True):
            items_cf_all.append(item)

        # Report the change feed counts here, not the query counts gathered earlier.
        assert len(items_cf_partition0) == 3 and len(items_cf_partition1) == 1, (
            f'Expected 3 documents on partition 0 and 1 document on partition 1, '
            f'found: {len(items_cf_partition0)} documents on partition 0 '
            f'and {len(items_cf_partition1)} on partition 1.'
        )

        assert len(items_cf_all) == 4, f'Expected 4 documents, found: {len(items_cf_all)}.'


if __name__ == '__main__':

    account_url = ''
    account_key = ''

    asyncio.run(main(account_url, account_key))

Expected behavior
I would expect it to automatically advance to the next partition and continue enumerating documents until the last partition has been processed. If automatic advancement is not available, please provide guidance on how to manually "nudge" it to keep processing.
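
Until there is guidance, one possible manual workaround is to walk the partition key ranges and read the change feed for each range separately, since the repro above shows the per-range calls returning the expected documents. The sketch below is not an official fix. The helper name read_change_feed_all_ranges is hypothetical, and it relies on the same private _ReadPartitionKeyRanges call used in the repro, which may change between SDK versions.

```python
import asyncio


async def read_change_feed_all_ranges(container):
    """Collect change feed items by visiting every partition key range in turn.

    `container` is assumed to be an azure.cosmos.aio container proxy, as in the
    repro. This is a workaround sketch, not the SDK's intended behavior.
    """
    # Enumerate physical partition key ranges the same way the repro does
    # (private API, so treat it as an assumption that may break).
    range_ids = [
        pk_range['id']
        async for pk_range in container.client_connection._ReadPartitionKeyRanges(
            container.container_link
        )
    ]

    items = []
    for range_id in range_ids:
        # Per-range reads are the calls that behaved correctly in the repro.
        async for item in container.query_items_change_feed(
            is_start_from_beginning=True,
            partition_key_range_id=range_id,
        ):
            items.append(item)
    return items
```

In the repro, items_cf_all could then be filled with `items_cf_all = await read_change_feed_all_ranges(coll)`. Items come back grouped by range, so ordering across partitions is not preserved.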

Screenshots
N/A

Additional context
Discussed with @simorenoh offline.

Metadata

Labels

  • Client: This issue points to a problem in the data-plane of the library.
  • Cosmos
  • OpenAI
  • Service Attention: Workflow: This issue is responsible by Azure service team.
  • bug: This issue requires a change to an existing behavior in the product in order to be resolved.
  • cosmos-dt-planning: Considered for Dilithium semester planning
  • customer-reported: Issues that are reported by GitHub users external to the Azure organization.
  • needs-team-attention: Workflow: This issue needs attention from Azure service team or SDK team