Skip to content

Commit fc6fa19

Browse files
vgvolegblinkov
andauthored
Python SDK topic-transactions docs (#16203)
Co-authored-by: Ivan Blinkov <ivan@ydb.tech>
1 parent 093ff06 commit fc6fa19

File tree

2 files changed

+193
-25
lines changed

2 files changed

+193
-25
lines changed

ydb/docs/en/core/reference/ydb-sdk/topic.md

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
127127
{
128128
ProducerId = "ProducerId_Example"
129129
}.Build();
130-
130+
131131
await using var reader = new ReaderBuilder<string>(driver)
132132
{
133133
ConsumerName = "Consumer_Example",
134134
SubscribeSettings = { new SubscribeSettings(topicName) }
135135
}.Build();
136-
```
136+
```
137137

138138
{% endlist %}
139139

@@ -205,7 +205,7 @@ The topic path is mandatory. Other parameters are optional.
205205
.build())
206206
.build());
207207
```
208-
208+
209209
- С#
210210

211211
Example of creating a topic with a list of supported codecs and a minimum number of partitions:
@@ -788,7 +788,7 @@ Only connections with matching [producer and message group](../../concepts/topic
788788
```c#
789789
var writeCts = new CancellationTokenSource();
790790
writeCts.CancelAfter(TimeSpan.FromSeconds(3));
791-
791+
792792
await writer.WriteAsync("Hello, Example YDB Topics!", writeCts.Token);
793793
```
794794

@@ -1005,6 +1005,54 @@ All the metadata provided when writing a message is sent to a consumer with the
10051005
})
10061006
```
10071007

