Support asave() within apipeline for batched saves#198
Conversation
There was a problem hiding this comment.
Pull request overview
This pull request aims to enable asave() to work within pipeline contexts for batched save operations. The implementation adds a client property to AtomicRedisModel that returns the pipeline context when available, otherwise falls back to the default Redis client.
Changes:
- Added
clientproperty toAtomicRedisModelto support pipeline context detection - Modified
asave()to use the newclientproperty instead of directly accessingMeta.redis - Added changelog entry documenting the feature
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| rapyer/base.py | Added client property and updated asave() to use it for pipeline support |
| CHANGELOG.md | Documented the new pipeline support for asave() |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…hod in atomic class
…r nested pipelines
…e for rapyer (not specific model)
…non model specific
… apipeline, prevent duplicated code
… check nested pipeline and asve in pipeline
…redis_error feature in rapyer apipline
…nal in context var
… rapyer.apipeline
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
rapyer/base.py
Outdated
| "ignore_redis_error=True for key %r: %s", | ||
| getattr(self, "key", None), | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: |
There was a problem hiding this comment.
The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime. Consider removing the getattr(self, "key", None) call or finding an alternative way to log context information.
| "ignore_redis_error=True for key %r: %s", | |
| getattr(self, "key", None), | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error( | |
| AtomicRedisModel.Meta.redis, | |
| AtomicRedisModel.Meta, | |
| ) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with AtomicRedisModel.Meta.redis.pipeline(transaction=True) as retry_pipe: |
rapyer/base.py
Outdated
| "ignore_redis_error=True for key %r: %s", | ||
| getattr(self, "key", None), | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: |
There was a problem hiding this comment.
The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime when handling NoScriptError. Replace self.Meta.redis with AtomicRedisModel.Meta.redis to access the redis client.
| "ignore_redis_error=True for key %r: %s", | |
| getattr(self, "key", None), | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error( | |
| AtomicRedisModel.Meta.redis, | |
| AtomicRedisModel.Meta, | |
| ) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with AtomicRedisModel.Meta.redis.pipeline(transaction=True) as retry_pipe: |
rapyer/base.py
Outdated
| getattr(self, "key", None), | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: |
There was a problem hiding this comment.
The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime when retrying after NoScriptError. Replace self.Meta.redis with AtomicRedisModel.Meta.redis to access the redis client.
| getattr(self, "key", None), | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | |
| None, | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error( | |
| AtomicRedisModel.Meta.redis, AtomicRedisModel.Meta | |
| ) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with AtomicRedisModel.Meta.redis.pipeline( | |
| transaction=True | |
| ) as retry_pipe: |
| yield pipe | ||
| commands_backup = list(pipe.command_stack) | ||
| noscript_on_first_attempt = False | ||
| noscript_on_retry = False | ||
|
|
||
| try: | ||
| await pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_first_attempt = True | ||
| except ResponseError as exc: | ||
| if ignore_redis_error: | ||
| logger.warning( | ||
| "Swallowed ResponseError during pipeline.execute() with " | ||
| "ignore_redis_error=True for key %r: %s", | ||
| getattr(self, "key", None), | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | ||
| for args, options in evalsha_commands: | ||
| retry_pipe.execute_command(*args, **options) | ||
| try: | ||
| await retry_pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_retry = True | ||
|
|
||
| if noscript_on_retry: | ||
| raise PersistentNoScriptError( | ||
| "NOSCRIPT error persisted after re-registering scripts. " | ||
| "This indicates a server-side problem with Redis." | ||
| ) | ||
|
|
||
| _context_var.reset(pipe_prev) |
There was a problem hiding this comment.
The context variable reset should be in a finally block to ensure it's always executed, even if an exception occurs during pipeline execution or error handling. If an exception is raised after line 691 but before line 733, the context variable will not be reset, potentially causing issues with subsequent operations that expect a clean context state.
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True for key %r: %s", | |
| getattr(self, "key", None), | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| _context_var.reset(pipe_prev) | |
| try: | |
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True for key %r: %s", | |
| getattr(self, "key", None), | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with self.Meta.redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| finally: | |
| _context_var.reset(pipe_prev) |
…_meta to apipeline, this should be only for Atomic redis, not for users
…ting codebase - STACK.md - Technologies and dependencies - ARCHITECTURE.md - System design and patterns - STRUCTURE.md - Directory layout - CONVENTIONS.md - Code style and patterns - TESTING.md - Test structure - INTEGRATIONS.md - External services - CONCERNS.md - Technical debt and issues Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…develop' into feature/195-support-asave-with-apipeline-for-batched-saves # Conflicts: # CHANGELOG.md
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| yield pipe | ||
| commands_backup = list(pipe.command_stack) | ||
| noscript_on_first_attempt = False | ||
| noscript_on_retry = False | ||
|
|
||
| try: | ||
| await pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_first_attempt = True | ||
| except ResponseError as exc: | ||
| if ignore_redis_error: | ||
| logger.warning( | ||
| "Swallowed ResponseError during pipeline.execute() with " | ||
| "ignore_redis_error=True: %s", | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(redis, _meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with redis.pipeline(transaction=True) as retry_pipe: | ||
| for args, options in evalsha_commands: | ||
| retry_pipe.execute_command(*args, **options) | ||
| try: | ||
| await retry_pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_retry = True | ||
|
|
||
| if noscript_on_retry: | ||
| raise PersistentNoScriptError( | ||
| "NOSCRIPT error persisted after re-registering scripts. " | ||
| "This indicates a server-side problem with Redis." | ||
| ) | ||
|
|
||
| _context_var.reset(pipe_prev) |
There was a problem hiding this comment.
The context variable reset on line 738 only executes if no exception occurs during the function. If an exception is raised (e.g., from lines 703-714), the context variable will not be reset, potentially causing context leakage where subsequent operations incorrectly use the pipeline context. This should be wrapped in a try-finally block to ensure the context is always reset.
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(redis, _meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| _context_var.reset(pipe_prev) | |
| try: | |
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(redis, _meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| finally: | |
| _context_var.reset(pipe_prev) |
…text var is always initialized
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try: | ||
| yield pipe | ||
| commands_backup = list(pipe.command_stack) | ||
| noscript_on_first_attempt = False | ||
| noscript_on_retry = False | ||
|
|
||
| try: | ||
| await pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_first_attempt = True | ||
| except ResponseError as exc: | ||
| if ignore_redis_error: | ||
| logger.warning( | ||
| "Swallowed ResponseError during pipeline.execute() with " | ||
| "ignore_redis_error=True: %s", | ||
| exc, | ||
| ) | ||
| else: | ||
| raise | ||
|
|
||
| if noscript_on_first_attempt: | ||
| await scripts_registry.handle_noscript_error(redis, _meta) | ||
| evalsha_commands = [ | ||
| (args, options) | ||
| for args, options in commands_backup | ||
| if args[0] == "EVALSHA" | ||
| ] | ||
| # Retry execute the pipeline actions | ||
| async with redis.pipeline(transaction=True) as retry_pipe: | ||
| for args, options in evalsha_commands: | ||
| retry_pipe.execute_command(*args, **options) | ||
| try: | ||
| await retry_pipe.execute() | ||
| except NoScriptError: | ||
| noscript_on_retry = True | ||
|
|
||
| if noscript_on_retry: | ||
| raise PersistentNoScriptError( | ||
| "NOSCRIPT error persisted after re-registering scripts. " | ||
| "This indicates a server-side problem with Redis." | ||
| ) | ||
| finally: | ||
| _context_var.reset(pipe_prev) |
There was a problem hiding this comment.
The async with redis.pipeline(transaction=True) as pipe: block exits immediately after line 698, which means the pipeline object is closed before it gets yielded to the caller on line 700. The pipeline will not be usable for any operations. The entire try-finally block (lines 699-741) should be indented to be inside the async with block so that the pipeline remains open while it's being used.
| try: | |
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(redis, _meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| finally: | |
| _context_var.reset(pipe_prev) | |
| try: | |
| yield pipe | |
| commands_backup = list(pipe.command_stack) | |
| noscript_on_first_attempt = False | |
| noscript_on_retry = False | |
| try: | |
| await pipe.execute() | |
| except NoScriptError: | |
| noscript_on_first_attempt = True | |
| except ResponseError as exc: | |
| if ignore_redis_error: | |
| logger.warning( | |
| "Swallowed ResponseError during pipeline.execute() with " | |
| "ignore_redis_error=True: %s", | |
| exc, | |
| ) | |
| else: | |
| raise | |
| if noscript_on_first_attempt: | |
| await scripts_registry.handle_noscript_error(redis, _meta) | |
| evalsha_commands = [ | |
| (args, options) | |
| for args, options in commands_backup | |
| if args[0] == "EVALSHA" | |
| ] | |
| # Retry execute the pipeline actions | |
| async with redis.pipeline(transaction=True) as retry_pipe: | |
| for args, options in evalsha_commands: | |
| retry_pipe.execute_command(*args, **options) | |
| try: | |
| await retry_pipe.execute() | |
| except NoScriptError: | |
| noscript_on_retry = True | |
| if noscript_on_retry: | |
| raise PersistentNoScriptError( | |
| "NOSCRIPT error persisted after re-registering scripts. " | |
| "This indicates a server-side problem with Redis." | |
| ) | |
| finally: | |
| _context_var.reset(pipe_prev) |
Summary
Enables
asave()to work correctly within pipeline context, allowing batched save operations for improved performance.Changes
clientproperty toAtomicRedisModelthat returns the pipeline context if available, otherwise the default redis clientasave()to useself.clientinstead of directly accessingself.Meta.redisTesting
The changes allow usage like:
Closes #195