Skip to content

Commit 16c8806

Browse files
authored
Add fingerprint ingest processor (opensearch-project#13724)
* Add fingerprint ingest processor Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Ignore metadata fields Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Add sha3-256 hash method Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Remove unused code Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Add exclude_fields and remove include_all_fields Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Modify processor description Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Make FingerprintProcessor being final Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Optimize error message and check if field name is empty string Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Fix yaml test failure Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Prepend string length to the field value Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Append hash method with version number Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Update supported version in yml test file Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Add more comment Signed-off-by: Gao Binlong <gbinlong@amazon.com> * Prepend hash method to the hash value and add more test cases Signed-off-by: Gao Binlong <gbinlong@amazon.com> --------- Signed-off-by: Gao Binlong <gbinlong@amazon.com>
1 parent 56c5dcc commit 16c8806

File tree

9 files changed

+1416
-0
lines changed

9 files changed

+1416
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 2.x]
77
### Added
8+
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
89

910
### Dependencies
1011
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.common.Nullable;
12+
import org.opensearch.common.hash.MessageDigests;
13+
import org.opensearch.core.common.Strings;
14+
import org.opensearch.ingest.AbstractProcessor;
15+
import org.opensearch.ingest.ConfigurationUtils;
16+
import org.opensearch.ingest.IngestDocument;
17+
import org.opensearch.ingest.Processor;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.security.MessageDigest;
21+
import java.util.Base64;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Locale;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.stream.Collectors;
29+
30+
import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
31+
32+
/**
33+
* Processor that generating hash value for the specified fields or fields not in the specified excluded list
34+
*/
35+
public final class FingerprintProcessor extends AbstractProcessor {
36+
public static final String TYPE = "fingerprint";
37+
// this processor is introduced in 2.16.0, we append the OpenSearch version to the hash method name to ensure
38+
// that this processor always generates same hash value based on a specific hash method, if the processing logic
39+
// of this processor changes in future version, the version number in the hash method should be increased correspondingly.
40+
private static final Set<String> HASH_METHODS = Set.of("MD5@2.16.0", "SHA-1@2.16.0", "SHA-256@2.16.0", "SHA3-256@2.16.0");
41+
42+
// fields used to generate hash value
43+
private final List<String> fields;
44+
// all fields other than the excluded fields are used to generate hash value
45+
private final List<String> excludeFields;
46+
// the target field to store the hash value, defaults to fingerprint
47+
private final String targetField;
48+
// hash method used to generate the hash value, defaults to SHA-1
49+
private final String hashMethod;
50+
private final boolean ignoreMissing;
51+
52+
FingerprintProcessor(
53+
String tag,
54+
String description,
55+
@Nullable List<String> fields,
56+
@Nullable List<String> excludeFields,
57+
String targetField,
58+
String hashMethod,
59+
boolean ignoreMissing
60+
) {
61+
super(tag, description);
62+
if (fields != null && !fields.isEmpty()) {
63+
if (fields.stream().anyMatch(Strings::isNullOrEmpty)) {
64+
throw new IllegalArgumentException("field name in [fields] cannot be null nor empty");
65+
}
66+
if (excludeFields != null && !excludeFields.isEmpty()) {
67+
throw new IllegalArgumentException("either fields or exclude_fields can be set");
68+
}
69+
}
70+
if (excludeFields != null && !excludeFields.isEmpty() && excludeFields.stream().anyMatch(Strings::isNullOrEmpty)) {
71+
throw new IllegalArgumentException("field name in [exclude_fields] cannot be null nor empty");
72+
}
73+
74+
if (!HASH_METHODS.contains(hashMethod.toUpperCase(Locale.ROOT))) {
75+
throw new IllegalArgumentException("hash method must be MD5@2.16.0, SHA-1@2.16.0 or SHA-256@2.16.0 or SHA3-256@2.16.0");
76+
}
77+
this.fields = fields;
78+
this.excludeFields = excludeFields;
79+
this.targetField = targetField;
80+
this.hashMethod = hashMethod;
81+
this.ignoreMissing = ignoreMissing;
82+
}
83+
84+
public List<String> getFields() {
85+
return fields;
86+
}
87+
88+
public List<String> getExcludeFields() {
89+
return excludeFields;
90+
}
91+
92+
public String getTargetField() {
93+
return targetField;
94+
}
95+
96+
public String getHashMethod() {
97+
return hashMethod;
98+
}
99+
100+
public boolean isIgnoreMissing() {
101+
return ignoreMissing;
102+
}
103+
104+
@Override
105+
public IngestDocument execute(IngestDocument document) {
106+
// we should deduplicate and sort the field names to make sure we can get consistent hash value
107+
final List<String> sortedFields;
108+
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
109+
Set<String> metadataFields = document.getMetadata()
110+
.keySet()
111+
.stream()
112+
.map(IngestDocument.Metadata::getFieldName)
113+
.collect(Collectors.toSet());
114+
// metadata fields such as _index, _id and _routing are ignored
115+
if (fields != null && !fields.isEmpty()) {
116+
sortedFields = fields.stream()
117+
.distinct()
118+
.filter(field -> !metadataFields.contains(field))
119+
.sorted()
120+
.collect(Collectors.toList());
121+
} else if (excludeFields != null && !excludeFields.isEmpty()) {
122+
sortedFields = existingFields.stream()
123+
.filter(field -> !metadataFields.contains(field) && !excludeFields.contains(field))
124+
.sorted()
125+
.collect(Collectors.toList());
126+
} else {
127+
sortedFields = existingFields.stream().filter(field -> !metadataFields.contains(field)).sorted().collect(Collectors.toList());
128+
}
129+
assert (!sortedFields.isEmpty());
130+
131+
final StringBuilder concatenatedFields = new StringBuilder();
132+
sortedFields.forEach(field -> {
133+
if (!document.hasField(field)) {
134+
if (ignoreMissing) {
135+
return;
136+
} else {
137+
throw new IllegalArgumentException("field [" + field + "] doesn't exist");
138+
}
139+
}
140+
141+
final Object value = document.getFieldValue(field, Object.class);
142+
if (value instanceof Map) {
143+
@SuppressWarnings("unchecked")
144+
Map<String, Object> flattenedMap = toFlattenedMap((Map<String, Object>) value);
145+
flattenedMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(entry -> {
146+
String fieldValue = String.valueOf(entry.getValue());
147+
concatenatedFields.append("|")
148+
.append(field)
149+
.append(".")
150+
.append(entry.getKey())
151+
.append("|")
152+
.append(fieldValue.length())
153+
.append(":")
154+
.append(fieldValue);
155+
});
156+
} else {
157+
String fieldValue = String.valueOf(value);
158+
concatenatedFields.append("|").append(field).append("|").append(fieldValue.length()).append(":").append(fieldValue);
159+
}
160+
});
161+
// if all specified fields don't exist and ignore_missing is true, then do nothing
162+
if (concatenatedFields.length() == 0) {
163+
return document;
164+
}
165+
concatenatedFields.append("|");
166+
167+
MessageDigest messageDigest = HashMethod.fromMethodName(hashMethod);
168+
assert (messageDigest != null);
169+
messageDigest.update(concatenatedFields.toString().getBytes(StandardCharsets.UTF_8));
170+
document.setFieldValue(targetField, hashMethod + ":" + Base64.getEncoder().encodeToString(messageDigest.digest()));
171+
172+
return document;
173+
}
174+
175+
@Override
176+
public String getType() {
177+
return TYPE;
178+
}
179+
180+
/**
181+
* Convert a map containing nested fields to a flattened map,
182+
* for example, if the original map is
183+
* {
184+
* "a": {
185+
* "b": 1,
186+
* "c": 2
187+
* }
188+
* }, then the converted map is
189+
* {
190+
* "a.b": 1,
191+
* "a.c": 2
192+
* }
193+
* @param map the original map which may contain nested fields
194+
* @return a flattened map which has only one level fields
195+
*/
196+
@SuppressWarnings("unchecked")
197+
private Map<String, Object> toFlattenedMap(Map<String, Object> map) {
198+
Map<String, Object> flattenedMap = new HashMap<>();
199+
for (Map.Entry<String, Object> entry : map.entrySet()) {
200+
if (entry.getValue() instanceof Map) {
201+
toFlattenedMap((Map<String, Object>) entry.getValue()).forEach(
202+
(key, value) -> flattenedMap.put(entry.getKey() + "." + key, value)
203+
);
204+
} else {
205+
flattenedMap.put(entry.getKey(), entry.getValue());
206+
}
207+
}
208+
return flattenedMap;
209+
}
210+
211+
/**
212+
* The supported hash methods used to generate hash value
213+
*/
214+
enum HashMethod {
215+
MD5(MessageDigests.md5()),
216+
SHA1(MessageDigests.sha1()),
217+
SHA256(MessageDigests.sha256()),
218+
SHA3256(MessageDigests.sha3256());
219+
220+
private final MessageDigest messageDigest;
221+
222+
HashMethod(MessageDigest messageDigest) {
223+
this.messageDigest = messageDigest;
224+
}
225+
226+
public static MessageDigest fromMethodName(String methodName) {
227+
String name = methodName.toUpperCase(Locale.ROOT);
228+
switch (name) {
229+
case "MD5@2.16.0":
230+
return MD5.messageDigest;
231+
case "SHA-1@2.16.0":
232+
return SHA1.messageDigest;
233+
case "SHA-256@2.16.0":
234+
return SHA256.messageDigest;
235+
case "SHA3-256@2.16.0":
236+
return SHA3256.messageDigest;
237+
default:
238+
return null;
239+
}
240+
}
241+
}
242+
243+
public static final class Factory implements Processor.Factory {
244+
@Override
245+
public FingerprintProcessor create(
246+
Map<String, Processor.Factory> registry,
247+
String processorTag,
248+
String description,
249+
Map<String, Object> config
250+
) throws Exception {
251+
List<String> fields = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "fields");
252+
List<String> excludeFields = ConfigurationUtils.readOptionalList(TYPE, processorTag, config, "exclude_fields");
253+
if (fields != null && !fields.isEmpty()) {
254+
if (fields.stream().anyMatch(Strings::isNullOrEmpty)) {
255+
throw newConfigurationException(TYPE, processorTag, "fields", "field name cannot be null nor empty");
256+
}
257+
if (excludeFields != null && !excludeFields.isEmpty()) {
258+
throw newConfigurationException(TYPE, processorTag, "fields", "either fields or exclude_fields can be set");
259+
}
260+
}
261+
if (excludeFields != null && !excludeFields.isEmpty() && excludeFields.stream().anyMatch(Strings::isNullOrEmpty)) {
262+
throw newConfigurationException(TYPE, processorTag, "exclude_fields", "field name cannot be null nor empty");
263+
}
264+
265+
String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", "fingerprint");
266+
String hashMethod = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "hash_method", "SHA-1@2.16.0");
267+
if (!HASH_METHODS.contains(hashMethod.toUpperCase(Locale.ROOT))) {
268+
throw newConfigurationException(
269+
TYPE,
270+
processorTag,
271+
"hash_method",
272+
"hash method must be MD5@2.16.0, SHA-1@2.16.0, SHA-256@2.16.0 or SHA3-256@2.16.0"
273+
);
274+
}
275+
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
276+
return new FingerprintProcessor(processorTag, description, fields, excludeFields, targetField, hashMethod, ignoreMissing);
277+
}
278+
}
279+
}

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
109109
processors.put(CopyProcessor.TYPE, new CopyProcessor.Factory(parameters.scriptService));
110110
processors.put(RemoveByPatternProcessor.TYPE, new RemoveByPatternProcessor.Factory());
111111
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
112+
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
112113
return Collections.unmodifiableMap(processors);
113114
}
114115

0 commit comments

Comments
 (0)