
Kafka is not supported? #171

Closed

Description

Hi All,
I am trying to use toxiproxy to simulate bad connections to a Kafka cluster. Without toxics it works as a plain proxy, but as soon as I add a toxic the connection through the proxy breaks. The client then either connects directly to the original URL (bypassing the proxy, so messages arrive without any delay) or sometimes drops messages (they are never received).
I am running a Kafka cluster (https://github.com/wurstmeister/kafka-docker) and toxiproxy-server.

$ docker ps -a
CONTAINER ID        IMAGE                       COMMAND                  CREATED             STATUS              PORTS                                                NAMES
51af67256d9e        kafkadocker_kafka           "start-kafka.sh"         23 hours ago        Up 23 hours         0.0.0.0:9092->9092/tcp                               kafkadocker_kafka_1
ccda8f90998a        wurstmeister/zookeeper      "/bin/sh -c '/usr/..."   23 hours ago        Up 23 hours         22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1 

Then I added a proxy from 9999 to 9092:

{
	"test_proxy": {
		"name": "test_proxy",
		"listen": "127.0.0.1:9999",
		"upstream": "0.0.0.0:9092",
		"enabled": true,
		"toxics": [
			{
				"attributes": {
					"latency": 2000,
					"jitter": 500
				},
				"name": "latency_downstream",
				"type": "latency",
				"stream": "downstream",
				"toxicity": 1
			}
		]
	}
}
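
For reference, a minimal sketch of how a configuration like the one above could be created through the Toxiproxy HTTP API. It assumes the API is listening on the default address 127.0.0.1:8474; the helper names (`proxy_payload`, `latency_toxic_payload`, `post`) are made up for illustration and are not part of any library.

```python
import json
import urllib.request

# Assumption: toxiproxy-server exposes its HTTP API on the default address.
TOXIPROXY_API = "http://127.0.0.1:8474"


def proxy_payload(name, listen, upstream):
    """Build the JSON body for POST /proxies."""
    return {"name": name, "listen": listen, "upstream": upstream, "enabled": True}


def latency_toxic_payload(latency_ms, jitter_ms, stream="downstream", toxicity=1.0):
    """Build the JSON body for POST /proxies/<proxy name>/toxics."""
    return {
        "name": f"latency_{stream}",
        "type": "latency",
        "stream": stream,
        "toxicity": toxicity,
        "attributes": {"latency": latency_ms, "jitter": jitter_ms},
    }


def post(path, payload):
    """Send a JSON payload to the Toxiproxy API and return the decoded reply."""
    req = urllib.request.Request(
        TOXIPROXY_API + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Usage (requires a running toxiproxy-server, so it is left commented out):
# post("/proxies", proxy_payload("test_proxy", "127.0.0.1:9999", "0.0.0.0:9092"))
# post("/proxies/test_proxy/toxics", latency_toxic_payload(2000, 500))
```

The payload builders are kept separate from the HTTP call so the request bodies can be inspected or compared against the JSON above without a server running.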

With the toxic enabled, I tried to send a message (any string) and got this:

$ ./kafka-console-producer.sh --broker-list localhost:9999 --topic ptf.inbound.if
...
[2017-04-17 18:44:31,076] WARN Fetching topic metadata with correlation id 40 for topics [Set(ptf.inbound.if)] from broker [id:0,host:localhost,port:9999] failed (kafka.client.ClientUtils$)
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
	at kafka.utils.Utils$.swallow(Utils.scala:172)
	at kafka.utils.Logging$class.swallowError(Logging.scala:106)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2017-04-17 18:44:31,077] ERROR fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(ptf.inbound.if)] from broker [ArrayBuffer(id:0,host:localhost,port:9999)] failed
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
	at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
	at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
	at kafka.utils.Utils$.swallow(Utils.scala:172)
	at kafka.utils.Logging$class.swallowError(Logging.scala:106)
	at kafka.utils.Utils$.swallowError(Utils.scala:45)
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
	at kafka.utils.Utils$.read(Utils.scala:380)
	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
	at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
	at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	... 12 more
[2017-04-17 18:44:31,078] ERROR Failed to send requests for topics ptf.inbound.if with correlation ids in [33,40] (kafka.producer.async.DefaultEventHandler)
[2017-04-17 18:44:31,078] ERROR Error in handling batch of 4 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
	at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
	at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
	at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

I also tried to consume messages, but without success.

$ ./kafka-console-consumer.sh --zookeeper 0.0.0.0:2181 --topic ptf.inbound.if  --from-beginning
$ toxiproxy-server
...
INFO[0134] Accepted client                               client=127.0.0.1:50349 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0135] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50350->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0136] Accepted client                               client=127.0.0.1:50366 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0137] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50367->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0137] Accepted client                               client=127.0.0.1:50384 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0139] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50385->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0139] Accepted client                               client=127.0.0.1:50401 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0140] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50402->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0140] Accepted client                               client=127.0.0.1:50418 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0142] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50419->127.0.0.1:9092: use of closed network connection name=test_proxy
INFO[0142] Accepted client                               client=127.0.0.1:50435 name=test_proxy proxy=127.0.0.1:9999 upstream=0.0.0.0:9092
WARN[0143] Source terminated                             bytes=83 err=read tcp 127.0.0.1:50436->127.0.0.1:9092: use of closed network connection name=test_proxy

Is this expected, or am I missing something?
Thanks
