[KafkaDataset] Add KafkaGroupIODataset to support consumer rebalance. (
fuhailin authored Jan 21, 2022
1 parent 54546e0 commit 17c228d
Showing 5 changed files with 1,422 additions and 4 deletions.
92 changes: 91 additions & 1 deletion docs/KafkaDataset.md
@@ -3,7 +3,7 @@
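## Features

1. Kafka dataset supports configuring multiple partitions and consumes Kafka messages in chronological order
1. Kafka dataset supports saving and restoring state information
2. Kafka dataset supports saving and restoring state information

## API Overview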

@@ -57,3 +57,93 @@ with tf.Session() as sess:
print("Data", sess.run(get_next))
saver.save(sess, "ckpt/1")
```


# KafkaGroupIODataset


## Features

1. KafkaGroupIODataset supports configuring multiple partitions and consumes Kafka messages in chronological order
2. KafkaGroupIODataset supports load balancing within a consumer group
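
To make the load-balancing behaviour more concrete, the sketch below simulates how the partitions of a topic might be divided among the members of one consumer group, and how that division changes when a new member joins. It is a pure-Python illustration that assumes a simple round-robin scheme. The real assignment is negotiated by the Kafka group coordinator according to the configured strategy, and `assign_round_robin` and the `worker-N` names are hypothetical, not part of KafkaGroupIODataset.

```python
def assign_round_robin(partitions, consumers):
    """Distribute partitions over consumers in round-robin order.

    Hypothetical illustration only: the real assignment is negotiated by
    the Kafka group coordinator, not by this function.
    """
    if not consumers:
        raise ValueError("at least one consumer is required")
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3, 4, 5]

# Two workers share the six partitions of a topic.
before = assign_round_robin(partitions, ["worker-0", "worker-1"])

# A third worker joins the group, which triggers a rebalance.
after = assign_round_robin(partitions, ["worker-0", "worker-1", "worker-2"])

print(before)
print(after)
```

In this simulation every partition is owned by exactly one member before and after the rebalance, which is the property that lets several training workers share one `group_id` without reading the same partition twice.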

## API Overview

### API Description

```python
class KafkaGroupIODataset(dataset_ops.Dataset):
    def __init__(
        self,
        topics,
        group_id,
        servers,
        stream_timeout=0,
        message_poll_timeout=10000,
        configuration=None,
        internal=True,
    ):
        ...
```

### Parameters

- topics: A `tf.string` tensor containing topic names in [topic] format.
  For example: `["topic1", "topic2"]`.
- group_id: The ID of the consumer group. For example: `cgstream`.
- servers: The bootstrap servers of the Kafka cluster.
  For example: `localhost:9092`.
- stream_timeout: An optional timeout (in milliseconds) to block while waiting for new messages from Kafka. The default is 0, which does not block for new messages. To block indefinitely, set it to -1.
- message_poll_timeout: An optional timeout (in milliseconds) after which the Kafka consumer raises a timeout error while fetching a single message. This value is also the interval at which the Kafka topic(s) are polled for new messages when `stream_timeout` is used.
- configuration: An optional `tf.string` tensor containing configurations in [Key=Value] format.
  - Global configuration: see "Global configuration properties" in the librdkafka documentation. Examples: `["enable.auto.commit=false", "heartbeat.interval.ms=2000"]`
  - Topic configuration: see "Topic configuration properties" in the librdkafka documentation. Note that all topic configurations should be prefixed with `conf.topic.`. Example: `["conf.topic.auto.offset.reset=earliest"]`
  - Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
- internal: Whether the dataset is being created from within the named scope. Default: True.
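
The `configuration` format described above can be assembled programmatically rather than written by hand. The following is a minimal sketch, assuming plain Python dictionaries as input. `build_configuration` and `_format_value` are hypothetical helpers introduced here for illustration and are not part of the dataset API.

```python
def build_configuration(global_conf=None, topic_conf=None):
    """Return a list of "Key=Value" strings for the `configuration` argument.

    Hypothetical helper: topic-level keys receive the `conf.topic.` prefix
    described in the parameter notes.
    """
    entries = []
    for key, value in (global_conf or {}).items():
        entries.append("{}={}".format(key, _format_value(value)))
    for key, value in (topic_conf or {}).items():
        entries.append("conf.topic.{}={}".format(key, _format_value(value)))
    return entries


def _format_value(value):
    # librdkafka boolean properties are written as lowercase true/false.
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


configuration = build_configuration(
    global_conf={"enable.auto.commit": False, "heartbeat.interval.ms": 2000},
    topic_conf={"auto.offset.reset": "earliest"},
)
print(configuration)
```

The resulting list reproduces the example entries given in the parameter notes and could be passed as the `configuration` argument.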

## Usage Example

```python
from tensorflow.python.data.ops import dataset_ops
from tensorflow.python.data.ops import iterator_ops
from tensorflow.python.framework import dtypes
from tensorflow.python.ops import array_ops
from tensorflow.python.data.ops import readers
import tensorflow as tf


def make_initializable_iterator(ds):
    """Wrapper of make_initializable_iterator."""
    if hasattr(dataset_ops, "make_initializable_iterator"):
        return dataset_ops.make_initializable_iterator(ds)
    return ds.make_initializable_iterator()


dataset = readers.KafkaGroupIODataset(
    topics=["topic1", "topic2"],
    group_id="cgstream",
    servers="localhost:9092",
    stream_timeout=3000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
        "enable.auto.commit=true",
    ],
)

# Create the iterator from the dataset.
train_iterator = make_initializable_iterator(dataset)
handle = array_ops.placeholder(dtypes.string, shape=[])

# Build a feedable iterator that is driven by the string handle.
iterator = iterator_ops.Iterator.from_string_handle(
    handle,
    train_iterator.output_types,
    train_iterator.output_shapes,
    train_iterator.output_classes,
)
next_elements = iterator.get_next()

with tf.Session() as sess:
    train_handle = sess.run(train_iterator.string_handle())
    sess.run([train_iterator.initializer])
    for _ in range(100):
        x = sess.run(next_elements, feed_dict={handle: train_handle})
        print(x)
```
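
The example above sets `stream_timeout=3000`. How `stream_timeout` and `message_poll_timeout` interact may be easier to see in a small simulation. The sketch below is an assumption based only on the parameter descriptions, not on the actual kernel implementation. `consume` and `fake_poll` are hypothetical names, and elapsed time is counted instead of measured so that the code runs without a Kafka broker.

```python
def consume(poll, stream_timeout=0, message_poll_timeout=10000):
    """Yield messages until none arrives within stream_timeout milliseconds.

    `poll(timeout_ms)` is a hypothetical callable that returns a message,
    or None when nothing arrived within timeout_ms. A stream_timeout of -1
    means the loop never gives up.
    """
    waited = 0  # simulated milliseconds spent waiting since the last message
    while True:
        message = poll(message_poll_timeout)
        if message is not None:
            waited = 0
            yield message
            continue
        waited += message_poll_timeout
        if stream_timeout != -1 and waited >= stream_timeout:
            return  # treat the stream as exhausted


# Simulated source: two messages, one empty poll, one more message, then silence.
responses = iter([b"m1", b"m2", None, b"m3"])


def fake_poll(timeout_ms):
    return next(responses, None)


received = list(consume(fake_poll, stream_timeout=3000, message_poll_timeout=1000))
print(received)
```

In this model a single empty poll does not end the stream. Only three consecutive empty polls of 1000 ms each, adding up to the 3000 ms `stream_timeout`, stop the iteration.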
1 change: 1 addition & 0 deletions docs/index.md
@@ -87,4 +87,5 @@ oneDNN
WorkQueue
KafkaDataset
KafkaGroupIODataset
```