Skip to content

Commit 1a53317

Browse files
authored
Add temporal routing processors for time-based document routing (#18966)
Implements TemporalRoutingProcessor for ingest pipelines and TemporalRoutingSearchProcessor for search pipelines based on RFC #18920. Features: - Route documents to shards based on timestamp fields - Support hour, day, week, and month granularities - Optional hash bucketing for better distribution - Automatic search routing to relevant time ranges - ISO week format support The processors enable efficient time-based data organization for log and metrics workloads by co-locating documents from the same time period on the same shards. --------- Signed-off-by: Atri Sharma <atri.jiit@gmail.com>
1 parent 64da5f1 commit 1a53317

File tree

9 files changed

+1388
-1
lines changed

9 files changed

+1388
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920))
89

910
### Changed
1011

modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
121121
processors.put(CommunityIdProcessor.TYPE, new CommunityIdProcessor.Factory());
122122
processors.put(FingerprintProcessor.TYPE, new FingerprintProcessor.Factory());
123123
processors.put(HierarchicalRoutingProcessor.TYPE, new HierarchicalRoutingProcessor.Factory());
124+
processors.put(TemporalRoutingProcessor.TYPE, new TemporalRoutingProcessor.Factory());
124125
processors.put(AclRoutingProcessor.TYPE, new AclRoutingProcessor.Factory());
125126
return filterForAllowlistSetting(parameters.env.settings(), processors);
126127
}
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest.common;
10+
11+
import org.opensearch.common.Nullable;
12+
import org.opensearch.common.hash.MurmurHash3;
13+
import org.opensearch.common.time.DateFormatter;
14+
import org.opensearch.common.time.DateFormatters;
15+
import org.opensearch.core.common.Strings;
16+
import org.opensearch.ingest.AbstractProcessor;
17+
import org.opensearch.ingest.ConfigurationUtils;
18+
import org.opensearch.ingest.IngestDocument;
19+
import org.opensearch.ingest.Processor;
20+
21+
import java.nio.charset.StandardCharsets;
22+
import java.time.ZoneOffset;
23+
import java.time.ZonedDateTime;
24+
import java.time.temporal.ChronoUnit;
25+
import java.time.temporal.TemporalAccessor;
26+
import java.util.Locale;
27+
import java.util.Map;
28+
29+
import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;
30+
31+
/**
32+
* Processor that sets document routing based on temporal structure.
33+
*
34+
* This processor extracts a timestamp from a specified field, truncates it
35+
* to a configurable granularity (hour/day/week/month), and uses the resulting
36+
* temporal bucket to compute a routing value for improved temporal locality.
37+
*
38+
* Introduced in OpenSearch 3.2.0 to enable intelligent document co-location
39+
* based on time-based patterns for log and metrics workloads.
40+
*/
41+
public final class TemporalRoutingProcessor extends AbstractProcessor {
42+
43+
public static final String TYPE = "temporal_routing";
44+
private static final String DEFAULT_FORMAT = "strict_date_optional_time";
45+
46+
private final String timestampField;
47+
private final Granularity granularity;
48+
private final DateFormatter dateFormatter;
49+
private final boolean ignoreMissing;
50+
private final boolean overrideExisting;
51+
private final boolean hashBucket;
52+
53+
/**
54+
* Supported temporal granularities
55+
*/
56+
public enum Granularity {
57+
/** Hour granularity for hourly bucketing */
58+
HOUR(ChronoUnit.HOURS),
59+
/** Day granularity for daily bucketing */
60+
DAY(ChronoUnit.DAYS),
61+
/** Week granularity for weekly bucketing (ISO week) */
62+
WEEK(ChronoUnit.WEEKS),
63+
/** Month granularity for monthly bucketing */
64+
MONTH(ChronoUnit.MONTHS);
65+
66+
private final ChronoUnit chronoUnit;
67+
68+
Granularity(ChronoUnit chronoUnit) {
69+
this.chronoUnit = chronoUnit;
70+
}
71+
72+
/**
73+
* Gets the ChronoUnit associated with this granularity
74+
* @return the ChronoUnit
75+
*/
76+
public ChronoUnit getChronoUnit() {
77+
return chronoUnit;
78+
}
79+
80+
/**
81+
* Parses a string value to a Granularity enum
82+
* @param value the string representation of the granularity
83+
* @return the corresponding Granularity enum value
84+
* @throws IllegalArgumentException if the value is not valid
85+
*/
86+
public static Granularity fromString(String value) {
87+
try {
88+
return valueOf(value.toUpperCase(Locale.ROOT));
89+
} catch (IllegalArgumentException e) {
90+
throw new IllegalArgumentException("Invalid granularity: " + value + ". Supported values are: hour, day, week, month");
91+
}
92+
}
93+
}
94+
95+
TemporalRoutingProcessor(
96+
String tag,
97+
@Nullable String description,
98+
String timestampField,
99+
Granularity granularity,
100+
String format,
101+
boolean ignoreMissing,
102+
boolean overrideExisting,
103+
boolean hashBucket
104+
) {
105+
super(tag, description);
106+
this.timestampField = timestampField;
107+
this.granularity = granularity;
108+
this.dateFormatter = DateFormatter.forPattern(format);
109+
this.ignoreMissing = ignoreMissing;
110+
this.overrideExisting = overrideExisting;
111+
this.hashBucket = hashBucket;
112+
}
113+
114+
@Override
115+
public IngestDocument execute(IngestDocument document) throws Exception {
116+
// Check if routing already exists and we shouldn't override
117+
if (!overrideExisting) {
118+
try {
119+
Object existingRouting = document.getFieldValue("_routing", Object.class, true);
120+
if (existingRouting != null) {
121+
return document;
122+
}
123+
} catch (Exception e) {
124+
// Field doesn't exist, continue with processing
125+
}
126+
}
127+
128+
Object timestampValue = document.getFieldValue(timestampField, Object.class, ignoreMissing);
129+
130+
if (timestampValue == null && ignoreMissing) {
131+
return document;
132+
}
133+
134+
if (timestampValue == null) {
135+
throw new IllegalArgumentException("field [" + timestampField + "] not present as part of path [" + timestampField + "]");
136+
}
137+
138+
String routingValue = computeRoutingValue(timestampValue.toString());
139+
document.setFieldValue("_routing", routingValue);
140+
141+
return document;
142+
}
143+
144+
/**
145+
* Computes routing value from timestamp by truncating to granularity
146+
* and optionally hashing for distribution
147+
*/
148+
private String computeRoutingValue(String timestamp) {
149+
// Parse timestamp using DateFormatter and convert to ZonedDateTime
150+
TemporalAccessor accessor = dateFormatter.parse(timestamp);
151+
ZonedDateTime dateTime = DateFormatters.from(accessor, Locale.ROOT, ZoneOffset.UTC);
152+
153+
// Truncate to granularity
154+
ZonedDateTime truncated = truncateToGranularity(dateTime);
155+
156+
// Create temporal bucket key
157+
String temporalBucket = createTemporalBucketKey(truncated);
158+
159+
// Optionally hash for distribution
160+
if (hashBucket) {
161+
byte[] bucketBytes = temporalBucket.getBytes(StandardCharsets.UTF_8);
162+
long hash = MurmurHash3.hash128(bucketBytes, 0, bucketBytes.length, 0, new MurmurHash3.Hash128()).h1;
163+
return String.valueOf(hash == Long.MIN_VALUE ? 0L : (hash < 0 ? -hash : hash));
164+
}
165+
166+
return temporalBucket;
167+
}
168+
169+
/**
170+
* Truncates datetime to the specified granularity
171+
*
172+
* IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.truncateToGranularity()
173+
* in the search-pipeline-common module to ensure consistent temporal bucketing.
174+
*/
175+
private ZonedDateTime truncateToGranularity(ZonedDateTime dateTime) {
176+
switch (granularity) {
177+
case HOUR:
178+
return dateTime.withMinute(0).withSecond(0).withNano(0);
179+
case DAY:
180+
return dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);
181+
case WEEK:
182+
// Truncate to start of week (Monday)
183+
ZonedDateTime dayTruncated = dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0);
184+
return dayTruncated.with(java.time.temporal.TemporalAdjusters.previousOrSame(java.time.DayOfWeek.MONDAY));
185+
case MONTH:
186+
return dateTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
187+
default:
188+
throw new IllegalArgumentException("Unsupported granularity: " + granularity);
189+
}
190+
}
191+
192+
/**
193+
* Creates a string key for the temporal bucket
194+
*
195+
* IMPORTANT: This logic MUST be kept in sync with TemporalRoutingSearchProcessor.createTemporalBucket()
196+
* in the search-pipeline-common module. Both processors must generate identical bucket keys for the
197+
* same input to ensure documents are routed to the same shards during ingest and search.
198+
*
199+
* TODO: Consider moving this shared logic to a common module when search and ingest pipelines
200+
* can share code more easily.
201+
*/
202+
private String createTemporalBucketKey(ZonedDateTime truncated) {
203+
switch (granularity) {
204+
case HOUR:
205+
return truncated.getYear()
206+
+ "-"
207+
+ String.format(Locale.ROOT, "%02d", truncated.getMonthValue())
208+
+ "-"
209+
+ String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth())
210+
+ "T"
211+
+ String.format(Locale.ROOT, "%02d", truncated.getHour());
212+
case DAY:
213+
return truncated.getYear()
214+
+ "-"
215+
+ String.format(Locale.ROOT, "%02d", truncated.getMonthValue())
216+
+ "-"
217+
+ String.format(Locale.ROOT, "%02d", truncated.getDayOfMonth());
218+
case WEEK:
219+
// Use ISO week format: YYYY-WNN
220+
int weekOfYear = truncated.get(java.time.temporal.WeekFields.ISO.weekOfWeekBasedYear());
221+
int weekYear = truncated.get(java.time.temporal.WeekFields.ISO.weekBasedYear());
222+
return weekYear + "-W" + String.format(Locale.ROOT, "%02d", weekOfYear);
223+
case MONTH:
224+
return truncated.getYear() + "-" + String.format(Locale.ROOT, "%02d", truncated.getMonthValue());
225+
default:
226+
throw new IllegalArgumentException("Unsupported granularity: " + granularity);
227+
}
228+
}
229+
230+
@Override
231+
public String getType() {
232+
return TYPE;
233+
}
234+
235+
String getTimestampField() {
236+
return timestampField;
237+
}
238+
239+
Granularity getGranularity() {
240+
return granularity;
241+
}
242+
243+
DateFormatter getDateFormatter() {
244+
return dateFormatter;
245+
}
246+
247+
boolean isIgnoreMissing() {
248+
return ignoreMissing;
249+
}
250+
251+
boolean isOverrideExisting() {
252+
return overrideExisting;
253+
}
254+
255+
boolean isHashBucket() {
256+
return hashBucket;
257+
}
258+
259+
/**
260+
* Factory for creating TemporalRoutingProcessor instances
261+
*/
262+
public static final class Factory implements Processor.Factory {
263+
264+
@Override
265+
public TemporalRoutingProcessor create(
266+
Map<String, Processor.Factory> processorFactories,
267+
String tag,
268+
@Nullable String description,
269+
Map<String, Object> config
270+
) throws Exception {
271+
272+
String timestampField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "timestamp_field");
273+
String granularityStr = ConfigurationUtils.readStringProperty(TYPE, tag, config, "granularity");
274+
String format = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "format");
275+
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
276+
boolean overrideExisting = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "override_existing", true);
277+
boolean hashBucket = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "hash_bucket", false);
278+
279+
// Set default format if not provided
280+
if (format == null) {
281+
format = DEFAULT_FORMAT;
282+
}
283+
284+
// Validation
285+
if (Strings.isNullOrEmpty(timestampField)) {
286+
throw newConfigurationException(TYPE, tag, "timestamp_field", "cannot be null or empty");
287+
}
288+
289+
if (Strings.isNullOrEmpty(granularityStr)) {
290+
throw newConfigurationException(TYPE, tag, "granularity", "cannot be null or empty");
291+
}
292+
293+
Granularity granularity;
294+
try {
295+
granularity = Granularity.fromString(granularityStr);
296+
} catch (IllegalArgumentException e) {
297+
throw newConfigurationException(TYPE, tag, "granularity", e.getMessage());
298+
}
299+
300+
// Validate date format
301+
try {
302+
DateFormatter.forPattern(format);
303+
} catch (Exception e) {
304+
throw newConfigurationException(TYPE, tag, "format", "invalid date format: " + e.getMessage());
305+
}
306+
307+
return new TemporalRoutingProcessor(
308+
tag,
309+
description,
310+
timestampField,
311+
granularity,
312+
format,
313+
ignoreMissing,
314+
overrideExisting,
315+
hashBucket
316+
);
317+
}
318+
}
319+
}

modules/ingest-common/src/test/java/org/opensearch/ingest/common/IngestCommonModulePluginTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public void testAllowlistNotSpecified() throws IOException {
7575
"uppercase",
7676
"split",
7777
"hierarchical_routing",
78+
"temporal_routing",
7879
"acl_routing"
7980
);
8081
assertEquals(expected, plugin.getProcessors(createParameters(settings)).keySet());

0 commit comments

Comments
 (0)