Skip to content

Support asave() within apipeline for batched saves#198

Merged
yedidyakfir merged 23 commits intodevelopfrom
feature/195-support-asave-with-apipeline-for-batched-saves
Feb 5, 2026
Merged

Support asave() within apipeline for batched saves#198
yedidyakfir merged 23 commits intodevelopfrom
feature/195-support-asave-with-apipeline-for-batched-saves

Conversation

@yedidyakfir
Copy link
Collaborator

Summary

Enables asave() to work correctly within pipeline context, allowing batched save operations for improved performance.

Changes

  • Added client property to AtomicRedisModel that returns the pipeline context if available, otherwise the default redis client
  • Modified asave() to use self.client instead of directly accessing self.Meta.redis
  • Added changelog entry documenting the new feature

Testing

The changes allow usage like:

async with model.apipeline() as m:
    await m.asave()

Closes #195

Copilot AI review requested due to automatic review settings February 5, 2026 08:48
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request aims to enable asave() to work within pipeline contexts for batched save operations. The implementation adds a client property to AtomicRedisModel that returns the pipeline context when available, otherwise falls back to the default Redis client.

Changes:

  • Added client property to AtomicRedisModel to support pipeline context detection
  • Modified asave() to use the new client property instead of directly accessing Meta.redis
  • Added changelog entry documenting the feature

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
rapyer/base.py Added client property and updated asave() to use it for pipeline support
CHANGELOG.md Documented the new pipeline support for asave()

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copilot AI review requested due to automatic review settings February 5, 2026 10:30
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

rapyer/base.py Outdated
Comment on lines 704 to 719
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime. Consider removing the getattr(self, "key", None) call or finding an alternative way to log context information.

Suggested change
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(
AtomicRedisModel.Meta.redis,
AtomicRedisModel.Meta,
)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with AtomicRedisModel.Meta.redis.pipeline(transaction=True) as retry_pipe:

Copilot uses AI. Check for mistakes.
rapyer/base.py Outdated
Comment on lines 704 to 719
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime when handling NoScriptError. Replace self.Meta.redis with AtomicRedisModel.Meta.redis to access the redis client.

Suggested change
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(
AtomicRedisModel.Meta.redis,
AtomicRedisModel.Meta,
)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with AtomicRedisModel.Meta.redis.pipeline(transaction=True) as retry_pipe:

Copilot uses AI. Check for mistakes.
rapyer/base.py Outdated
Comment on lines 705 to 719
getattr(self, "key", None),
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable self is used but doesn't exist in this context. This is a module-level function, not a method, so self is not defined. This will cause a NameError at runtime when retrying after NoScriptError. Replace self.Meta.redis with AtomicRedisModel.Meta.redis to access the redis client.

Suggested change
getattr(self, "key", None),
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
None,
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(
AtomicRedisModel.Meta.redis, AtomicRedisModel.Meta
)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with AtomicRedisModel.Meta.redis.pipeline(
transaction=True
) as retry_pipe:

Copilot uses AI. Check for mistakes.
Comment on lines 691 to 733
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False

try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True

if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)

_context_var.reset(pipe_prev)
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context variable reset should be in a finally block to ensure it's always executed, even if an exception occurs during pipeline execution or error handling. If an exception is raised after line 691 but before line 733, the context variable will not be reset, potentially causing issues with subsequent operations that expect a clean context state.

Suggested change
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
_context_var.reset(pipe_prev)
try:
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True for key %r: %s",
getattr(self, "key", None),
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(self.Meta.redis, self.Meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with self.Meta.redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
finally:
_context_var.reset(pipe_prev)

Copilot uses AI. Check for mistakes.
yedidyakfir and others added 2 commits February 5, 2026 12:43
…_meta to apipeline, this should be only for Atomic redis, not for users
…ting codebase

- STACK.md - Technologies and dependencies
- ARCHITECTURE.md - System design and patterns
- STRUCTURE.md - Directory layout
- CONVENTIONS.md - Code style and patterns
- TESTING.md - Test structure
- INTEGRATIONS.md - External services
- CONCERNS.md - Technical debt and issues

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings February 5, 2026 15:09
…develop' into feature/195-support-asave-with-apipeline-for-batched-saves

# Conflicts:
#	CHANGELOG.md
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 697 to 738
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False

try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True

if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)

_context_var.reset(pipe_prev)
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context variable reset on line 738 only executes if no exception occurs during the function. If an exception is raised (e.g., from lines 703-714), the context variable will not be reset, potentially causing context leakage where subsequent operations incorrectly use the pipeline context. This should be wrapped in a try-finally block to ensure the context is always reset.

Suggested change
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
_context_var.reset(pipe_prev)
try:
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
finally:
_context_var.reset(pipe_prev)

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings February 5, 2026 15:19
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +699 to +741
try:
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False

try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise

if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True

if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
finally:
_context_var.reset(pipe_prev)
Copy link

Copilot AI Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The async with redis.pipeline(transaction=True) as pipe: block exits immediately after line 698, which means the pipeline object is closed before it gets yielded to the caller on line 700. The pipeline will not be usable for any operations. The entire try-finally block (lines 699-741) should be indented to be inside the async with block so that the pipeline remains open while it's being used.

Suggested change
try:
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
finally:
_context_var.reset(pipe_prev)
try:
yield pipe
commands_backup = list(pipe.command_stack)
noscript_on_first_attempt = False
noscript_on_retry = False
try:
await pipe.execute()
except NoScriptError:
noscript_on_first_attempt = True
except ResponseError as exc:
if ignore_redis_error:
logger.warning(
"Swallowed ResponseError during pipeline.execute() with "
"ignore_redis_error=True: %s",
exc,
)
else:
raise
if noscript_on_first_attempt:
await scripts_registry.handle_noscript_error(redis, _meta)
evalsha_commands = [
(args, options)
for args, options in commands_backup
if args[0] == "EVALSHA"
]
# Retry execute the pipeline actions
async with redis.pipeline(transaction=True) as retry_pipe:
for args, options in evalsha_commands:
retry_pipe.execute_command(*args, **options)
try:
await retry_pipe.execute()
except NoScriptError:
noscript_on_retry = True
if noscript_on_retry:
raise PersistentNoScriptError(
"NOSCRIPT error persisted after re-registering scripts. "
"This indicates a server-side problem with Redis."
)
finally:
_context_var.reset(pipe_prev)

Copilot uses AI. Check for mistakes.
@yedidyakfir yedidyakfir merged commit 0bb8cf7 into develop Feb 5, 2026
53 checks passed
@yedidyakfir yedidyakfir deleted the feature/195-support-asave-with-apipeline-for-batched-saves branch February 5, 2026 15:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant