Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GitHub-issue#253 : Implemented GeoIP processor functionality #2925

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
eff06fc
GitHub-issue#253 : Implemented GeoIP processor functionality
venkataraopasyavula Jun 22, 2023
b8b3717
GitHub-issue#253 : Implemented GeoIP processor functionality
venkataraopasyavula Jun 23, 2023
a7806fe
GitHub-issue#253 : Implemented GeoIP processor functionality
venkataraopasyavula Jun 23, 2023
a316033
Merge branch 'opensearch-project:main' into geoip-processor-functiona…
venkataraopasyavula Jun 26, 2023
3ff0264
GitHub-issue#253 : Implemented GeoIP processor functionality
venkataraopasyavula Jun 26, 2023
1cfe596
GitHub-issue#253 : Implemented GeoIP processor functionality
venkataraopasyavula Jun 26, 2023
48d4049
Merge branch 'geoip-processor-functionality' of https://github.com/ve…
venkataraopasyavula Jun 27, 2023
5c40221
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jun 27, 2023
aa76f42
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jun 27, 2023
abc107b
Merge branch 'opensearch-project:main' into geoip-processor-functiona…
deepaksahu562 Jun 28, 2023
88ad406
GitHub-issue#253 : Fixed the test-case-failed issue.
deepaksahu562 Jun 28, 2023
bca4e3c
Merge branch 'opensearch-project:main' into geoip-processor-functiona…
venkataraopasyavula Jul 3, 2023
25d55be
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jul 3, 2023
e5da094
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jul 3, 2023
456560b
Merge branch 'opensearch-project:main' into geoip-processor-functiona…
venkataraopasyavula Jul 10, 2023
2599461
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jul 10, 2023
a11b8af
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jul 10, 2023
9510e71
GitHub-issue#253 : Implemented GeoIP processor functionality. Address…
venkataraopasyavula Jul 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions data-prepper-plugins/geoip-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# GeoIP Processor

This is the Data Prepper GeoIP processor plugin which can enrich Data Prepper events with location information using a provided IP address.
Additionally, this plugin should be able to use either a MaxMind GeoIP Lite2 database or the GeoIP2 Commercial Licensing database.
The Data Prepper author must provide information for configuring the commercial license.


## Usages

The GeoIP processor should be configured as part of Data Prepper pipeline yaml file.

## Configuration Options

```
pipeline:
...
processor:
- geoip:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
keys:
- key:
source: "/peer/ip"
target: "target1"
- key:
source: "/peer/ip2"
target: "target2"
attributes: ["city_name","country_name"]
service_type:
maxmind:
database_path:
- url:
load_type: "in_memory"
cache_size: 4096
cache_refresh_schedule: P30D
```

oeyh marked this conversation as resolved.
Show resolved Hide resolved
## AWS Configuration

- `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).

- `sts_role_arn` (Optional) : The AWS STS role to assume for requests to S3. which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).

## Properties Configuration

- `keys` (Required) : List of properties like source, target and attributes can be specified where the location fields are written

- `source` (Required) : source IP for which enrichment will be done. Public IP can be either IPV4 or IPV6.

- `target` (Optional) : Property used to specify the key for the enriched fields.

- `attributes` (Optional) : Used to specify the properties which are included in the enrichment of data. By default all attributes are considered.

## Service type Configuration

- `database_path` (Required) : Used to provide either S3 path, maxmind URL or local file path where the .mmdb file is available.

- `url` (Required) : Provide URL for all three S3, maxmind URL or local file path.

- `load_type` (Required) : Load type used for better performance while enrich the data. There are two type load_type are present i.e "memory_map" or "cache".

- `cache_size` (Optional) : Used to mention the cache size. Default cache size is 2MB. Cache size applicable when load_type is cache.

- `cache_refresh_schedule` (Required) : Switch the DatabaseReader when ever Refresh schedule threshold is met.

- `tags_on_source_not_found` (Optional): A `List` of `String`s that specifies the tags to be set in the event the processor fails to parse or an unknown exception occurs while parsing. This tag may be used in conditional expressions in other parts of the configuration

## Sample JSON input:

"peer" : {
"ip" : "1.2.3.4"
"host" : "example.org"
}
"status" : "success"

## Sample JSON Output:

"peer" : {
"ip" : "1.2.3.4"
"host" : "example.org"
}
"location" : {
"status" : "success"
"country" : "United States"
"city_name" : "Seattle"
"latitude" : "47.64097"
"longitude" : "122.25894"
"zip_code" : "98115"
}




## Developer Guide

This plugin is compatible with Java 11. See below

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:geoip-processor:integrationTest -Dtests.geoipprocessor.region=<your-aws-region> -Dtests.geoipprocessor.bucket=<your-bucket>
```
4 changes: 3 additions & 1 deletion data-prepper-plugins/geoip-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ dependencies {
implementation 'com.maxmind.db:maxmind-db:3.0.0'

implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-test-common')
}

