Closed
Opened on Apr 18, 2017
Hi All,
I tried to use toxiproxy to simulate bad connections to a Kafka cluster. It works as a plain proxy, but as soon as I add a toxic it breaks the connection: either the client switches to connecting directly to the original URL (bypassing the proxy, so we receive messages without any delay), or sometimes messages are simply skipped (we never receive them).
I use a Kafka cluster (https://github.com/wurstmeister/kafka-docker) and toxiproxy-server.
$ docker ps -a
CONTAINER ID   IMAGE                    COMMAND                  CREATED        STATUS        PORTS                                                NAMES
51af67256d9e   kafkadocker_kafka        "start-kafka.sh"         23 hours ago   Up 23 hours   0.0.0.0:9092->9092/tcp                               kafkadocker_kafka_1
ccda8f90998a   wurstmeister/zookeeper   "/bin/sh -c '/usr/..."   23 hours ago   Up 23 hours   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1
I then added a proxy, 9999->9092:
{
  "test_proxy": {
    "name": "test_proxy",
    "listen": "127.0.0.1:9999",
    "upstream": "0.0.0.0:9092",
    "enabled": true,
    "toxics": [
      {
        "attributes": {
          "latency": 2000,
          "jitter": 500
        },
        "name": "latency_downstream",
        "type": "latency",
        "stream": "downstream",
        "toxicity": 1
      }
    ]
  }
}
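For anyone who wants to reproduce this setup, here is a minimal Python sketch of how the proxy and toxic above could be created through toxiproxy-server's HTTP API. It assumes the API is listening on its default address (127.0.0.1:8474) and uses the `POST /proxies` and `POST /proxies/{name}/toxics` endpoints; the function names (`proxy_request`, `latency_toxic_request`, `send`) are made up for illustration and are not part of any toxiproxy client.

```python
import json
import urllib.request

# Assumption: toxiproxy-server exposes its HTTP API on the default address.
API_BASE = "http://127.0.0.1:8474"


def _post(url, body):
    """Build (but do not send) a JSON POST request."""
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def proxy_request(api_base, name, listen, upstream):
    """Request that creates a proxy (hypothetical helper)."""
    body = {"name": name, "listen": listen, "upstream": upstream, "enabled": True}
    return _post(f"{api_base}/proxies", body)


def latency_toxic_request(api_base, proxy_name, latency_ms, jitter_ms):
    """Request that adds a downstream latency toxic (hypothetical helper)."""
    body = {
        "name": "latency_downstream",
        "type": "latency",
        "stream": "downstream",
        "toxicity": 1.0,
        "attributes": {"latency": latency_ms, "jitter": jitter_ms},
    }
    return _post(f"{api_base}/proxies/{proxy_name}/toxics", body)


def send(request):
    """Send a prepared request; needs a running toxiproxy-server."""
    with urllib.request.urlopen(request) as response:
        return response.status, response.read().decode("utf-8")


# Same values as in the configuration shown above.
create_proxy = proxy_request(API_BASE, "test_proxy", "127.0.0.1:9999", "0.0.0.0:9092")
add_toxic = latency_toxic_request(API_BASE, "test_proxy", 2000, 500)
```

Calling `send(create_proxy)` and then `send(add_toxic)` against a running server should produce a configuration equivalent to the JSON above; the requests are only built here, not sent.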
I tried to send a message (any string) with the toxic enabled and got this:
$ ./kafka-console-producer.sh --broker-list localhost:9999 --topic ptf.inbound.if
...
[2017-04-17 18:44:31,076] WARN Fetching topic metadata with correlation id 40 for topics [Set(ptf.inbound.if)] from broker [id:0,host:localhost,port:9999] failed (kafka.client.ClientUtils$)
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2017-04-17 18:44:31,077] ERROR fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 12 more
[2017-04-17 18:44:31,078] ERROR Failed to send requests for topics ptf.inbound.if with correlation ids in [33,40] (kafka.producer.async.DefaultEventHandler)
[2017-04-17 18:44:31,078] ERROR Error in handling batch of 4 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
I also tried to consume messages, but without success.
$ ./kafka-console-consumer.sh --zookeeper 0.0.0.0:2181 --topic ptf.inbound.if --from-beginning
$ toxiproxy-server
...
INFO[0134] Accepted client client=127.0.0.1:50349 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0135] Source terminated bytes=83 err=read tcp 127.0.0.1:50350->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0136] Accepted client client=127.0.0.1:50366 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0137] Source terminated bytes=83 err=read tcp 127.0.0.1:50367->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0137] Accepted client client=127.0.0.1:50384 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0139] Source terminated bytes=83 err=read tcp 127.0.0.1:50385->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0139] Accepted client client=127.0.0.1:50401 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0140] Source terminated bytes=83 err=read tcp 127.0.0.1:50402->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0140] Accepted client client=127.0.0.1:50418 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0142] Source terminated bytes=83 err=read tcp 127.0.0.1:50419->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0142] Accepted client client=127.0.0.1:50435 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0143] Source terminated bytes=83 err=read tcp 127.0.0.1:50436->127.0.0.1:9092: use of closed network connection name=test_proxy
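To make the toxiproxy-server log easier to read, here is a rough Python sketch that pairs each "Accepted client" line with the next "Source terminated" line and reports how long the connection lived and how many bytes it carried. The `summarize` function and its regular expression are my own illustration, and it assumes accept and terminate lines strictly alternate, as they do in the excerpt above; the bracketed number is treated as seconds since server start.

```python
import re

# Matches lines such as "INFO[0134] Accepted client ..." (level, seconds, message).
LINE_RE = re.compile(r"^(?P<level>[A-Z]+)\[(?P<secs>\d+)\]\s+(?P<rest>.*)$")
BYTES_RE = re.compile(r"bytes=(\d+)")


def summarize(log_text):
    """Return a list of (lifetime_seconds, bytes) for each accept/terminate pair."""
    pairs = []
    accepted_at = None
    for raw in log_text.splitlines():
        match = LINE_RE.match(raw.strip())
        if not match:
            continue  # skip the command line, "...", and anything else
        secs = int(match.group("secs"))
        rest = match.group("rest")
        if rest.startswith("Accepted client"):
            accepted_at = secs
        elif rest.startswith("Source terminated") and accepted_at is not None:
            found = BYTES_RE.search(rest)
            byte_count = int(found.group(1)) if found else None
            pairs.append((secs - accepted_at, byte_count))
            accepted_at = None
    return pairs


# First three connections from the log excerpt above.
SAMPLE = """\
INFO[0134] Accepted client client=127.0.0.1:50349 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0135] Source terminated bytes=83 err=read tcp 127.0.0.1:50350->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0136] Accepted client client=127.0.0.1:50366 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0137] Source terminated bytes=83 err=read tcp 127.0.0.1:50367->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0137] Accepted client client=127.0.0.1:50384 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0139] Source terminated bytes=83 err=read tcp 127.0.0.1:50385->127.0.0.1:9092: use of closed network connection name=test_proxy
"""

result = summarize(SAMPLE)
print(result)
```

In this excerpt every connection carries 83 bytes and is closed one to two seconds after being accepted, which is at or below the 2000 ms latency configured on the toxic. That might be consistent with the client timing out before the delayed response arrives, but this is only a guess from the timestamps.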
Is this expected behavior, or am I missing something?
Thanks