Skip to content

[Async] Multi Exec on cluster #3649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

vladvildanov
Copy link
Collaborator

@vladvildanov vladvildanov commented May 19, 2025

Pull Request check-list

Please make sure to review and check all of these items:

  • Do tests and lints pass with this change?
  • Do the CI tests pass with this change (enable it first in your forked repo and wait for the github action build to finish)?
  • Is the new or changed code fully tested?
  • Is a documentation update included (if this change modifies existing APIs, or introduces new ones)?
  • Is there an example added to the examples folder (if applicable)?
  • Was the change added to CHANGES file?

NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.

Description of change

Async implementation of #3611

except Exception as e:
error = e

thread = threading.Thread(target=runner)
Copy link
Collaborator Author

@vladvildanov vladvildanov May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@petyaslavova @elena-kolevska ClusterPipeline provides only synchronous API for execute_command method, so because of this limitation the only way I found to run asynchronous code inside of synchronous method is to run it inside of short-living thread with it's own event loop. During testing I didn't find a significant performance impact of this (basic set/get test takes around 15ms on my machine).

I think the reason why API is synchronous is the fact that in case of Pipeline execute_command is just about adding command into queue, but for Transactions we need much more that requires asynchronous code execution. Let me know WDYT about this


def __init__(self, client: RedisCluster) -> None:
self._client = client
__slots__ = ("cluster_client",)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you use slots, you should add all properties that the objects needs. Here you should add "_transaction" and "_execution_strategy"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

self._pipe: ClusterPipeline = pipe
self._command_queue: List["PipelineCommand"] = []

async def __aenter__(self) -> "ClusterPipeline":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that those methods are not needed. We only use the strategies internally within the pipeline, so there's no need to support context manager behavior.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return self

async def __aenter__(self) -> "ClusterPipeline":
return await self.initialize()

async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
self._command_stack = []
self._execution_strategy._command_queue = []

def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
return self.initialize().__await__()

def __enter__(self) -> "ClusterPipeline":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is a meaningful use case to use the async ClusterPipeline as a synchronous context manager.
Can we remove those enter and exit methods, or it will be considered a breaking change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be a breaking, not sure how does it works if someone uses it

return self

async def __aenter__(self) -> "ClusterPipeline":
return await self.initialize()

async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
self._command_stack = []
self._execution_strategy._command_queue = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.reset() would probably be a better choice here - it will clear the state of the transactional pipeline as well when the ClusterPipeline should be closed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if self._client._initialize:
await self._client.initialize()
self._command_stack = []
if self.cluster_client._initialize:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use the _execution_strategy's initialize method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return self

def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
self._command_stack = []
self._execution_strategy._command_queue = []

def __bool__(self) -> bool:
"Pipeline instances should always evaluate to True on Python 3+"
return True

def __len__(self) -> int:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can expose a len method in the ExecutionStrategy and then call len(_execution_strategy) - this way you won't need to access the private property in the ClusterPipeline class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

await asyncio.sleep(0.25)
else:
# All other errors should be raised.
raise e
finally:
self._command_stack = []
self._command_queue = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.reset()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@vladvildanov vladvildanov requested a review from Copilot May 20, 2025 14:50
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces asynchronous support for cluster transactions, enhancing error handling during operations such as migrations and connection issues.

  • Added comprehensive async tests for cluster transactions covering AskError, ExecAbortError, and connection errors.
  • Updated behavior of the cluster pipeline tests by removing the deprecated transaction flag check.

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
tests/test_asyncio/test_cluster_transaction.py Added extensive async transaction tests with various error cases.
tests/test_asyncio/test_cluster.py Removed deprecated transaction check and adjusted the behavior.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants