Skip to content

Add pipeline and transaction #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,54 @@ and pass the command as a `list`.
redis.execute(command=["XLEN", "test_stream"])
```

### Pipelines & Transactions

If you want to submit commands in batches to reduce the number of roundtrips, you can utilize pipelining or
transactions. The difference between pipelines and transactions is that transactions are atomic: no other
command is executed during that transaction. In pipelines there is no such guarantee.

To use a pipeline, simply call the `pipeline` method:

```python
pipeline = redis.pipeline()

pipeline.set("foo", 1)
pipeline.incr("foo")
pipeline.get("foo")

result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```

For transaction, use `mutli`:

```python
pipeline = redis.multi()

pipeline.set("foo", 1)
pipeline.incr("foo")
pipeline.get("foo")

result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```

You can also chain the commands:

```python
pipeline = redis.pipeline()

pipeline.set("foo", 1).incr("foo").get("foo")
result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```

# Encoding
Although Redis can store invalid JSON data, there might be problems with the deserialization.
To avoid this, the Upstash REST proxy is capable of encoding the data as base64 on the server and then sending it to the client to be
Expand Down
103 changes: 103 additions & 0 deletions tests/commands/asyncio/test_asyncio_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import pytest
import pytest_asyncio

from upstash_redis.asyncio import Redis


@pytest_asyncio.fixture(autouse=True)
async def flush_db(async_redis: Redis):
await async_redis.delete("rocket", "space", "marine")

@pytest.mark.asyncio
async def test_pipeline(async_redis: Redis):

pipeline = async_redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# can chain commands
pipeline.get("rocket").get("space").get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
async def test_multi(async_redis: Redis):

pipeline = async_redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
async def test_context_manager_usage(async_redis: Redis):

async with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
result = await pipeline.exec()

# add a command to the pipeline which will be
# removed from the pipeline when we exit the context
pipeline.set("foo", "bar")

assert result == [1, 2, 1, 3, 2, 4]
assert len(pipeline._command_stack) == 0 # pipeline is empty

# redis still works after pipeline is done
get_result = await async_redis.get("rocket")
assert get_result == "4"

get_pipeline = async_redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = await get_pipeline.exec()
assert res == ["4", "2", None]

@pytest.mark.asyncio
async def test_context_manager_raise(async_redis: Redis):
"""
Check that exceptions in context aren't silently ignored

This can happen if we return something in __exit__ method
"""
with pytest.raises(Exception):
async with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")

@pytest.mark.asyncio
async def test_run_pipeline_twice(async_redis: Redis):
"""
Runs a pipeline twice
"""
pipeline = async_redis.pipeline()
pipeline.incr("albatros")
result = await pipeline.exec()
assert result == [1]

pipeline.incrby("albatros", 2)
result = await pipeline.exec()
assert result == [3]
96 changes: 96 additions & 0 deletions tests/commands/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import pytest

from upstash_redis import Redis


@pytest.fixture(autouse=True)
def flush_db(redis: Redis):
redis.delete("rocket", "space", "marine")

def test_pipeline(redis: Redis):

pipeline = redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket").get("space").get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_multi(redis: Redis):

pipeline = redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_context_manager_usage(redis: Redis):

with redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
result = pipeline.exec()

# add a command to the pipeline which will be
# removed from the pipeline when we exit the context
pipeline.set("foo", "bar")

assert result == [1, 2, 1, 3, 2, 4]
assert len(pipeline._command_stack) == 0 # pipeline is empty

# redis still works after pipeline is done
result = redis.get("rocket")
assert result == "4"

get_pipeline = redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = get_pipeline.exec()
assert res == ["4", "2", None]

def test_context_manager_raise(redis: Redis):
"""
Check that exceptions in context aren't silently ignored

This can happen if we return something in __exit__ method
"""
with pytest.raises(Exception):
with redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")

def test_run_pipeline_twice(redis: Redis):
"""
Runs a pipeline twice
"""
pipeline = redis.pipeline()
pipeline.incr("bird")
result = pipeline.exec()
assert result == [1]

pipeline.incrby("bird", 2)
result = pipeline.exec()
assert result == [3]
Loading
Loading