Expand Down Expand Up @@ -68,4 +70,4 @@ task integrationTest(type: Test) {
filter {
includeTestsMatching '*IT'
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,27 @@

package org.opensearch.dataprepper.plugins.processor;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import java.net.MalformedURLException;
import org.opensearch.dataprepper.plugins.processor.configuration.KeysConfig;
import org.opensearch.dataprepper.plugins.processor.databaseenrich.EnrichFailedException;
import org.opensearch.dataprepper.plugins.processor.utils.IPValidationcheck;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* Implementation class of geoIP-processor plugin. It is responsible for enrichment of
Expand All @@ -23,19 +34,32 @@
@DataPrepperPlugin(name = "geoip", pluginType = Processor.class, pluginConfigurationType = GeoIPProcessorConfig.class)
public class GeoIPProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {

private static final Logger LOG = LoggerFactory.getLogger(GeoIPProcessor.class);
private static final String GEO_IP_PROCESSING_MATCH = "geoIpProcessingMatch";
private static final String GEO_IP_PROCESSING_MISMATCH = "geoIpProcessingMismatch";
private final Counter geoIpProcessingMatchCounter;
private final Counter geoIpProcessingMismatchCounter;
private final GeoIPProcessorConfig geoIPProcessorConfig;
private final String tempPath;
private final List<String> tagsOnSourceNotFoundFailure;
private GeoIPProcessorService geoIPProcessorService;
private static final String TEMP_PATH_FOLDER = "GeoIP";

/**
* GeoIPProcessor constructor for initialization of required attributes
* @param pluginSetting pluginSetting
* @param geoCodingProcessorConfig geoCodingProcessorConfig
* @param pluginFactory pluginFactory
* @throws MalformedURLException MalformedURLException
*/
@DataPrepperPluginConstructor
public GeoIPProcessor(PluginSetting pluginSetting,
final GeoIPProcessorConfig geoCodingProcessorConfig,
final PluginFactory pluginFactory) throws MalformedURLException {
final GeoIPProcessorConfig geoCodingProcessorConfig) {
super(pluginSetting);
//TODO
this.geoIPProcessorConfig = geoCodingProcessorConfig;
this.tempPath = System.getProperty("java.io.tmpdir")+ File.separator + TEMP_PATH_FOLDER;
geoIPProcessorService = new GeoIPProcessorService(geoCodingProcessorConfig,tempPath);
tagsOnSourceNotFoundFailure = geoCodingProcessorConfig.getTagsOnSourceNotFoundFailure();
this.geoIpProcessingMatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MATCH);
this.geoIpProcessingMismatchCounter = pluginMetrics.counter(GEO_IP_PROCESSING_MISMATCH);
}

/**
Expand All @@ -46,23 +70,49 @@ public GeoIPProcessor(PluginSetting pluginSetting,
@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

//TODO : logic call the enrichment of data class methods
return null;
Map<String, Object> geoData;

for (final Record<Event> eventRecord : records) {
Event event = eventRecord.getData();
for (KeysConfig key : geoIPProcessorConfig.getKeysConfig()) {
String source = key.getKeyConfig().getSource();
List<String> attributes = key.getKeyConfig().getAttributes();
String ipAddress = event.get(source, String.class);

//Lookup from DB
if (ipAddress != null && (!(ipAddress.isEmpty()))) {
try {
if (IPValidationcheck.isPublicIpAddress(ipAddress)) {
geoData = geoIPProcessorService.getGeoData(InetAddress.getByName(ipAddress), attributes);
eventRecord.getData().put(key.getKeyConfig().getTarget(), geoData);
geoIpProcessingMatchCounter.increment();
}
} catch (IOException | EnrichFailedException ex) {
geoIpProcessingMismatchCounter.increment();
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
LOG.error(DataPrepperMarkers.EVENT, "Failed to get Geo data for event: [{}] for the IP address [{}]", event, ipAddress, ex);
}
} else {
//No Enrichment.
event.getMetadata().addTags(tagsOnSourceNotFoundFailure);
}
}
}
return records;
}

@Override
public void prepareForShutdown() {
//TODO
LOG.info("GeoIP plugin prepare For Shutdown");
}

@Override
public boolean isReadyForShutdown() {
//TODO
return false;
}

@Override
public void shutdown() {
//TODO
LOG.info("GeoIP plugin Shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class GeoIPProcessorConfig {
@NotNull
private List<KeysConfig> keysConfig;

@JsonProperty("tags_on_source_not_found")
private List<String> tagsOnSourceNotFoundFailure;

@JsonProperty("service_type")
@NotNull
private ServiceTypeOptions serviceType;
Expand All @@ -48,6 +51,14 @@ public List<KeysConfig> getKeysConfig() {
return keysConfig;
}

/**
* Get the List of failure tags
* @return List of failure tags
*/
public List<String> getTagsOnSourceNotFoundFailure() {
return tagsOnSourceNotFoundFailure;
}

/**
* Service type Options
* @return ServiceTypeOptions
Expand Down
Loading
Loading