Skip to content

Commit

Permalink
Support sending logs to Datadog v2 endpoints.
Browse files Browse the repository at this point in the history
This PR is also adding a public variable containing current Gem version.
  • Loading branch information
remeh committed Oct 18, 2021
1 parent b6f8bf7 commit 3a9f935
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 9 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ foo/
*.iml
.idea/

fluent/
Gemfile.lock
6 changes: 4 additions & 2 deletions fluent-plugin-datadog.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

require "fluent/plugin/version.rb"

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-datadog"
spec.version = "0.13.0"
spec.version = Datadog::FluentPlugin::GEM_VERSION
spec.authors = ["Datadog Solutions Team"]
spec.email = ["support@datadoghq.com"]
spec.summary = "Datadog output plugin for Fluent event collector"
spec.homepage = "http://datadoghq.com"
spec.license = "Apache-2.0"

spec.files = [".gitignore", "Gemfile", "LICENSE", "README.md", "Rakefile", "fluent-plugin-datadog.gemspec", "lib/fluent/plugin/out_datadog.rb"]
spec.files = [".gitignore", "Gemfile", "LICENSE", "README.md", "Rakefile", "fluent-plugin-datadog.gemspec", "lib/fluent/plugin/version.rb", "lib/fluent/plugin/out_datadog.rb"]
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
Expand Down
24 changes: 18 additions & 6 deletions lib/fluent/plugin/out_datadog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
require "zlib"
require "fluent/plugin/output"

require_relative "version"

class Fluent::DatadogOutput < Fluent::Plugin::Output
class RetryableError < StandardError;
end
Expand Down Expand Up @@ -50,6 +52,7 @@ class RetryableError < StandardError;
config_param :compression_level, :integer, :default => 6
config_param :no_ssl_validation, :bool, :default => false
config_param :http_proxy, :string, :default => nil
config_param :force_v1_routes, :bool, :default => false

# Format settings
config_param :use_json, :bool, :default => true
Expand Down Expand Up @@ -89,7 +92,7 @@ def formatted_to_msgpack_binary?

def start
super
@client = new_client(log, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @ssl_port, @port, @http_proxy, @use_compression)
@client = new_client(log, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @ssl_port, @port, @http_proxy, @use_compression, @force_v1_routes)
end

def shutdown
Expand Down Expand Up @@ -261,9 +264,9 @@ def gzip_compress(payload, compression_level)
end

# Build a new transport client
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression)
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, force_v1_routes)
if use_http
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key, force_v1_routes
else
DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, ssl_port, port
end
Expand Down Expand Up @@ -301,20 +304,29 @@ class DatadogHTTPClient < DatadogClient
require 'net/http'
require 'net/http/persistent'

def initialize(logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key)
def initialize(logger, use_ssl, no_ssl_validation, host, ssl_port, port, http_proxy, use_compression, api_key, force_v1_routes = false)
@logger = logger
protocol = use_ssl ? "https" : "http"
port = use_ssl ? ssl_port : port
@uri = URI("#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}")
if force_v1_routes
@uri = URI("#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}")
else
@uri = URI("#{protocol}://#{host}:#{port.to_s}/api/v2/logs")
end
proxy_uri = :ENV
if http_proxy
proxy_uri = URI.parse(http_proxy)
elsif ENV['HTTP_PROXY'] || ENV['http_proxy']
logger.info("Using HTTP proxy defined in `HTTP_PROXY`/`http_proxy` env vars")
end
logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled"))
logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled") + (force_v1_routes ? " using v1 routes" : " using v2 routes"))
@client = Net::HTTP::Persistent.new name: "fluent-plugin-datadog-logcollector", proxy: proxy_uri
@client.verify_mode = OpenSSL::SSL::VERIFY_NONE if no_ssl_validation
unless force_v1_routes
@client.override_headers["DD-API-KEY"] = api_key
@client.override_headers["DD-EVP-ORIGIN"] = "fluent"
@client.override_headers["DD-EVP-ORIGIN-VERSION"] = Datadog::FluentPlugin::GEM_VERSION
end
@client.override_headers["Content-Type"] = "application/json"
if use_compression
@client.override_headers["Content-Encoding"] = "gzip"
Expand Down
7 changes: 7 additions & 0 deletions lib/fluent/plugin/version.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

module Datadog
class FluentPlugin
GEM_VERSION = '0.14.0'
end
end

0 comments on commit 3a9f935

Please sign in to comment.