diff --git a/Appraisals b/Appraisals index 01c6dc21f4..669c9fce80 100644 --- a/Appraisals +++ b/Appraisals @@ -195,6 +195,7 @@ elsif Gem::Version.new('2.1.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel', '~> 4.0', '< 4.37' gem 'shoryuken' gem 'sidekiq', '~> 3.5.4' @@ -356,6 +357,7 @@ elsif Gem::Version.new('2.2.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -517,6 +519,7 @@ elsif Gem::Version.new('2.3.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -597,6 +600,7 @@ elsif Gem::Version.new('2.4.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -710,6 +714,7 @@ elsif Gem::Version.new('2.5.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -823,6 +828,7 @@ elsif Gem::Version.new('2.6.0') <= Gem::Version.new(RUBY_VERSION) \ gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' @@ -938,6 +944,7 @@ elsif Gem::Version.new('2.7.0') <= Gem::Version.new(RUBY_VERSION) gem 'redis', '< 4.0' gem 'rest-client' gem 'resque', '< 2.0' + gem 'ruby-kafka', '>= 0.7.10' gem 'sequel' gem 'shoryuken' gem 'sidekiq' diff --git a/Rakefile b/Rakefile index 814665d1ed..74c77a7acb 100644 --- a/Rakefile +++ b/Rakefile @@ -96,6 +96,7 @@ namespace :spec do :grpc, :http, :httprb, + :kafka, :mongodb, :mysql2, :presto, @@ -122,7 +123,7 @@ end namespace :test do task all: [:main, :rails, - :sidekiq, :monkey] + :monkey] Rake::TestTask.new(:main) do |t| t.libs << %w[test lib] @@ -141,7 +142,6 @@ namespace :test do [ :grape, - :sidekiq, :sucker_punch ].each do |contrib| Rake::TestTask.new(contrib) do |t| @@ -200,7 +200,6 @@ task :ci do if RUBY_PLATFORM != 'java' # Contrib minitests sh 'bundle exec appraisal contrib-old rake test:monkey' - sh 'bundle exec appraisal contrib-old rake test:sidekiq' sh 'bundle exec appraisal contrib-old rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib-old rake spec:active_model_serializers' @@ -258,7 +257,6 @@ task :ci do if RUBY_PLATFORM != 'java' # Contrib minitests sh 'bundle exec appraisal contrib-old rake test:monkey' - sh 'bundle exec appraisal contrib-old rake test:sidekiq' sh 'bundle exec appraisal contrib-old rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib-old rake spec:active_model_serializers' @@ -323,7 +321,6 @@ task :ci do if RUBY_PLATFORM != 'java' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' @@ -399,7 +396,6 @@ task :ci do if RUBY_PLATFORM != 'java' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' @@ -480,7 +476,6 @@ task :ci do sh 'bundle exec rake benchmark' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' @@ -546,7 +541,6 @@ task :ci do sh 'bundle exec rake benchmark' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' @@ -622,7 +616,6 @@ task :ci do sh 'bundle exec rake benchmark' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' @@ -697,7 +690,6 @@ task :ci do sh 'bundle exec rake benchmark' # Contrib minitests sh 'bundle exec appraisal contrib rake test:grape' - sh 'bundle exec appraisal contrib rake test:sidekiq' sh 'bundle exec appraisal contrib rake test:sucker_punch' # Contrib specs sh 'bundle exec appraisal contrib rake spec:action_pack' diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index e78698d693..8bf2563ee2 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -345,7 +345,8 @@ For a list of available integrations, and their configuration options, please re | Grape | `grape` | `>= 1.0` | *[Link](#grape)* | *[Link](https://github.com/ruby-grape/grape)* | | GraphQL | `graphql` | `>= 1.7.9` | *[Link](#graphql)* | *[Link](https://github.com/rmosolgo/graphql-ruby)* | | gRPC | `grpc` | `>= 1.7` | *[Link](#grpc)* | *[Link](https://github.com/grpc/grpc/tree/master/src/rubyc)* | -| http.rb | `httprb` | `>= 2.0` | *[Link](#http.rb)* | *[Link](https://github.com/httprb/http)* | +| http.rb | `httprb` | `>= 2.0` | *[Link](#http.rb)* | *[Link](https://github.com/httprb/http)* | +| Kafka | `ruby-kafka` | `>= 0.7.10` | *[Link](#kafka)* | | | MongoDB | `mongo` | `>= 2.1` | *[Link](#mongodb)* | *[Link](https://github.com/mongodb/mongo-ruby-driver)* | | MySQL2 | `mysql2` | `>= 0.3.21` | *[Link](#mysql2)* | *[Link](https://github.com/brianmario/mysql2)* | | Net/HTTP | `http` | *(Any supported Ruby)* | *[Link](#nethttp)* | *[Link](https://ruby-doc.org/stdlib-2.4.0/libdoc/net/http/rdoc/Net/HTTP.html)* | @@ -938,10 +939,8 @@ The http.rb integration will trace any HTTP call using the Http.rb gem. ```ruby require 'http' require 'ddtrace' - Datadog.configure do |c| c.use :httprb, options - # optionally, specify a different service name for hostnames matching a regex c.use :httprb, describes: /user-[^.]+\.example\.com/ do |httprb| httprb.service_name = 'user.example.com' @@ -959,6 +958,30 @@ Where `options` is an optional `Hash` that accepts the following parameters: | `service_name` | Service name for `httprb` instrumentation. | `'httprb'` | | `split_by_domain` | Uses the request domain as the service name when set to `true`. | `false` | +### Kafka + +The Kafka integration provides tracing of the `ruby-kafka` gem: + +You can enable it through `Datadog.configure`: + +```ruby +require 'active_support/notifications' # required to enable 'ruby-kafka' instrumentation +require 'kafka' +require 'ddtrace' + +Datadog.configure do |c| + c.use :kafka, options +end +``` + +Where `options` is an optional `Hash` that accepts the following parameters: + +| Key | Description | Default | +| --- | ----------- | ------- | +| `analytics_enabled` | Enable analytics for spans produced by this integration. `true` for on, `nil` to defer to global setting, `false` for off. | `false` | +| `service_name` | Service name used for `kafka` instrumentation | `'kafka'` | +| `tracer` | `Datadog::Tracer` used to perform instrumentation. Usually you don't need to set this. | `Datadog.tracer` | + ### MongoDB The integration traces any `Command` that is sent from the [MongoDB Ruby Driver](https://github.com/mongodb/mongo-ruby-driver) to a MongoDB cluster. By extension, Object Document Mappers (ODM) such as Mongoid are automatically instrumented if they use the official Ruby driver. To activate the integration, simply: diff --git a/lib/ddtrace.rb b/lib/ddtrace.rb index 523d8b5956..4b1e22971e 100644 --- a/lib/ddtrace.rb +++ b/lib/ddtrace.rb @@ -57,6 +57,7 @@ module Datadog require 'ddtrace/contrib/http/integration' require 'ddtrace/contrib/httprb/integration' require 'ddtrace/contrib/integration' +require 'ddtrace/contrib/kafka/integration' require 'ddtrace/contrib/presto/integration' require 'ddtrace/contrib/mysql2/integration' require 'ddtrace/contrib/mongodb/integration' diff --git a/lib/ddtrace/contrib/kafka/configuration/settings.rb b/lib/ddtrace/contrib/kafka/configuration/settings.rb new file mode 100644 index 0000000000..f2bafd8b9c --- /dev/null +++ b/lib/ddtrace/contrib/kafka/configuration/settings.rb @@ -0,0 +1,25 @@ +require 'ddtrace/contrib/configuration/settings' +require 'ddtrace/contrib/kafka/ext' + +module Datadog + module Contrib + module Kafka + module Configuration + # Custom settings for the Kafka integration + class Settings < Contrib::Configuration::Settings + option :analytics_enabled do |o| + o.default { env_to_bool(Ext::ENV_ANALYTICS_ENABLED, false) } + o.lazy + end + + option :analytics_sample_rate do |o| + o.default { env_to_float(Ext::ENV_ANALYTICS_SAMPLE_RATE, 1.0) } + o.lazy + end + + option :service_name, default: Ext::SERVICE_NAME + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/consumer_event.rb b/lib/ddtrace/contrib/kafka/consumer_event.rb new file mode 100644 index 0000000000..d06eaf7bb1 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/consumer_event.rb @@ -0,0 +1,14 @@ +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an event for a consumer. + module ConsumerEvent + def process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_GROUP, payload[:group_id]) + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/consumer_group_event.rb b/lib/ddtrace/contrib/kafka/consumer_group_event.rb new file mode 100644 index 0000000000..46da336182 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/consumer_group_event.rb @@ -0,0 +1,14 @@ +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an event for a consumer group. + module ConsumerGroupEvent + def process(span, _event, _id, payload) + super + + span.resource = payload[:group_id] + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/event.rb b/lib/ddtrace/contrib/kafka/event.rb new file mode 100644 index 0000000000..608e5ae468 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/event.rb @@ -0,0 +1,51 @@ +require 'ddtrace/contrib/analytics' +require 'ddtrace/contrib/active_support/notifications/event' +require 'ddtrace/contrib/kafka/ext' + +module Datadog + module Contrib + module Kafka + # Defines basic behaviors for an ActiveSupport event. + module Event + def self.included(base) + base.send(:include, ActiveSupport::Notifications::Event) + base.send(:extend, ClassMethods) + end + + # Class methods for Kafka events. + module ClassMethods + def event_name + self::EVENT_NAME + end + + def span_options + { service: configuration[:service_name] } + end + + def tracer + -> { configuration[:tracer] } + end + + def configuration + Datadog.configuration[:kafka] + end + + def process(span, _event, _id, payload) + span.service = configuration[:service_name] + span.set_tag(Ext::TAG_CLIENT, payload[:client_id]) + + # Set analytics sample rate + if Contrib::Analytics.enabled?(configuration[:analytics_enabled]) + Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate]) + end + + # Measure service stats + Contrib::Analytics.set_measured(span) + + span.set_error(payload[:exception_object]) if payload[:exception_object] + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events.rb b/lib/ddtrace/contrib/kafka/events.rb new file mode 100644 index 0000000000..3b961d596c --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events.rb @@ -0,0 +1,44 @@ +require 'ddtrace/contrib/kafka/events/connection/request' +require 'ddtrace/contrib/kafka/events/consumer/process_batch' +require 'ddtrace/contrib/kafka/events/consumer/process_message' +require 'ddtrace/contrib/kafka/events/consumer_group/heartbeat' +require 'ddtrace/contrib/kafka/events/consumer_group/join_group' +require 'ddtrace/contrib/kafka/events/consumer_group/leave_group' +require 'ddtrace/contrib/kafka/events/consumer_group/sync_group' +require 'ddtrace/contrib/kafka/events/produce_operation/send_messages' +require 'ddtrace/contrib/kafka/events/producer/deliver_messages' + +module Datadog + module Contrib + module Kafka + # Defines collection of instrumented Kafka events + module Events + ALL = [ + Events::Connection::Request, + Events::Consumer::ProcessBatch, + Events::Consumer::ProcessMessage, + Events::ConsumerGroup::Heartbeat, + Events::ConsumerGroup::JoinGroup, + Events::ConsumerGroup::LeaveGroup, + Events::ConsumerGroup::SyncGroup, + Events::ProduceOperation::SendMessages, + Events::Producer::DeliverMessages + ].freeze + + module_function + + def all + self::ALL + end + + def subscriptions + all.collect(&:subscriptions).collect(&:to_a).flatten + end + + def subscribe! + all.each(&:subscribe!) + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/connection/request.rb b/lib/ddtrace/contrib/kafka/events/connection/request.rb new file mode 100644 index 0000000000..f7ae74cb15 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/connection/request.rb @@ -0,0 +1,34 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module Connection + # Defines instrumentation for request.connection.kafka event + module Request + include Kafka::Event + + EVENT_NAME = 'request.connection.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = payload[:api] + + span.set_tag(Ext::TAG_REQUEST_SIZE, payload[:request_size]) if payload.key?(:request_size) + span.set_tag(Ext::TAG_RESPONSE_SIZE, payload[:response_size]) if payload.key?(:response_size) + end + + module_function + + def span_name + Ext::SPAN_CONNECTION_REQUEST + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb b/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb new file mode 100644 index 0000000000..b6001bc936 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer/process_batch.rb @@ -0,0 +1,41 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' + +module Datadog + module Contrib + module Kafka + module Events + module Consumer + # Defines instrumentation for process_batch.consumer.kafka event + module ProcessBatch + include Kafka::Event + extend Kafka::ConsumerEvent + + EVENT_NAME = 'process_batch.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = payload[:topic] + + span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic) + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition) + if payload.key?(:highwater_mark_offset) + span.set_tag(Ext::TAG_HIGHWATER_MARK_OFFSET, payload[:highwater_mark_offset]) + end + span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag) + end + + module_function + + def span_name + Ext::SPAN_PROCESS_BATCH + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb b/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb new file mode 100644 index 0000000000..d732fda5a4 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer/process_message.rb @@ -0,0 +1,39 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' + +module Datadog + module Contrib + module Kafka + module Events + module Consumer + # Defines instrumentation for process_message.consumer.kafka event + module ProcessMessage + include Kafka::Event + extend Kafka::ConsumerEvent + + EVENT_NAME = 'process_message.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.resource = payload[:topic] + + span.set_tag(Ext::TAG_TOPIC, payload[:topic]) if payload.key?(:topic) + span.set_tag(Ext::TAG_MESSAGE_KEY, payload[:key]) if payload.key?(:key) + span.set_tag(Ext::TAG_PARTITION, payload[:partition]) if payload.key?(:partition) + span.set_tag(Ext::TAG_OFFSET, payload[:offset]) if payload.key?(:offset) + span.set_tag(Ext::TAG_OFFSET_LAG, payload[:offset_lag]) if payload.key?(:offset_lag) + end + + module_function + + def span_name + Ext::SPAN_PROCESS_MESSAGE + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb new file mode 100644 index 0000000000..bc31a717c6 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/heartbeat.rb @@ -0,0 +1,39 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for heartbeat.consumer.kafka event + module Heartbeat + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'heartbeat.consumer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + if payload.key?(:topic_partitions) + payload[:topic_partitions].each do |topic, partitions| + span.set_tag("#{Ext::TAG_TOPIC_PARTITIONS}.#{topic}", partitions) + end + end + end + + module_function + + def span_name + Ext::SPAN_CONSUMER_HEARTBEAT + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb new file mode 100644 index 0000000000..cbdb4fb1ef --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/join_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for join_group.consumer.kafka event + module JoinGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'join_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_JOIN_GROUP + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb new file mode 100644 index 0000000000..99b9a51d1c --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/leave_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for leave_group.consumer.kafka event + module LeaveGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'leave_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_LEAVE_GROUP + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb b/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb new file mode 100644 index 0000000000..6809a94cdf --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/consumer_group/sync_group.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' +require 'ddtrace/contrib/kafka/consumer_event' +require 'ddtrace/contrib/kafka/consumer_group_event' + +module Datadog + module Contrib + module Kafka + module Events + module ConsumerGroup + # Defines instrumentation for sync_group.consumer.kafka event + module SyncGroup + include Kafka::Event + extend Kafka::ConsumerEvent + extend Kafka::ConsumerGroupEvent + + EVENT_NAME = 'sync_group.consumer.kafka'.freeze + + module_function + + def span_name + Ext::SPAN_CONSUMER_SYNC_GROUP + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb b/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb new file mode 100644 index 0000000000..db16642848 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/produce_operation/send_messages.rb @@ -0,0 +1,32 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module ProduceOperation + # Defines instrumentation for send_messages.producer.kafka event + module SendMessages + include Kafka::Event + + EVENT_NAME = 'send_messages.producer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + span.set_tag(Ext::TAG_SENT_MESSAGE_COUNT, payload[:sent_message_count]) if payload.key?(:sent_message_count) + end + + module_function + + def span_name + Ext::SPAN_SEND_MESSAGES + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb b/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb new file mode 100644 index 0000000000..fe81299f90 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/events/producer/deliver_messages.rb @@ -0,0 +1,35 @@ +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/event' + +module Datadog + module Contrib + module Kafka + module Events + module Producer + # Defines instrumentation for deliver_messages.producer.kafka event + module DeliverMessages + include Kafka::Event + + EVENT_NAME = 'deliver_messages.producer.kafka'.freeze + + def self.process(span, _event, _id, payload) + super + + span.set_tag(Ext::TAG_ATTEMPTS, payload[:attempts]) if payload.key?(:attempts) + span.set_tag(Ext::TAG_MESSAGE_COUNT, payload[:message_count]) if payload.key?(:message_count) + if payload.key?(:delivered_message_count) + span.set_tag(Ext::TAG_DELIVERED_MESSAGE_COUNT, payload[:delivered_message_count]) + end + end + + module_function + + def span_name + Ext::SPAN_DELIVER_MESSAGES + end + end + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/ext.rb b/lib/ddtrace/contrib/kafka/ext.rb new file mode 100644 index 0000000000..2b7440c238 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/ext.rb @@ -0,0 +1,38 @@ +module Datadog + module Contrib + module Kafka + # Kafka integration constants + module Ext + APP = 'kafka'.freeze + ENV_ANALYTICS_ENABLED = 'DD_KAFKA_ANALYTICS_ENABLED'.freeze + ENV_ANALYTICS_SAMPLE_RATE = 'DD_KAFKA_ANALYTICS_SAMPLE_RATE'.freeze + SERVICE_NAME = 'kafka'.freeze + SPAN_CONNECTION_REQUEST = 'kafka.connection.request'.freeze + SPAN_CONSUMER_HEARTBEAT = 'kafka.consumer.heartbeat'.freeze + SPAN_CONSUMER_JOIN_GROUP = 'kafka.consumer.join_group'.freeze + SPAN_CONSUMER_LEAVE_GROUP = 'kafka.consumer.leave_group'.freeze + SPAN_CONSUMER_SYNC_GROUP = 'kafka.consumer.sync_group'.freeze + SPAN_DELIVER_MESSAGES = 'kafka.producer.deliver_messages'.freeze + SPAN_PROCESS_BATCH = 'kafka.consumer.process_batch'.freeze + SPAN_PROCESS_MESSAGE = 'kafka.consumer.process_message'.freeze + SPAN_SEND_MESSAGES = 'kafka.producer.send_messages'.freeze + TAG_ATTEMPTS = 'kafka.attempts'.freeze + TAG_API = 'kafka.api'.freeze + TAG_CLIENT = 'kafka.client'.freeze + TAG_GROUP = 'kafka.group'.freeze + TAG_HIGHWATER_MARK_OFFSET = 'kafka.highwater_mark_offset'.freeze + TAG_MESSAGE_COUNT = 'kafka.message_count'.freeze + TAG_MESSAGE_KEY = 'kafka.message_key'.freeze + TAG_DELIVERED_MESSAGE_COUNT = 'kafka.delivered_message_count'.freeze + TAG_OFFSET = 'kafka.offset'.freeze + TAG_OFFSET_LAG = 'kafka.offset_lag'.freeze + TAG_PARTITION = 'kafka.partition'.freeze + TAG_REQUEST_SIZE = 'kafka.request_size'.freeze + TAG_RESPONSE_SIZE = 'kafka.response_size'.freeze + TAG_SENT_MESSAGE_COUNT = 'kafka.sent_message_count'.freeze + TAG_TOPIC = 'kafka.topic'.freeze + TAG_TOPIC_PARTITIONS = 'kafka.topic_partitions'.freeze + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/integration.rb b/lib/ddtrace/contrib/kafka/integration.rb new file mode 100644 index 0000000000..a1d4ef632b --- /dev/null +++ b/lib/ddtrace/contrib/kafka/integration.rb @@ -0,0 +1,39 @@ +require 'ddtrace/contrib/integration' +require 'ddtrace/contrib/kafka/configuration/settings' +require 'ddtrace/contrib/kafka/patcher' + +module Datadog + module Contrib + module Kafka + # Description of Kafka integration + class Integration + include Contrib::Integration + + MINIMUM_VERSION = Gem::Version.new('0.7.10') + + register_as :kafka, auto_patch: false + + def self.version + Gem.loaded_specs['ruby-kafka'] && Gem.loaded_specs['ruby-kafka'].version + end + + def self.loaded? + !defined?(::Kafka).nil? \ + && !defined?(::ActiveSupport::Notifications).nil? + end + + def self.compatible? + super && version >= MINIMUM_VERSION + end + + def default_configuration + Configuration::Settings.new + end + + def patcher + Patcher + end + end + end + end +end diff --git a/lib/ddtrace/contrib/kafka/patcher.rb b/lib/ddtrace/contrib/kafka/patcher.rb new file mode 100644 index 0000000000..4ba2836744 --- /dev/null +++ b/lib/ddtrace/contrib/kafka/patcher.rb @@ -0,0 +1,26 @@ +require 'ddtrace/contrib/patcher' +require 'ddtrace/ext/app_types' +require 'ddtrace/contrib/kafka/ext' +require 'ddtrace/contrib/kafka/events' + +module Datadog + module Contrib + module Kafka + # Patcher enables patching of 'kafka' module. + module Patcher + include Contrib::Patcher + + module_function + + def target_version + Integration.version + end + + def patch + # Subscribe to Kafka events + Events.subscribe! + end + end + end + end +end diff --git a/lib/ddtrace/contrib/sidekiq/ext.rb b/lib/ddtrace/contrib/sidekiq/ext.rb index 040c7e78a2..2c115b2ea5 100644 --- a/lib/ddtrace/contrib/sidekiq/ext.rb +++ b/lib/ddtrace/contrib/sidekiq/ext.rb @@ -15,6 +15,7 @@ module Ext TAG_JOB_ID = 'sidekiq.job.id'.freeze TAG_JOB_QUEUE = 'sidekiq.job.queue'.freeze TAG_JOB_RETRY = 'sidekiq.job.retry'.freeze + TAG_JOB_RETRY_COUNT = 'sidekiq.job.retry_count'.freeze TAG_JOB_WRAPPER = 'sidekiq.job.wrapper'.freeze TAG_JOB_ARGS = 'sidekiq.job.args'.freeze end diff --git a/lib/ddtrace/contrib/sidekiq/server_tracer.rb b/lib/ddtrace/contrib/sidekiq/server_tracer.rb index a6050f8c11..893a025168 100644 --- a/lib/ddtrace/contrib/sidekiq/server_tracer.rb +++ b/lib/ddtrace/contrib/sidekiq/server_tracer.rb @@ -31,6 +31,7 @@ def call(worker, job, queue) span.set_tag(Ext::TAG_JOB_ID, job['jid']) span.set_tag(Ext::TAG_JOB_RETRY, job['retry']) + span.set_tag(Ext::TAG_JOB_RETRY_COUNT, job['retry_count']) span.set_tag(Ext::TAG_JOB_QUEUE, job['queue']) span.set_tag(Ext::TAG_JOB_WRAPPER, job['class']) if job['wrapped'] span.set_tag(Ext::TAG_JOB_DELAY, 1000.0 * (Time.now.utc.to_f - job['enqueued_at'].to_f)) diff --git a/lib/ddtrace/pipeline/span_filter.rb b/lib/ddtrace/pipeline/span_filter.rb index aa5b5d9e17..ce2952da5c 100644 --- a/lib/ddtrace/pipeline/span_filter.rb +++ b/lib/ddtrace/pipeline/span_filter.rb @@ -10,12 +10,22 @@ def initialize(filter = nil, &block) @criteria = filter || block end + # Note: this SpanFilter implementation only handles traces in which child spans appear + # after parent spans in the trace array. If in the future child spans can be before + # parent spans, then the code below will need to be updated. def call(trace) - black_list = trace.select(&method(:drop_it?)) - - clean_trace(black_list, trace) while black_list.any? - - trace + deleted = Set.new + + trace.delete_if do |span| + if deleted.include?(span.parent) + deleted << span + true + else + drop = drop_it?(span) + deleted << span if drop + drop + end + end end private @@ -23,16 +33,6 @@ def call(trace) def drop_it?(span) @criteria.call(span) rescue false end - - def clean_trace(black_list, trace) - current = black_list.shift - - trace.delete(current) - - trace.each do |span| - black_list << span if span.parent == current - end - end end end end diff --git a/lib/ddtrace/writer.rb b/lib/ddtrace/writer.rb index abbe4d0f5b..b0c07492f2 100644 --- a/lib/ddtrace/writer.rb +++ b/lib/ddtrace/writer.rb @@ -43,9 +43,18 @@ def initialize(options = {}) @worker = nil end - # spawns a worker for spans; they share the same transport which is thread-safe def start - @pid = Process.pid + @mutex_after_fork.synchronize do + pid = Process.pid + return if @worker && pid == @pid + @pid = pid + start_worker + true + end + end + + # spawns a worker for spans; they share the same transport which is thread-safe + def start_worker @trace_handler = ->(items, transport) { send_spans(items, transport) } @worker = Datadog::Workers::AsyncTransport.new( transport: @transport, @@ -57,14 +66,19 @@ def start @worker.start end - # stops worker for spans. def stop - return if worker.nil? + @mutex_after_fork.synchronize { stop_worker } + end + + def stop_worker + return if @worker.nil? @worker.stop @worker = nil true end + private :start_worker, :stop_worker + # flush spans to the trace-agent, handles spans only def send_spans(traces, transport) return true if traces.empty? @@ -106,13 +120,7 @@ def write(trace, services = nil) # # This check ensures that if a process doesn't own the current +Writer+, async workers # will be initialized again (but only once for each process). - pid = Process.pid - if pid != @pid # avoid using Mutex when pids are equal - @mutex_after_fork.synchronize do - # we should start threads because the worker doesn't own this - start if pid != @pid - end - end + start if @worker.nil? || @pid != Process.pid # TODO: Remove this, and have the tracer pump traces directly to runtime metrics # instead of working through the trace writer. @@ -121,7 +129,13 @@ def write(trace, services = nil) Datadog.runtime_metrics.associate_with_span(trace.first) end - @worker.enqueue_trace(trace) + worker_local = @worker + + if worker_local + worker_local.enqueue_trace(trace) + else + Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete') + end end # stats returns a dictionary of stats about the writer. diff --git a/spec/ddtrace/contrib/kafka/integration_spec.rb b/spec/ddtrace/contrib/kafka/integration_spec.rb new file mode 100644 index 0000000000..fc594c8aac --- /dev/null +++ b/spec/ddtrace/contrib/kafka/integration_spec.rb @@ -0,0 +1,94 @@ +require 'ddtrace/contrib/support/spec_helper' + +require 'ddtrace/contrib/kafka/integration' + +RSpec.describe Datadog::Contrib::Kafka::Integration do + extend ConfigurationHelpers + + let(:integration) { described_class.new(:kafka) } + + describe '.version' do + subject(:version) { described_class.version } + + context 'when the "ruby-kafka" gem is loaded' do + include_context 'loaded gems', :'ruby-kafka' => described_class::MINIMUM_VERSION + it { is_expected.to be_a_kind_of(Gem::Version) } + end + + context 'when "ruby-kafka" gem is not loaded' do + include_context 'loaded gems', :'ruby-kafka' => nil + it { is_expected.to be nil } + end + end + + describe '.loaded?' do + subject(:loaded?) { described_class.loaded? } + + context 'when neither Kafka or ActiveSupport::Notifications are defined' do + before do + hide_const('Kafka') + hide_const('ActiveSupport::Notifications') + end + + it { is_expected.to be false } + end + + context 'when only Kafka is defined' do + before do + stub_const('Kafka', Class.new) + hide_const('ActiveSupport::Notifications') + end + + it { is_expected.to be false } + end + + context 'when only ActiveSupport::Notifications is defined' do + before do + hide_const('Kafka') + stub_const('ActiveSupport::Notifications', Class.new) + end + + it { is_expected.to be false } + end + + context 'when both Kafka and ActiveSupport::Notifications are defined' do + before do + stub_const('Kafka', Class.new) + stub_const('ActiveSupport::Notifications', Class.new) + end + + it { is_expected.to be true } + end + end + + describe '.compatible?' do + subject(:compatible?) { described_class.compatible? } + + context 'when "ruby-kafka" gem is loaded with a version' do + context 'that is less than the minimum' do + include_context 'loaded gems', :'ruby-kafka' => decrement_gem_version(described_class::MINIMUM_VERSION) + it { is_expected.to be false } + end + + context 'that meets the minimum version' do + include_context 'loaded gems', :'ruby-kafka' => described_class::MINIMUM_VERSION + it { is_expected.to be true } + end + end + + context 'when gem is not loaded' do + include_context 'loaded gems', :'ruby-kafka' => nil + it { is_expected.to be false } + end + end + + describe '#default_configuration' do + subject(:default_configuration) { integration.default_configuration } + it { is_expected.to be_a_kind_of(Datadog::Contrib::Kafka::Configuration::Settings) } + end + + describe '#patcher' do + subject(:patcher) { integration.patcher } + it { is_expected.to be Datadog::Contrib::Kafka::Patcher } + end +end diff --git a/spec/ddtrace/contrib/kafka/patcher_spec.rb b/spec/ddtrace/contrib/kafka/patcher_spec.rb new file mode 100644 index 0000000000..dae43ca7da --- /dev/null +++ b/spec/ddtrace/contrib/kafka/patcher_spec.rb @@ -0,0 +1,659 @@ +require 'ddtrace/contrib/support/spec_helper' +require 'ddtrace/contrib/analytics_examples' + +require 'ruby-kafka' +require 'active_support' +require 'ddtrace' + +RSpec.describe 'Kafka patcher' do + let(:tracer) { get_test_tracer } + let(:configuration_options) { { tracer: tracer } } + let(:client_id) { SecureRandom.uuid } + let(:span) do + all_spans.select { |s| s.name == span_name }.first + end + + def all_spans + tracer.writer.spans(:keep) + end + + before(:each) do + Datadog.configure do |c| + c.use :kafka, configuration_options + end + end + + around do |example| + # Reset before and after each example; don't allow global state to linger. + Datadog.registry[:kafka].reset_configuration! + example.run + Datadog.registry[:kafka].reset_configuration! + end + + describe 'connection.request' do + let(:api) { 'api' } + let(:request_size) { rand(1..1000) } + let(:response_size) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + api: api, + request_size: request_size, + response_size: response_size + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONNECTION_REQUEST } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('request.connection.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.connection.request') + expect(span.resource).to eq(api) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.request_size')).to eq(request_size) + expect(span.get_tag('kafka.response_size')).to eq(response_size) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('request.connection.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.connection.request') + expect(span.resource).to eq(api) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.request_size')).to eq(request_size) + expect(span.get_tag('kafka.response_size')).to eq(response_size) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('request.connection.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('request.connection.kafka', payload) } + end + end + + describe 'consumer.process_batch' do + let(:group_id) { SecureRandom.uuid } + let(:topic) { 'my-topic' } + let(:message_count) { rand(1..10) } + let(:partition) { rand(0..100) } + let(:highwater_mark_offset) { rand(100..1000) } + let(:offset_lag) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + group_id: group_id, + topic: topic, + message_count: message_count, + partition: partition, + highwater_mark_offset: highwater_mark_offset, + offset_lag: offset_lag + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_PROCESS_BATCH } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_batch') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.highwater_mark_offset')).to eq(highwater_mark_offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_batch') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.highwater_mark_offset')).to eq(highwater_mark_offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('process_batch.consumer.kafka', payload) } + end + end + + describe 'consumer.process_message' do + let(:group_id) { SecureRandom.uuid } + let(:topic) { 'my-topic' } + let(:key) { SecureRandom.hex } + let(:partition) { rand(0..100) } + let(:offset) { rand(1..1000) } + let(:offset_lag) { rand(1..1000) } + let(:payload) do + { + client_id: client_id, + group_id: group_id, + key: key, + topic: topic, + partition: partition, + offset: offset, + offset_lag: offset_lag + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_PROCESS_MESSAGE } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_message') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_key')).to eq(key) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.offset')).to eq(offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.process_message') + expect(span.resource).to eq(topic) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic')).to eq(topic) + expect(span.get_tag('kafka.message_key')).to eq(key) + expect(span.get_tag('kafka.partition')).to eq(partition) + expect(span.get_tag('kafka.offset')).to eq(offset) + expect(span.get_tag('kafka.offset_lag')).to eq(offset_lag) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('process_message.consumer.kafka', payload) } + end + end + + describe 'consumer.heartbeat' do + let(:group_id) { SecureRandom.uuid } + let(:topic_partitions) do + { + 'foo' => [0, 2], + 'bar' => [1, 3] + } + end + let(:payload) do + { + client_id: client_id, + group_id: group_id, + topic_partitions: topic_partitions + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_HEARTBEAT } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.heartbeat') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic_partitions.foo')).to eq(topic_partitions['foo'].to_s) + expect(span.get_tag('kafka.topic_partitions.bar')).to eq(topic_partitions['bar'].to_s) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.heartbeat') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span.get_tag('kafka.topic_partitions.foo')).to eq(topic_partitions['foo'].to_s) + expect(span.get_tag('kafka.topic_partitions.bar')).to eq(topic_partitions['bar'].to_s) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('heartbeat.consumer.kafka', payload) } + end + end + + describe 'consumer.join_group' do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_JOIN_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.join_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.join_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('join_group.consumer.kafka', payload) } + end + end + + describe 'consumer.leave_group' do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_LEAVE_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.leave_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.leave_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('leave_group.consumer.kafka', payload) } + end + end + + describe 'consumer.sync_group' do + let(:group_id) { SecureRandom.uuid } + let(:payload) do + { + client_id: client_id, + group_id: group_id + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_CONSUMER_SYNC_GROUP } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.sync_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.consumer.sync_group') + expect(span.resource).to eq(group_id) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.group')).to eq(group_id) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('sync_group.consumer.kafka', payload) } + end + end + + describe 'producer.send_messages' do + let(:message_count) { rand(10..100) } + let(:sent_message_count) { rand(1..message_count) } + let(:payload) do + { + client_id: client_id, + message_count: message_count, + sent_message_count: sent_message_count + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_SEND_MESSAGES } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.send_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.sent_message_count')).to eq(sent_message_count) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.send_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.sent_message_count')).to eq(sent_message_count) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('send_messages.producer.kafka', payload) } + end + end + + describe 'producer.deliver_messages' do + let(:attempts) { rand(1..10) } + let(:message_count) { rand(10..100) } + let(:delivered_message_count) { rand(1..message_count) } + let(:payload) do + { + client_id: client_id, + attempts: attempts, + message_count: message_count, + delivered_message_count: delivered_message_count + } + end + let(:span_name) { Datadog::Contrib::Kafka::Ext::SPAN_DELIVER_MESSAGES } + + context 'that doesn\'t raise an error' do + it 'is expected to send a span' do + ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.deliver_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.attempts')).to eq(attempts) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.delivered_message_count')).to eq(delivered_message_count) + expect(span).to_not have_error + end + end + end + + context 'that raises an error' do + let(:error_class) { Class.new(StandardError) } + + it 'is expected to send a span' do + # Emulate failure + begin + ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) do + raise error_class + end + rescue error_class + nil + end + + span.tap do |span| + expect(span).to_not be nil + expect(span.service).to eq('kafka') + expect(span.name).to eq('kafka.producer.deliver_messages') + expect(span.resource).to eq(span.name) + expect(span.get_tag('kafka.client')).to eq(client_id) + expect(span.get_tag('kafka.attempts')).to eq(attempts) + expect(span.get_tag('kafka.message_count')).to eq(message_count) + expect(span.get_tag('kafka.delivered_message_count')).to eq(delivered_message_count) + expect(span).to have_error + end + end + end + + it_behaves_like 'analytics for integration' do + before { ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) } + let(:analytics_enabled_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_ENABLED } + let(:analytics_sample_rate_var) { Datadog::Contrib::Kafka::Ext::ENV_ANALYTICS_SAMPLE_RATE } + end + + it_behaves_like 'measured span for integration', true do + before { ActiveSupport::Notifications.instrument('deliver_messages.producer.kafka', payload) } + end + end +end diff --git a/spec/ddtrace/contrib/sidekiq/client_tracer_spec.rb b/spec/ddtrace/contrib/sidekiq/client_tracer_spec.rb new file mode 100644 index 0000000000..cfe88d7902 --- /dev/null +++ b/spec/ddtrace/contrib/sidekiq/client_tracer_spec.rb @@ -0,0 +1,70 @@ +require 'ddtrace/contrib/support/spec_helper' +require_relative 'support/helper' + +RSpec.describe 'ClientTracerTest' do + include_context 'Sidekiq testing' + + subject(:perform_async) { job_class.perform_async } + let(:job_class) { EmptyWorker } + + before do + Sidekiq.configure_client do |config| + config.client_middleware.clear + config.client_middleware do |chain| + chain.add(Datadog::Contrib::Sidekiq::ClientTracer) + end + end + + Sidekiq::Testing.server_middleware.clear + Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0' + end + + it 'traces job push' do + perform_async + + expect(span.service).to eq('sidekiq-client') + expect(span.resource).to eq('EmptyWorker') + expect(span.get_tag('sidekiq.job.queue')).to eq('default') + expect(span.status).to eq(0) + expect(span.parent).to be_nil + expect(span.get_metric('_dd.measured')).to be_nil + end + + context 'with nested trace' do + it 'traces job push' do + tracer.trace('parent.span', service: 'parent-service') do + perform_async + end + + expect(spans).to have(2).items + + parent_span, child_span = spans + + expect(parent_span.name).to eq('parent.span') + expect(parent_span.status).to eq(0) + expect(parent_span.parent).to be_nil + + expect(child_span.service).to eq('sidekiq-client') + expect(child_span.resource).to eq('EmptyWorker') + expect(child_span.get_tag('sidekiq.job.queue')).to eq('default') + expect(child_span.status).to eq(0) + expect(child_span.parent).to eq(parent_span) + expect(child_span.get_metric('_dd.measured')).to be_nil + end + end + + context 'with delayed extensions' do + subject(:do_work) { DelayableClass.delay.do_work } + + before do + stub_const('DelayableClass', Class.new do + def self.do_work; end + end) + end + + it 'traces with correct resource' do + do_work + expect(spans.first.resource).to eq('DelayableClass.do_work') + end + end +end diff --git a/spec/ddtrace/contrib/sidekiq/disabled_tracer_spec.rb b/spec/ddtrace/contrib/sidekiq/disabled_tracer_spec.rb new file mode 100644 index 0000000000..d8a972e4e3 --- /dev/null +++ b/spec/ddtrace/contrib/sidekiq/disabled_tracer_spec.rb @@ -0,0 +1,23 @@ +require 'ddtrace/contrib/support/spec_helper' +require_relative 'support/helper' + +RSpec.describe 'Disabled tracer' do + include_context 'Sidekiq testing' + + subject(:perform_async) { job_class.perform_async } + let(:job_class) { EmptyWorker } + + before do + Sidekiq::Testing.server_middleware.clear + Sidekiq::Testing.server_middleware do |chain| + Datadog.tracer.configure(enabled: false) + chain.add(Datadog::Contrib::Sidekiq::ServerTracer) + end + end + + it 'does not trace' do + perform_async + + expect(spans).to be_empty + end +end diff --git a/spec/ddtrace/contrib/sidekiq/server_tracer_spec.rb b/spec/ddtrace/contrib/sidekiq/server_tracer_spec.rb new file mode 100644 index 0000000000..cdeee073ac --- /dev/null +++ b/spec/ddtrace/contrib/sidekiq/server_tracer_spec.rb @@ -0,0 +1,119 @@ +require 'ddtrace/contrib/support/spec_helper' +require_relative 'support/helper' + +RSpec.describe 'Server tracer' do + include_context 'Sidekiq testing' + + subject(:perform_async) { job_class.perform_async } + let(:job_class) { EmptyWorker } + + before do + Sidekiq::Testing.server_middleware.clear + Sidekiq::Testing.server_middleware do |chain| + chain.add(Datadog::Contrib::Sidekiq::ServerTracer) + end + + Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0' + end + + it 'traces async job run' do + perform_async + + expect(spans).to have(2).items + + span, _push = spans + expect(span.service).to eq('sidekiq') + expect(span.resource).to eq('EmptyWorker') + expect(span.get_tag('sidekiq.job.queue')).to eq('default') + expect(span.get_tag('sidekiq.job.delay')).to_not be_nil + expect(span.status).to eq(0) + expect(span.parent).to be_nil + expect(span.get_tag('sidekiq.job.args')).to be_nil + expect(span.get_metric('_dd.measured')).to eq(1.0) + end + + context 'with job run failing' do + let(:job_class) { ErrorWorker } + + before do + stub_const('ErrorWorker', Class.new do + include Sidekiq::Worker + + def perform + raise ZeroDivisionError, 'job error' + end + end) + end + + it 'traces async job run' do + expect { perform_async }.to raise_error(ZeroDivisionError) + expect(spans).to have(2).items + + span, _push = spans + expect(span.service).to eq('sidekiq') + expect(span.resource).to eq('ErrorWorker') + expect(span.get_tag('sidekiq.job.queue')).to eq('default') + expect(span.get_tag('sidekiq.job.delay')).to_not be_nil + expect(span.status).to eq(1) + expect(span.get_tag(Datadog::Ext::Errors::MSG)).to eq('job error') + expect(span.get_tag(Datadog::Ext::Errors::TYPE)).to eq('ZeroDivisionError') + expect(span.parent).to be_nil + expect(span.get_tag('sidekiq.job.args')).to be_nil + expect(span.get_metric('_dd.measured')).to eq(1.0) + end + end + + context 'with custom job' do + before do + stub_const('CustomWorker', Class.new do + include Sidekiq::Worker + + def self.datadog_tracer_config + { service_name: 'sidekiq-slow', tag_args: true } + end + + def perform(_) end + end) + end + + it 'traces async job run' do + perform_async + CustomWorker.perform_async('random_id') + + expect(spans).to have(4).items + + custom, empty, _push, _push = spans + + expect(empty.service).to eq('sidekiq') + expect(empty.resource).to eq('EmptyWorker') + expect(empty.get_tag('sidekiq.job.queue')).to eq('default') + expect(empty.get_tag('sidekiq.job.delay')).to_not be_nil + expect(empty.status).to eq(0) + expect(empty.parent).to be_nil + expect(empty.get_metric('_dd.measured')).to eq(1.0) + + expect(custom.service).to eq('sidekiq-slow') + expect(custom.resource).to eq('CustomWorker') + expect(custom.get_tag('sidekiq.job.queue')).to eq('default') + expect(custom.status).to eq(0) + expect(custom.parent).to be_nil + expect(custom.get_tag('sidekiq.job.args')).to eq(['random_id'].to_s) + expect(custom.get_metric('_dd.measured')).to eq(1.0) + end + end + + context 'with delayed extensions' do + subject(:do_work) { DelayableClass.delay.do_work } + + before do + stub_const('DelayableClass', Class.new do + def self.do_work; end + end) + end + + it 'traces with correct resource' do + do_work + expect(spans.first.resource).to eq('DelayableClass.do_work') + end + end +end diff --git a/test/contrib/sidekiq/tracer_test_base.rb b/spec/ddtrace/contrib/sidekiq/support/helper.rb similarity index 54% rename from test/contrib/sidekiq/tracer_test_base.rb rename to spec/ddtrace/contrib/sidekiq/support/helper.rb index 73f795225a..8635102d91 100644 --- a/test/contrib/sidekiq/tracer_test_base.rb +++ b/spec/ddtrace/contrib/sidekiq/support/helper.rb @@ -1,22 +1,18 @@ - require 'sidekiq/testing' require 'ddtrace' require 'ddtrace/contrib/sidekiq/client_tracer' require 'ddtrace/contrib/sidekiq/server_tracer' -require 'helper' - -class TracerTestBase < Minitest::Test - include TestTracerHelper - REDIS_HOST = ENV.fetch('TEST_REDIS_HOST', '127.0.0.1').freeze - REDIS_PORT = ENV.fetch('TEST_REDIS_PORT', 6379) +RSpec.shared_context 'Sidekiq testing' do + let(:redis_host) { ENV.fetch('TEST_REDIS_HOST', '127.0.0.1') } + let(:redis_port) { ENV.fetch('TEST_REDIS_PORT', 6379) } - def configure + before do Datadog.configure do |c| c.use :sidekiq end - redis_url = "redis://#{REDIS_HOST}:#{REDIS_PORT}" + redis_url = "redis://#{redis_host}:#{redis_port}" Sidekiq.configure_client do |config| config.redis = { url: redis_url } @@ -29,7 +25,10 @@ def configure Sidekiq::Testing.inline! end - def writer - @tracer.writer + let!(:empty_worker) do + stub_const('EmptyWorker', Class.new do + include Sidekiq::Worker + def perform; end + end) end end diff --git a/spec/ddtrace/contrib/sidekiq/tracer_configure_spec.rb b/spec/ddtrace/contrib/sidekiq/tracer_configure_spec.rb new file mode 100644 index 0000000000..a237803764 --- /dev/null +++ b/spec/ddtrace/contrib/sidekiq/tracer_configure_spec.rb @@ -0,0 +1,29 @@ +require 'ddtrace/contrib/support/spec_helper' +require_relative 'support/helper' + +RSpec.describe 'Tracer configuration' do + include_context 'Sidekiq testing' + + subject(:perform_async) { job_class.perform_async } + let(:job_class) { EmptyWorker } + + context 'with custom middleware configuration' do + before do + Sidekiq::Testing.server_middleware do |chain| + chain.add( + Datadog::Contrib::Sidekiq::ServerTracer, + service_name: 'my-service' + ) + end + end + + it 'instruments with custom values' do + perform_async + + expect(spans).to have(2).items + + span, _push = spans + expect(span.service).to eq('my-service') + end + end +end diff --git a/spec/ddtrace/workers/async_spec.rb b/spec/ddtrace/workers/async_spec.rb index 9f5fc77fad..c7071feb25 100644 --- a/spec/ddtrace/workers/async_spec.rb +++ b/spec/ddtrace/workers/async_spec.rb @@ -257,6 +257,8 @@ describe '#running?' do subject(:running?) { worker.running? } + before { allow(worker_spy).to(receive(:perform)) { sleep 5 } } + context 'by default' do it { is_expected.to be false } end diff --git a/test/contrib/sidekiq/client_tracer_test.rb b/test/contrib/sidekiq/client_tracer_test.rb deleted file mode 100644 index 7e41345193..0000000000 --- a/test/contrib/sidekiq/client_tracer_test.rb +++ /dev/null @@ -1,68 +0,0 @@ -require 'contrib/sidekiq/tracer_test_base' - -class ClientTracerTest < TracerTestBase - class EmptyWorker - include Sidekiq::Worker - - def perform(); end - end - - class DelayableClass - def self.do_work; end - end - - def setup - super - - Sidekiq.configure_client do |config| - config.client_middleware.clear - - config.client_middleware do |chain| - chain.add(Datadog::Contrib::Sidekiq::ClientTracer, enabled: true) - end - end - - Sidekiq::Testing.server_middleware.clear - Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0' - end - - def test_empty - tracer.trace('parent.span', service: 'parent-service') do - EmptyWorker.perform_async - end - - assert_equal(2, spans.length) - - parent_span, child_span = spans - - assert_equal('parent.span', parent_span.name) - assert_equal(0, parent_span.status) - assert_nil(parent_span.parent) - - assert_equal('sidekiq-client', child_span.service) - assert_equal('ClientTracerTest::EmptyWorker', child_span.resource) - assert_equal('default', child_span.get_tag('sidekiq.job.queue')) - assert_equal(0, child_span.status) - assert_equal(parent_span, child_span.parent) - assert_nil(child_span.get_metric('_dd.measured')) - end - - def test_empty_parentless - EmptyWorker.perform_async - - assert_equal(1, spans.length) - - span = spans.first - assert_equal('sidekiq-client', span.service) - assert_equal('ClientTracerTest::EmptyWorker', span.resource) - assert_equal('default', span.get_tag('sidekiq.job.queue')) - assert_equal(0, span.status) - assert_nil(span.parent) - assert_nil(span.get_metric('_dd.measured')) - end - - def test_delayed_extensions - DelayableClass.delay.do_work - assert_equal('ClientTracerTest::DelayableClass.do_work', spans.first.resource) - end -end diff --git a/test/contrib/sidekiq/disabled_tracer_test.rb b/test/contrib/sidekiq/disabled_tracer_test.rb deleted file mode 100644 index 30c5226938..0000000000 --- a/test/contrib/sidekiq/disabled_tracer_test.rb +++ /dev/null @@ -1,25 +0,0 @@ - -require 'contrib/sidekiq/tracer_test_base' - -class DisabledTracerTest < TracerTestBase - class EmptyWorker - include Sidekiq::Worker - - def perform; end - end - - def setup - super - - Sidekiq::Testing.server_middleware do |chain| - Datadog.tracer.configure(enabled: false) - chain.add(Datadog::Contrib::Sidekiq::ServerTracer) - end - end - - def test_empty - EmptyWorker.perform_async() - - assert_equal(0, spans.length) - end -end diff --git a/test/contrib/sidekiq/server_tracer_test.rb b/test/contrib/sidekiq/server_tracer_test.rb deleted file mode 100644 index 5590ebedcf..0000000000 --- a/test/contrib/sidekiq/server_tracer_test.rb +++ /dev/null @@ -1,110 +0,0 @@ -require 'contrib/sidekiq/tracer_test_base' - -class ServerTracerTest < TracerTestBase - class TestError < StandardError; end - - class EmptyWorker - include Sidekiq::Worker - - def perform(); end - end - - class ErrorWorker - include Sidekiq::Worker - - def perform - raise TestError, 'job error' - end - end - - class CustomWorker - include Sidekiq::Worker - - def self.datadog_tracer_config - { service_name: 'sidekiq-slow', tag_args: true } - end - - def perform(args); end - end - - class DelayableClass - def self.do_work; end - end - - def setup - super - - Sidekiq::Testing.server_middleware do |chain| - chain.add(Datadog::Contrib::Sidekiq::ServerTracer, enabled: true) - end - Sidekiq::Extensions.enable_delay! if Sidekiq::VERSION > '5.0.0' - end - - def test_empty - EmptyWorker.perform_async() - - assert_equal(2, spans.length) - - span, _push = spans - assert_equal('sidekiq', span.service) - assert_equal('ServerTracerTest::EmptyWorker', span.resource) - assert_equal('default', span.get_tag('sidekiq.job.queue')) - refute_nil(span.get_tag('sidekiq.job.delay')) - assert_equal(0, span.status) - assert_nil(span.parent) - assert_nil(span.get_tag('sidekiq.job.args')) - assert_equal(span.get_metric('_dd.measured'), 1.0) - end - - # rubocop:disable Lint/HandleExceptions - def test_error - begin - ErrorWorker.perform_async() - rescue TestError - end - - assert_equal(2, spans.length) - - span, _push = spans - assert_equal('sidekiq', span.service) - assert_equal('ServerTracerTest::ErrorWorker', span.resource) - assert_equal('default', span.get_tag('sidekiq.job.queue')) - refute_nil(span.get_tag('sidekiq.job.delay')) - assert_equal(1, span.status) - assert_equal('job error', span.get_tag(Datadog::Ext::Errors::MSG)) - assert_equal('ServerTracerTest::TestError', span.get_tag(Datadog::Ext::Errors::TYPE)) - assert_nil(span.parent) - assert_nil(span.get_tag('sidekiq.job.args')) - assert_equal(span.get_metric('_dd.measured'), 1.0) - end - - def test_custom - EmptyWorker.perform_async() - CustomWorker.perform_async('random_id') - - assert_equal(4, spans.length) - - custom, empty, _push, _push = spans - - assert_equal('sidekiq', empty.service) - assert_equal('ServerTracerTest::EmptyWorker', empty.resource) - assert_equal('default', empty.get_tag('sidekiq.job.queue')) - refute_nil(empty.get_tag('sidekiq.job.delay')) - assert_equal(0, empty.status) - assert_nil(empty.parent) - assert_equal(empty.get_metric('_dd.measured'), 1.0) - - assert_equal('sidekiq-slow', custom.service) - assert_equal('ServerTracerTest::CustomWorker', custom.resource) - assert_equal('default', custom.get_tag('sidekiq.job.queue')) - assert_equal(0, custom.status) - assert_nil(custom.parent) - assert_equal(['random_id'].to_s, custom.get_tag('sidekiq.job.args')) - assert_equal(custom.get_metric('_dd.measured'), 1.0) - end - - def test_delayed_extensions - DelayableClass.delay.do_work - assert_equal('ServerTracerTest::DelayableClass.do_work', spans.first.resource) - end -end diff --git a/test/contrib/sidekiq/tracer_configure_test.rb b/test/contrib/sidekiq/tracer_configure_test.rb deleted file mode 100644 index ca3b404d26..0000000000 --- a/test/contrib/sidekiq/tracer_configure_test.rb +++ /dev/null @@ -1,29 +0,0 @@ -require 'contrib/sidekiq/tracer_test_base' - -class TracerTest < TracerTestBase - class EmptyWorker - include Sidekiq::Worker - - def perform(); end - end - - def test_configuration_defaults - # it should configure the tracer with reasonable defaults - Sidekiq::Testing.server_middleware do |chain| - chain.add(Datadog::Contrib::Sidekiq::ServerTracer) - end - EmptyWorker.perform_async() - end - - def test_configuration_custom - # it should configure the tracer with users' settings - Datadog.tracer.configure(enabled: false) - Sidekiq::Testing.server_middleware do |chain| - chain.add( - Datadog::Contrib::Sidekiq::ServerTracer, - service_name: 'my-sidekiq' - ) - end - EmptyWorker.perform_async() - end -end