Skip to content

Commit

Permalink
Merge pull request logstash-plugins#64 from garyelephant/master
Browse files Browse the repository at this point in the history
Upgrade to Geoip 2.5.0
  • Loading branch information
talevy committed Apr 18, 2016
2 parents 8003ea5 + 1f31edc commit c9b6535
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 236 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
Gemfile.lock
.bundle
vendor
.idea
lib/com
10 changes: 8 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
sudo: false
language: ruby
cache: bundler
cache:
- bundler
- directories:
- $HOME/.m2
rvm:
- jruby-1.7.23
before_script: bundle exec rake vendor
before_install:
- bundle install
- bundle exec rake vendor
- bundle exec rake install_jars
script:
- bundle exec rspec spec
2 changes: 2 additions & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Contributors:
* Vincent Batts (vbatts)
* avleen
* Guy Boertje (guyboertje)
* Thomas Decaux (qwant)
* Gary Gao (garyelephant)

Note: If you've sent us patches, bug reports, or otherwise contributed to
Logstash, and you aren't on the list above and want to be, please let us know
Expand Down
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ bundle install
bundle exec rake vendor
```

- Install jar dependencies

```
bundle exec rake install_jars
```

- Run tests

```sh
Expand Down
7 changes: 7 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,11 @@ task :default do
system("rake -T")
end

require 'jars/installer'

# Vendors the plugin's Java jar dependencies (resolved by the jar-dependencies
# gem) into vendor/jar-dependencies/runtime-jars so they ship with the gem.
# JARS_VENDOR is disabled so jars are not also copied into lib/.
task :install_jars do
  ENV['JARS_HOME'] = File.join(Dir.pwd, "vendor/jar-dependencies/runtime-jars")
  ENV['JARS_VENDOR'] = "false"
  Jars::Installer.vendor_jars!
end

require "logstash/devutils/rake"
262 changes: 130 additions & 132 deletions lib/logstash/filters/geoip.rb
Original file line number Diff line number Diff line change
@@ -1,44 +1,60 @@
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "tempfile"
require "lru_redux"

# The GeoIP filter adds information about the geographical location of IP addresses,
require "logstash-filter-geoip_jars"

java_import "java.net.InetAddress"
java_import "com.maxmind.geoip2.DatabaseReader"
java_import "com.maxmind.geoip2.model.CityResponse"
java_import "com.maxmind.geoip2.record.Country"
java_import "com.maxmind.geoip2.record.Subdivision"
java_import "com.maxmind.geoip2.record.City"
java_import "com.maxmind.geoip2.record.Postal"
java_import "com.maxmind.geoip2.record.Location"
java_import "com.maxmind.db.CHMCache"

# Runs the given block with Ruby warnings silenced ($VERBOSE = nil).
# The previous $VERBOSE value is restored afterwards, even when the
# block raises. Returns the block's value (nil when no block is given).
def suppress_all_warnings
  saved_verbosity = $VERBOSE
  $VERBOSE = nil
  yield if block_given?
ensure
  # method-level ensure: restore verbosity on both normal and error exit
  $VERBOSE = saved_verbosity
end

# create a new instance of the Java class File without shadowing the Ruby version of the File class
module JavaIO
# JRuby-only: pulls every class from the java.io package into this module,
# so Java's file class is reachable as JavaIO::File without shadowing
# Ruby's built-in ::File. NOTE(review): requires JRuby; include_package
# does not exist on MRI.
include_package "java.io"
end


