Training models using streaming Kafka dataset #802

Closed
@dalelane

I'm fairly sure the problem is that I'm using this incorrectly, rather than this being a bug.

I'm trying to train a model using a Kafka dataset, and continually refresh/retrain the model as new events appear on the topic.

A simplified version of my setup is:

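# Imports assumed for this snippet (not shown in the original post)
import tensorflow as tf
from tensorflow import keras
import tensorflow_io.kafka as kafka_io

batch_size = 10

# eof=False keeps the consumer open and waiting for new messages
dataset = kafka_io.KafkaDataset(["TRAINING.TOPIC:0"],
                                servers="kafka.bootstrap:9092",
                                group="TRAINING.GROUP",
                                eof=False,
                                config_global=[
                                    "api.version.request=true",
                                    "enable.auto.commit=true"
                                ])

# deserialize is my own function (defined elsewhere) that parses each message
dataset = dataset.map(deserialize).batch(batch_size)

model = keras.Sequential([
    keras.layers.Flatten(input_shape=(28, 28)),
    keras.layers.Dense(128, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])

model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])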

I'm using eof=False to keep the consumer going as new messages arrive, and then repeatedly calling model.fit to keep improving the model using the new data.

model.fit(dataset, epochs=1, steps_per_epoch=50)
model.fit(dataset, epochs=1, steps_per_epoch=50)
model.fit(dataset, epochs=1, steps_per_epoch=50)
...
model.fit(dataset, epochs=1, steps_per_epoch=50)

I was assuming it was safe to keep re-calling model.fit with the same open consumer dataset, but that assumption doesn't appear to be correct: it works for a little while, then fails with a "Too many open files" error.
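A minimal pure-Python sketch of the pattern that might be at play here, with no TensorFlow or Kafka involved. FakeStreamingDataset and fit_once are hypothetical stand-ins invented for illustration; whether KafkaDataset really opens a new client for every iterator is an assumption, not something confirmed above.

```python
import itertools


class FakeStreamingDataset:
    """Hypothetical stand-in for a streaming dataset (not the tensorflow_io API)."""

    def __init__(self):
        self.clients_opened = 0

    def __iter__(self):
        # Each new iterator simulates opening a fresh consumer connection.
        self.clients_opened += 1
        return itertools.count()


def fit_once(data, steps):
    """Hypothetical stand-in for model.fit: consume `steps` items from `data`."""
    for _ in itertools.islice(data, steps):
        pass


# Passing the dataset object itself: every call starts a new iterator.
dataset = FakeStreamingDataset()
for _ in range(3):
    fit_once(dataset, steps=50)
clients_per_call = dataset.clients_opened

# Passing one shared iterator: the simulated client is opened only once.
dataset2 = FakeStreamingDataset()
shared = iter(dataset2)
for _ in range(3):
    fit_once(shared, steps=50)
clients_shared = dataset2.clients_opened
```

If the real dataset behaves anything like this stand-in, the number of open connections would grow with the number of fit calls rather than staying constant.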

2020-02-21 08:12:17.026857: E tensorflow_io/kafka/kernels/kafka_dataset_ops.cc:156] 
EVENT_ERROR: (Local: Broker transport failure): sasl_ssl://9.20.193.37:31029/2: 
Failed to create socket: Too many open files (after 354960171ms in state INIT)

I guess it must be creating multiple new Kafka clients?
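One way to check that guess might be to watch the process's open file descriptor count between model.fit calls. The sketch below assumes a Linux or macOS host where /proc/self/fd or /dev/fd is available; count_open_fds is a hypothetical helper, and the socket is only a probe to show that the count moves.

```python
import os
import resource
import socket


def count_open_fds():
    """Count this process's open file descriptors (Linux/macOS only)."""
    for fd_dir in ("/proc/self/fd", "/dev/fd"):
        if os.path.isdir(fd_dir):
            return len(os.listdir(fd_dir))
    raise OSError("no file descriptor directory available on this platform")


# The soft limit is the ceiling that triggers "Too many open files".
soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)

before = count_open_fds()
probe = socket.socket()   # opening a socket consumes one descriptor
during = count_open_fds()
probe.close()             # closing it releases the descriptor again
after = count_open_fds()
```

Calling count_open_fds() before and after each model.fit would show whether the count climbs steadily towards soft_limit, which would be consistent with sockets being opened and never closed.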

Can you help me understand what I'm doing wrong, please?