1008+
- Python
1009+
1010+
To write to a topic within a transaction, create a transactional writer by calling `topic_client.tx_writer` with the `tx` argument. Once created, you can send messages as usual. There's no need to close the transactional writer manually, as it will be closed automatically when the transaction ends.
1011+
1012+
In the example below, there is no explicit call to `tx.commit()`; it occurs implicitly upon the successful execution of the `callee` lambda.
1013+
1014+
[Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
1015+
1016+
```python
1017+
with ydb.QuerySessionPool(driver) as session_pool:
1018+
1019+
def callee(tx: ydb.QueryTxContext):
1020+
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
1021+
1022+
for i in range(message_count):
1023+
result_stream = tx.execute(query=f"select {i} as res;")
1024+
for result_set in result_stream:
1025+
message = str(result_set.rows[0]["res"])
1026+
tx_writer.write(ydb.TopicWriterMessage(message))
1027+
print(f"Message {message} was written with tx.")
1028+
1029+
session_pool.retry_tx_sync(callee)
1030+
```
1031+
1032+
- Python (asyncio)
1033+
1034+
To write to a topic within a transaction, create a transactional writer by calling `topic_client.tx_writer` with the `tx` argument. Once created, you can send messages as usual. There's no need to close the transactional writer manually, as it will be closed automatically when the transaction ends.
1035+
1036+
In the example below, there is no explicit call to `tx.commit()`; it occurs implicitly upon the successful execution of the `callee` lambda.
1037+
1038+
[Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
1039+
1040+
```python
1041+
async with ydb.aio.QuerySessionPool(driver) as session_pool:
1042+
1043+
async def callee(tx: ydb.aio.QueryTxContext):
1044+
tx_writer: ydb.TopicTxWriterAsyncIO = driver.topic_client.tx_writer(tx, topic)
1045+
1046+
for i in range(message_count):
1047+
async with await tx.execute(query=f"select {i} as res;") as result_stream:
1048+
async for result_set in result_stream:
1049+
message = str(result_set.rows[0]["res"])
1050+
await tx_writer.write(ydb.TopicWriterMessage(message))
1051+
print(f"Message {result_set.rows[0]['res']} was written with tx.")
1052+
1053+
await session_pool.retry_tx_async(callee)
1054+
```
1055+
10081056
- Java (sync)
10091057

10101058
[Example on GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java)
@@ -1285,7 +1333,7 @@ Topic can have several Consumers and for each of them server stores its own read
12851333
{
12861334
ConsumerName = "Consumer_Example",
12871335
SubscribeSettings = { new SubscribeSettings(topicName) }
1288-
}.Build();
1336+
}.Build();
12891337
```
12901338

12911339
{% endlist %}
@@ -1360,7 +1408,7 @@ To establish a connection to the `my-topic` and `my-specific-topic` topics using
13601408
}
13611409
}.Build();
13621410
```
1363-
1411+
13641412
{% endlist %}
13651413

13661414
### Reading messages {#reading-messages}
@@ -1465,7 +1513,7 @@ Data from topics can be read in the context of [transactions](#read-tx). In this
14651513
{
14661514
}
14671515
```
1468-
1516+
14691517
{% endlist %}
14701518

14711519
#### Reading message batches
@@ -1544,7 +1592,7 @@ Data from topics can be read in the context of [transactions](#read-tx). In this
15441592

15451593
foreach (var message in batchMessages.Batch)
15461594
{
1547-
logger.LogInformation("Received message: [{MessageData}]", message.Data);
1595+
logger.LogInformation("Received message: [{MessageData}]", message.Data);
15481596
}
15491597
}
15501598
}
@@ -1644,7 +1692,7 @@ If a commit fails with an error, the application should log it and continue; it
16441692
{
16451693
}
16461694
```
1647-
1695+
16481696
{% endlist %}
16491697

16501698
#### Reading message batches with commits
@@ -1736,7 +1784,7 @@ If a commit fails with an error, the application should log it and continue; it
17361784

17371785
foreach (var message in batchMessages.Batch)
17381786
{
1739-
logger.LogInformation("Received message: [{MessageData}]", message.Data);
1787+
logger.LogInformation("Received message: [{MessageData}]", message.Data);
17401788
}
17411789

17421790
try
@@ -1963,6 +2011,42 @@ Reading progress is usually saved on a server for each Consumer. However, such p
19632011
}
19642012
```
19652013

2014+
- Python
2015+
2016+
To read messages from a topic within a transaction, use the `reader.receive_batch_with_tx` method. It reads a batch of messages and adds their commit to the transaction, so there's no need to commit them separately. The reader can be reused across different transactions. However, it's essential to commit transactions in the same order as the messages are read from the reader, as message commits in the topic must be performed strictly in order - otherwise transaction will get an error during commit. The simplest way to ensure this is by using the reader within a loop.
2017+
2018+
[Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
2019+
2020+
```python
2021+
with driver.topic_client.reader(topic, consumer) as reader:
2022+
with ydb.QuerySessionPool(driver) as session_pool:
2023+
for _ in range(message_count):
2024+
2025+
def callee(tx: ydb.QueryTxContext):
2026+
batch = reader.receive_batch_with_tx(tx, max_messages=1)
2027+
print(f"Message {batch.messages[0].data.decode()} was read with tx.")
2028+
2029+
session_pool.retry_tx_sync(callee)
2030+
```
2031+
2032+
- Python (asyncio)
2033+
2034+
To read messages from a topic within a transaction, use the `reader.receive_batch_with_tx` method. It reads a batch of messages and adds their commit to the transaction, so there's no need to commit them separately. The reader can be reused across different transactions. However, it's essential to commit transactions in the same order as the messages are read from the reader, as message commits in the topic must be performed strictly in order - otherwise transaction will get an error during commit. The simplest way to ensure this is by using the reader within a loop.
2035+
2036+
[Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
2037+
2038+
```python
2039+
async with driver.topic_client.reader(topic, consumer) as reader:
2040+
async with ydb.aio.QuerySessionPool(driver) as session_pool:
2041+
for _ in range(message_count):
2042+
2043+
async def callee(tx: ydb.aio.QueryTxContext):
2044+
batch = await reader.receive_batch_with_tx(tx, max_messages=1)
2045+
print(f"Message {batch.messages[0].data.decode()} was read with tx.")
2046+
2047+
await session_pool.retry_tx_async(callee)
2048+
```
2049+
19662050
- Java (sync)
19672051

19682052
[Example on GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java)

0 commit comments

Comments
 (0)