Closed
Description
I'm fairly sure the problem is that I'm using this wrong rather than this being a bug.
I'm trying to train a model on a Kafka dataset, and to continually refresh/retrain the model as new events arrive on the topic.
A simplified version of my setup is:
batch_size = 10
dataset = kafka_io.KafkaDataset(["TRAINING.TOPIC:0"],
                                servers="kafka.bootstrap:9092",
                                group="TRAINING.GROUP",
                                eof=False,
                                config_global=[
                                    "api.version.request=true",
                                    "enable.auto.commit=true"
                                ])
dataset = dataset.map(deserialize).batch(batch_size)
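(deserialize isn't shown above; for completeness, here is a sketch of the parsing step under an assumed wire format of 784 image bytes followed by one label byte — my real messages may differ, and in the actual pipeline this logic is wrapped so it runs inside dataset.map.)

```python
import struct

IMAGE_BYTES = 28 * 28  # flattened 28x28 grayscale image, one byte per pixel


def deserialize(message: bytes):
    """Split a raw Kafka message into (pixels, label).

    Hypothetical format: 784 unsigned image bytes, then 1 label byte.
    The real dataset.map() version would express this with TensorFlow ops
    (e.g. tf.io.decode_raw) rather than plain struct unpacking.
    """
    pixels = struct.unpack(f"{IMAGE_BYTES}B", message[:IMAGE_BYTES])
    (label,) = struct.unpack("B", message[IMAGE_BYTES:IMAGE_BYTES + 1])
    return pixels, label
```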
model = keras.Sequential([
    keras.layers.Flatten(input_shape=(28, 28)),
    keras.layers.Dense(128, activation="relu"),
    keras.layers.Dense(10, activation="softmax")
])
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
I'm using eof=False to keep the consumer running as new messages arrive, and then repeatedly calling model.fit to keep improving the model with the new data.
model.fit(dataset, epochs=1, steps_per_epoch=50)
model.fit(dataset, epochs=1, steps_per_epoch=50)
model.fit(dataset, epochs=1, steps_per_epoch=50)
...
model.fit(dataset, epochs=1, steps_per_epoch=50)
I was assuming it was safe to keep re-calling model.fit on the same open consumer dataset, but that assumption doesn't appear to be correct: it works okay for a little while before failing with a Too many open files error.
2020-02-21 08:12:17.026857: E tensorflow_io/kafka/kernels/kafka_dataset_ops.cc:156]
EVENT_ERROR: (Local: Broker transport failure): sasl_ssl://9.20.193.37:31029/2:
Failed to create socket: Too many open files (after 354960171ms in state INIT)
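One way I can check whether each fit call is leaking sockets (just a diagnostic sketch using only the standard library; open_fd_count is my own helper, not part of tensorflow_io) is to count the process's open file descriptors between calls:

```python
import os


def open_fd_count():
    """Return the number of file descriptors this process currently holds."""
    # Linux exposes one entry per open descriptor under /proc/self/fd;
    # macOS offers the equivalent /dev/fd.
    for fd_dir in ("/proc/self/fd", "/dev/fd"):
        if os.path.isdir(fd_dir):
            return len(os.listdir(fd_dir))
    raise OSError("no per-process fd directory on this platform")


# Hypothetical usage around the calls in question:
#     before = open_fd_count()
#     model.fit(dataset, epochs=1, steps_per_epoch=50)
#     print(open_fd_count() - before)  # a count that grows every round suggests a leak
```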
I guess each fit call must be creating a new Kafka client?
Can you let me know what I'm doing wrong, please?