# The GeoIP2 filter adds information about the geographical location of IP addresses,
# based on data from the Maxmind database.
#
# Starting with version 1.3.0 of Logstash, a `[geoip][location]` field is created if
# the GeoIP lookup returns a latitude and longitude. The field is stored in
# http://geojson.org/geojson-spec.html[GeoJSON] format. Additionally,
# the default Elasticsearch template provided with the
# <<plugins-outputs-elasticsearch,`elasticsearch` output>> maps
# the `[geoip][location]` field to an https://www.elastic.co/guide/en/elasticsearch/reference/1.7/mapping-geo-point-type.html#_mapping_options[Elasticsearch geo_point].
# the `[geoip][location]` field to an http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/mapping-geo-point-type.html#_mapping_options[Elasticsearch geo_point].
#
# As this field is a `geo_point` _and_ it is still valid GeoJSON, you get
# the awesomeness of Elasticsearch's geospatial query, facet and filter functions
# and the flexibility of having GeoJSON for all other applications (like Kibana's
# map visualization).
#
# Logstash releases ship with the GeoLiteCity database made available from
# Maxmind with a CCA-ShareAlike 3.0 license. For more details on GeoLite, see
# <http://www.maxmind.com/en/geolite>.
# This product includes GeoLite2 data created by MaxMind, available from
# <http://dev.maxmind.com/geoip/geoip2/geolite2/>.
class LogStash::Filters::GeoIP < LogStash::Filters::Base
LOOKUP_CACHE_INIT_MUTEX = Mutex.new
# Map of lookup caches, keyed by geoip_type
LOOKUP_CACHES = {}

attr_accessor :lookup_cache
attr_reader :threadkey

config_name "geoip"

# The path to the GeoIP database file which Logstash should use. Country, City, ASN, ISP
# and organization databases are supported.
# The path to the GeoIP2 database file which Logstash should use. Only City database is supported by now.
#
# If not specified, this will default to the GeoLiteCity database that ships
# with Logstash.
# Up-to-date databases can be downloaded from here: <https://dev.maxmind.com/geoip/legacy/geolite/>
# Please be sure to download a legacy format database.
config :database, :validate => :path

# The field containing the IP address or hostname to map via geoip. If
Expand All @@ -53,7 +69,11 @@ class LogStash::Filters::GeoIP < LogStash::Filters::Base
# For the built-in GeoLiteCity database, the following are available:
# `city_name`, `continent_code`, `country_code2`, `country_code3`, `country_name`,
# `dma_code`, `ip`, `latitude`, `longitude`, `postal_code`, `region_name` and `timezone`.
config :fields, :validate => :array
config :fields, :validate => :array, :default => ['city_name', 'continent_code',
'country_code2', 'country_code3', 'country_name',
'dma_code', 'ip', 'latitude',
'longitude', 'postal_code', 'region_name',
'region_code', 'timezone', 'location']

# Specify the field into which Logstash should store the geoip data.
# This can be useful, for example, if you have `src\_ip` and `dst\_ip` fields and
Expand All @@ -68,6 +88,22 @@ class LogStash::Filters::GeoIP < LogStash::Filters::Base
# is still valid GeoJSON.
config :target, :validate => :string, :default => 'geoip'

# GeoIP lookup is surprisingly expensive. This filter uses a cache to take advantage of the fact that
# IP addresses are often found adjacent to one another in log files and rarely have a random distribution.
# The higher you set this the more likely an item is to be in the cache and the faster this filter will run.
# However, if you set this too high you can use more memory than desired.
# Since the GeoIP API upgraded to v2, there is no eviction policy so far; once the cache is full, no more records can be added.
# Experiment with different values for this option to find the best performance for your dataset.
#
# This MUST be set to a value > 0. There is really no reason to not want this behavior, the overhead is minimal
# and the speed gains are large.
#
# It is important to note that this config value is global to the geoip_type. That is to say all instances of the geoip filter
# of the same geoip_type share the same cache. The last declared cache size will 'win'. The reason for this is that there would be no benefit
# to having multiple caches for different instances at different points in the pipeline, that would just increase the
# number of cache misses and waste memory.
config :cache_size, :validate => :number, :default => 1000

