Multithreaded input and output plugins #5638

Closed
rbkasat opened this issue Mar 26, 2019 · 24 comments
Labels
area/kafka, feature request

Comments

@rbkasat

rbkasat commented Mar 26, 2019

Feature Request

Opening a feature request kicks off a discussion.

Proposal:

Kafka Input plugin should be multithreaded to support multiple partitions.

Current behavior:

Performs very badly with a single process.

Desired behavior:

Use case: [Why is this important (helps with prioritizing requests)]

@danielnelson
Contributor

Can you try specifying the plugin multiple times with the same consumer group and see if that gives you the performance you are expecting?
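
For illustration, a minimal sketch of that layout, using placeholder topic and broker names; the shared consumer_group is what lets Kafka balance partitions across the duplicated consumers:

[[inputs.kafka_consumer]]
  topics = ["topic"]
  brokers = ["broker:9092"]        ## placeholder broker address
  consumer_group = "metricsrelay"  ## same group in every copy
  data_format = "influx"

[[inputs.kafka_consumer]]
  topics = ["topic"]
  brokers = ["broker:9092"]
  consumer_group = "metricsrelay"
  data_format = "influx"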

@danielnelson added the feature request and area/kafka labels on Mar 27, 2019
@rbkasat
Author

rbkasat commented Mar 27, 2019

No, it does not help. I think it needs to be something similar to Logstash pipelines, where one flow is completely logically separate from the others.
I tried embedding 5 Kafka input plugins with the same consumer group, but unfortunately it can't keep up with the incoming flow to Kafka. If I spin up 5 separate processes, I do get a huge performance bump, but managing multiple processes is chaotic.

@danielnelson
Contributor

About how many messages are being processed with a single Telegraf and which data_format are you using? If possible, can you show some sample message data?

@rbkasat
Author

rbkasat commented Mar 27, 2019

From the InfluxDB points measurements, we are processing 1.6 million points/node/min; in total that is roughly 27 million points/min.

@danielnelson
Contributor

Could you enable the internal input plugin and check internal_gather,plugin=kafka_consumer?
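
For reference, a minimal sketch of enabling the internal plugin; the collect_memstats line is optional:

[[inputs.internal]]
  ## Also collect Telegraf memory stats.
  collect_memstats = true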

@rbkasat
Author

rbkasat commented Mar 28, 2019

[Screenshot: internal_gather graph, 2019-03-27 5:58 PM]

PFA. This is for one cluster and was taken after 2 PM; 7 AM to 2 PM is normally peak hours. The graph is configured with a 10 s interval, so the rollup is 10 seconds. I currently have 3 clusters; the other two are a bit smaller, but eventually, if possible, I want to merge all configs into one multithreaded Telegraf config (I tried that initially but it performs extremely poorly).

Agent configs

[agent]
  interval = "1s"
  round_interval = true
  metric_batch_size = 20000
  metric_buffer_limit = 1000000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "1s"
  omit_hostname = true
  debug = true
  quiet = false
  logfile = "/var/log/relay/app.log"

@danielnelson
Contributor

Can you show just a single Telegraf instance? I'm trying to get an idea of how badly it is performing with a single plugin. Also, can you add your kafka_consumer configuration (remove any sensitive info)?

@rbkasat
Author

rbkasat commented Mar 28, 2019

SELECT non_negative_derivative(sum("metrics_gathered"), 10s) FROM "telegraf_internal_gather"
WHERE "input" = 'kafka_consumer' AND "host" = 'XXXXXXx' AND $timeFilter
GROUP BY host, time($__interval) fill(null)

Output of above query:
[Screenshot: query result graph, 2019-03-28 8:42 AM]

[[inputs.kafka_consumer]]
  ## topic(s) to consume
  topics = ["topic"]
  brokers =  [" "]
  ## the name of the consumer group
  consumer_group = "metricsrelay"
  offset = "oldest"
  data_format = "influx"
  max_message_len = 765536

@danielnelson
Contributor

I'm not able to tell the actual rate from the image; could you switch it to a table panel?

What is your Telegraf version?

@rbkasat
Author

rbkasat commented Mar 28, 2019

I am using Telegraf 1.8.
On average, I see an ingest rate per node of 1.3 million points/min; ingest is a bit spiky.

@danielnelson
Contributor

In Telegraf 1.8, the speed of all inputs is limited by the speed of the output. This prevents you from reading faster than you can write. If you are running up against this limit then adding additional inputs wouldn't help.

In 1.9 the shared input limit is removed but a new option max_undelivered_messages limits the number of unsent metrics.
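
A minimal sketch of that option on the consumer, assuming Telegraf 1.9+ and a placeholder broker address; the value shown is only illustrative:

[[inputs.kafka_consumer]]
  topics = ["topic"]
  brokers = ["broker:9092"]        ## placeholder broker address
  consumer_group = "metricsrelay"
  data_format = "influx"
  ## Maximum number of undelivered metrics before the consumer pauses
  ## reading from Kafka; tune to your workload.
  max_undelivered_messages = 10000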

Either way, the kafka_consumer plugin will refuse to read faster than Telegraf can write, which means we should also look at the performance of the output. The easiest way to check is to write the metrics out to a /dev/null file output and compare, but obviously this would mean those metrics are gone, which is probably not something you want.
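
For that comparison, a throwaway output along these lines should work; it discards everything it receives, so use it for benchmarking only:

[[outputs.file]]
  ## Write to /dev/null so only input and serialization cost is measured.
  files = ["/dev/null"]
  data_format = "influx"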

Can you show your output configurations and also let's take a look at the internal_write measurements, in particular write_time_ns and metrics_written.

@rbkasat
Author

rbkasat commented Mar 29, 2019

@danielnelson I tripled the number of partitions in Kafka and duplicated the input plugin to match the partition count. The input plugin seems to be keeping up well, but now the output plugin seems to be running slow. I see millions of objects in memory, and the log messages show a million messages in the buffer.
How can I improve InfluxDB output performance?
From the InfluxDB side, I don't see any hinted handoffs or heavy load on the cluster; I think it can easily take more.

@danielnelson
Contributor

A single Telegraf instance does send sequentially to InfluxDB. Usually this isn't a problem, since it is common to have multiple Telegraf instances sending to the database, which usually gives plenty of concurrency. How many Telegraf instances in total are sending to InfluxDB? Is it just this one instance now?

@rbkasat
Author

rbkasat commented Mar 29, 2019

No, I have 20 nodes sending to InfluxDB. How can I make InfluxDB writes concurrent on the same node?
I changed the flush time to 5 s, which improved performance a little, and then removed it altogether.
I need the InfluxDB output to run in parallel; otherwise spinning up multiple consumers won't help much, it just fills up the queue.

@danielnelson
Contributor

If you have 20 nodes then I would expect plenty of concurrency in aggregate, so it may be that adding more concurrency will be offset by longer request times.

But to answer your question, in a single Telegraf process the only way is to define the output plugin multiple times, and then use the namepass/namedrop or tagpass/tagdrop options to split the data. It can be quite difficult to balance the outputs, but this is the only way currently to "shard" the data. You can see an example of this in the configuration docs (second example from the bottom).
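
A rough sketch of that shape, using two hypothetical measurement names (cpu and mem) and placeholder URLs purely for illustration:

[[outputs.influxdb]]
  urls = ["http://influxdb-a:8086"]   ## placeholder URL
  database = "telegraf"
  ## Only metrics whose measurement name matches are sent to this output.
  namepass = ["cpu"]

[[outputs.influxdb]]
  urls = ["http://influxdb-b:8086"]   ## placeholder URL
  database = "telegraf"
  namepass = ["mem"]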

Telegraf flushes immediately after receiving a full metric_batch_size worth of new metrics, and the flush_interval really only comes into play during periods of low throughput; think of it as the maximum time until metrics are sent. The main configuration option you have is metric_batch_size. Depending on the network and system characteristics this usually should be between 1000 and 10000, but the best approach is to double or halve the setting while watching the write_time_ns and metrics_written metrics. Try to find the metric_batch_size that gives you the best throughput, i.e. the lowest write_time_ns / metrics_written.

Are you still using Telegraf 1.8? Normally this version shouldn't fill the metric_buffer unless an error occurs in the output. If the buffer is filling up you should check the logfile for errors.

@rbkasat
Author

rbkasat commented Mar 29, 2019

I just moved to the latest Telegraf to see if the new version is better; since then I have started seeing a full buffer and high reads from Kafka. But the performance gain was quite significant, which could also be due to the multiple partitions.

Regarding tagpass and tagdrop: I have multiple clusters, and I used to have a Telegraf config with multiple rules using namepass/namedrop and tagpass/tagdrop. But because of the earlier parallel-processing issues, I moved to multiple processes on one box and simplified the configs.

My understanding is that all output plugins share the same queue and need to process the same number of events.
For example:
I have metrics for customer 1 [C1] tagged with C1's id, metrics for customer 2 [C2] tagged with C2's id, and so on.
C1's metrics are in the k_c1 Kafka topic and C2's metrics are in k_c2.

Now, for k_c1 we get 1 million points/min, whereas for k_c2 we get only 100K/min, 1.1 million in total. I have a Kafka input plugin reading from both topics and have defined two output plugins, one sending to cluster 1 and the second sending to cluster 2.

Now my questions are:

  1. Will output plugins 1 & 2 both need to process the 1.1 million points independently, or is there some built-in smartness?
  2. If cluster 2 is down, will it take the entire pipeline down, since the shared queue won't get flushed once it's filled with cluster 2 events, and end up impacting the other clusters?
  3. In the above scenario, if I spin up 2 processes, one consuming from topic k_c1 and a second consuming from k_c2, would that be a better approach, and if yes, how do I manage multiple processes?

Example Config

[[outputs.influxdb]]
  urls = ["http://cluster_c1:8086"]
  database = "C1"
  precision = "s"
  retention_policy = "default"
  write_consistency = "any"
  [outputs.influxdb.tagpass]
   customer = ["c1"]

[[outputs.influxdb]]
  urls = ["http://cluster_c2:8086"]
  database = "C2"
  precision = "s"
  retention_policy = "default"
  write_consistency = "any"
  [outputs.influxdb.tagpass]
   customer = ["c2"]

[[inputs.kafka_avro_consumer]]
  topics = ["k_c1", "k_c2"]
  brokers =  [ ]
  consumer_group = "metricsrelay"
  offset = "newest"
  data_format = "influx"
  max_message_len = 765536

@danielnelson
Contributor

Each output plugin actually has its own separate buffer, and it will only contain the metrics which make it past the tagpass step. In the latest Telegraf, one output being down should not affect the others, because we have removed the input blocking behavior mentioned earlier.

@rbkasat
Author

rbkasat commented Mar 29, 2019

Does that mean that if one output is down and its internal queue is full, the input plugin will drop events?

@danielnelson
Contributor

Speaking about Telegraf 1.9 and newer: with most other input plugins, a down output could drop its metrics, but metrics going to another output would be unaffected because each output has its own metric buffer. The kafka_consumer input has special handling, though: it will pause reading from the queue until its metrics are sent. It is possible that metrics originating from another input plugin and going to the same output could replace the kafka_consumer metrics in the metric buffer, but in that case the queue offset will not be updated and the queue message should be read again.

@rbkasat
Author

rbkasat commented Apr 2, 2019

@danielnelson As per your suggestion, I split the InfluxDB output into multiple outputs using the tagpass/tagdrop mechanism. It seems to be helping a bit, but it is still dropping a lot of points; I don't think I have a proper distribution of metrics per queue yet. Is it possible to see per-plugin queue usage? E.g., if I have 4 InfluxDB output plugins, the metrics would show InfluxDB plugin 1 with x number of writes and its queue size, InfluxDB plugin 2 with its own, and so on. Then I could balance it better.

@danielnelson
Contributor

Unfortunately the metrics we keep in the internal plugin are per plugin type, not per plugin instance, so they all get mixed together when you have multiple instances of a plugin. I think we need to add a way to name plugins, as suggested in #1815, to solve this.

@rbkasat
Author

rbkasat commented May 17, 2019

@danielnelson I think your suggestion helped us get to the scale we need, now doing 30 million+ points/min :), but we need the capability to adjust the queue size per plugin. Should I open a feature request for this?

@danielnelson
Contributor

Which queue size are we talking about?

@danielnelson
Contributor

@rbkasat Glad things are working well overall. I'm going to close this issue, but if you find you still need a change, just go ahead and open a feature request and we can discuss it further there.