# GeoIP lookup is surprisingly expensive. This filter uses an LRU cache to take advantage of the fact that
# IP addresses are often found adjacent to one another in log files and rarely have a random distribution.
# The higher you set this the more likely an item is to be in the cache and the faster this filter will run.
Expand All @@ -86,135 +122,97 @@ class LogStash::Filters::GeoIP < LogStash::Filters::Base

public
def register
require "geoip"
suppress_all_warnings do
if @database.nil?
@database = ::Dir.glob(::File.join(::File.expand_path("../../../vendor/", ::File.dirname(__FILE__)),"GeoLite2-City.mmdb")).first

if @database.nil?
@database = ::Dir.glob(::File.join(::File.expand_path("../../../vendor/", ::File.dirname(__FILE__)),"GeoLiteCity*.dat")).first
if !File.exists?(@database)
raise "You must specify 'database => ...' in your geoip filter (I looked for '#{@database}'"
if @database.nil? || !File.exists?(@database)
raise "You must specify 'database => ...' in your geoip filter (I looked for '#{@database}')"
end
end
end
@logger.info("Using geoip database", :path => @database)
# For the purpose of initializing this filter, geoip is initialized here but
# not set as a global. The geoip module imposes a mutex, so the filter needs
# to re-initialize this later in the filter() thread, and save that access
# as a thread-local variable.
geoip_initialize = ::GeoIP.new(@database)

@geoip_type = case geoip_initialize.database_type
when GeoIP::GEOIP_CITY_EDITION_REV0, GeoIP::GEOIP_CITY_EDITION_REV1
:city
when GeoIP::GEOIP_COUNTRY_EDITION
:country
when GeoIP::GEOIP_ASNUM_EDITION
:asn
when GeoIP::GEOIP_ISP_EDITION, GeoIP::GEOIP_ORG_EDITION
:isp
else
raise RuntimeException.new "This GeoIP database is not currently supported"
end

@threadkey = "geoip-#{self.object_id}"
@logger.info("Using geoip database", :path => @database)

# This is wrapped in a mutex to make sure the initialization behavior of LOOKUP_CACHES (see def above) doesn't create a dupe
LOOKUP_CACHE_INIT_MUTEX.synchronize do
self.lookup_cache = LOOKUP_CACHES[@geoip_type] ||= LruRedux::ThreadSafeCache.new(1000)
db_file = JavaIO::File.new(@database)
begin
@parser = DatabaseReader::Builder.new(db_file).withCache(CHMCache.new(@cache_size)).build();
rescue Java::ComMaxmindDb::InvalidDatabaseException => e
@logger.error("The Geoip2 MMDB database provided is invalid or corrupted.", :exception => e, :field => @source)
raise e
end
end

@no_fields = @fields.nil? || @fields.empty?
end # def register

public
def filter(event)
geo_data_hash = get_geo_data(event)
if apply_geodata(geo_data_hash, event)
filter_matched(event)
end
end # def filter
return unless filter?(event)

def apply_geodata(geo_data_hash, event)
# don't do anything more if the lookup result is nil?
return false if geo_data_hash.nil?
# only set the event[@target] if the lookup result is not nil: BWC
event[@target] = {} if event[@target].nil?
# don't do anything more if the lookup result is empty?
return false if geo_data_hash.empty?
geo_data_hash.each do |key, value|
if @no_fields || @fields.include?(key)
# can't dup numerics
event["[#{@target}][#{key}]"] = value.is_a?(Numeric) ? value : value.dup
end
end # geo_data_hash.each
true
end

def get_geo_data(event)
# pure function, must control return value
result = {}
ip = event[@source]
ip = ip.first if ip.is_a? Array
return nil if ip.nil?
begin
result = get_geo_data_for_ip(ip)
rescue SocketError => e
@logger.error("IP Field contained invalid IP address or hostname", :field => @source, :event => event)
rescue StandardError => e
@logger.error("Unknown error while looking up GeoIP data", :exception => e, :field => @source, :event => event)
end
result
end

def get_geo_data_for_ip(ip)
ensure_database!
if (cached = lookup_cache[ip])
cached
else
geo_data = Thread.current[threadkey].send(@geoip_type, ip)
converted = prepare_geodata_for_cache(geo_data)
lookup_cache[ip] = converted
converted
end
end

def prepare_geodata_for_cache(geo_data)
# GeoIP returns a nil or a Struct subclass
return nil if !geo_data.respond_to?(:each_pair)
#lets just do this once before caching
result = {}
geo_data.each_pair do |k, v|
next if v.nil? || k == :request
if v.is_a?(String)
next if v.empty?
# Some strings from GeoIP don't have the correct encoding...
result[k.to_s] = case v.encoding
# I have found strings coming from GeoIP that are ASCII-8BIT are actually
# ISO-8859-1...
when Encoding::ASCII_8BIT
v.force_encoding(Encoding::ISO_8859_1).encode(Encoding::UTF_8)
when Encoding::ISO_8859_1, Encoding::US_ASCII
v.encode(Encoding::UTF_8)
ip = event[@source]
ip = ip.first if ip.is_a? Array
ip_address = InetAddress.getByName(ip)
response = @parser.city(ip_address)
country = response.getCountry()
subdivision = response.getMostSpecificSubdivision()
city = response.getCity()
postal = response.getPostal()
location = response.getLocation()

geo_data_hash = Hash.new()

@fields.each do |field|
case field
when "city_name"
geo_data_hash["city_name"] = city.getName()
when "country_name"
geo_data_hash["country_name"] = country.getName()
when "continent_code"
geo_data_hash["continent_code"] = response.getContinent().getCode()
when "continent_name"
geo_data_hash["continent_name"] = response.getContinent().getName()
when "country_code2"
geo_data_hash["country_code2"] = country.getIsoCode()
when "country_code3"
geo_data_hash["country_code3"] = country.getIsoCode()
when "ip"
geo_data_hash["ip"] = ip_address.getHostAddress()
when "postal_code"
geo_data_hash["postal_code"] = postal.getCode()
when "dma_code"
geo_data_hash["dma_code"] = location.getMetroCode()
when "region_name"
geo_data_hash["region_name"] = subdivision.getName()
when "region_code"
geo_data_hash["region_code"] = subdivision.getIsoCode()
when "timezone"
geo_data_hash["timezone"] = location.getTimeZone()
when "location"
geo_data_hash["location"] = [ location.getLongitude(), location.getLatitude() ]
when "latitude"
geo_data_hash["latitude"] = location.getLatitude()
when "longitude"
geo_data_hash["longitude"] = location.getLongitude()
else
v
raise Exception.new("[#{field}] is not a supported field option.")
end
else
result[k.to_s] = v
end
end

lat, lng = result.values_at("latitude", "longitude")
if lat && lng
result["location"] = [ lng.to_f, lat.to_f ]
rescue com.maxmind.geoip2.exception.AddressNotFoundException => e
@logger.debug("IP not found!", :exception => e, :field => @source, :event => event)
event[@target] = {}
return
rescue java.net.UnknownHostException => e
@logger.error("IP Field contained invalid IP address or hostname", :exception => e, :field => @source, :event => event)
event[@target] = {}
return
rescue Exception => e
@logger.error("Unknown error while looking up GeoIP data", :exception => e, :field => @source, :event => event)
event[@target] = {}
return
end

result
end
event[@target] = geo_data_hash

def ensure_database!
# Use thread-local access to GeoIP. The Ruby GeoIP module forces a mutex
# around access to the database, which can be overcome with :pread.
# Unfortunately, :pread requires the io-extra gem, with C extensions that
# aren't supported on JRuby. If / when :pread becomes available, we can stop
# needing thread-local access.
Thread.current[threadkey] ||= ::GeoIP.new(@database)
end
filter_matched(event)
end # def filter
end # class LogStash::Filters::GeoIP
Loading

0 comments on commit c9b6535

Please sign in to comment.