1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `StreamNumericTermsAggregator` to allow numeric term aggregation streaming ([#19335](https://github.com/opensearch-project/OpenSearch/pull/19335))
- Query planning to determine flush mode for streaming aggregations ([#19488](https://github.com/opensearch-project/OpenSearch/pull/19488))
- Harden the circuit breaker and failure handle logic in query result consumer ([#19396](https://github.com/opensearch-project/OpenSearch/pull/19396))
- Add streaming cardinality aggregator ([#19484](https://github.com/opensearch-project/OpenSearch/pull/19484))

### Changed
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions` ([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))
@@ -33,7 +33,9 @@
import org.opensearch.search.aggregations.bucket.terms.StreamStringTermsAggregator;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.Cardinality;
import org.opensearch.search.aggregations.metrics.Max;
import org.opensearch.search.aggregations.metrics.StreamCardinalityAggregator;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedDynamicSettingsOpenSearchIntegTestCase;
@@ -104,7 +106,8 @@ public void setUp() throws Exception {
"{\n"
+ " \"properties\": {\n"
+ " \"field1\": { \"type\": \"keyword\" },\n"
+ " \"field2\": { \"type\": \"integer\" }\n"
+ " \"field2\": { \"type\": \"integer\" },\n"
+ " \"field3\": { \"type\": \"keyword\" }\n"
+ " }\n"
+ "}",
XContentType.JSON
@@ -115,35 +118,35 @@
BulkRequest bulkRequest = new BulkRequest();

// We'll create 3 segments per shard by indexing docs into each segment and forcing a flush
-// Segment 1 - we'll add docs with field2 values in 1-3 range
+// Segment 1 - we'll add docs with field2 values in 1-3 range, field3 value type1
for (int i = 0; i < 10; i++) {
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 1));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 2));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 3));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 1, "field3", "type1"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 2, "field3", "type1"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 3, "field3", "type1"));
}
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
assertFalse(bulkResponse.hasFailures()); // Verify ingestion was successful
client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

-// Segment 2 - we'll add docs with field2 values in 11-13 range
+// Segment 2 - we'll add docs with field2 values in 11-13 range, field3 value type2
bulkRequest = new BulkRequest();
for (int i = 0; i < 10; i++) {
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 11));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 12));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 13));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 11, "field3", "type2"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 12, "field3", "type2"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 13, "field3", "type2"));
}
bulkResponse = client().bulk(bulkRequest).actionGet();
assertFalse(bulkResponse.hasFailures());
client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

-// Segment 3 - we'll add docs with field2 values in 21-23 range
+// Segment 3 - we'll add docs with field2 values in 21-23 range, field3 value type3
bulkRequest = new BulkRequest();
for (int i = 0; i < 10; i++) {
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 21));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 22));
-bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 23));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 21, "field3", "type3"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 22, "field3", "type3"));
+bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 23, "field3", "type3"));
}
bulkResponse = client().bulk(bulkRequest).actionGet();
assertFalse(bulkResponse.hasFailures());
@@ -434,4 +437,135 @@ public void testStreamingAggregationNotUsedWithRestrictiveLimits() throws Exception {
.get();
}
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamingCardinalityAggregationUsed() throws Exception {
// This test validates cardinality streaming aggregation with profile to verify streaming is used
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
.addAggregation(AggregationBuilders.cardinality("cardinality_agg").field("field1"))
.setSize(0)
.setRequestCache(false)
.setProfile(true)
.execute();
SearchResponse resp = future.actionGet();
assertNotNull(resp);
assertEquals(NUM_SHARDS, resp.getTotalShards());
assertEquals(90, resp.getHits().getTotalHits().value());

// Validate that streaming cardinality aggregation was actually used
assertNotNull("Profile response should be present", resp.getProfileResults());
boolean foundStreamingCardinality = false;
for (var shardProfile : resp.getProfileResults().values()) {
List<ProfileResult> aggProfileResults = shardProfile.getAggregationProfileResults().getProfileResults();
for (var profileResult : aggProfileResults) {
if (StreamCardinalityAggregator.class.getSimpleName().equals(profileResult.getQueryName())) {
var debug = profileResult.getDebugInfo();
if (debug != null && debug.containsKey("streaming_enabled")) {
foundStreamingCardinality = true;
assertTrue("streaming_enabled should be true", (Boolean) debug.get("streaming_enabled"));
assertTrue("streaming_precision should be positive", ((Number) debug.get("streaming_precision")).intValue() > 0);
break;
}
}
}
if (foundStreamingCardinality) break;
}
assertTrue("Expected to find streaming cardinality in profile", foundStreamingCardinality);

// Also verify the result is correct
Cardinality cardinalityAgg = resp.getAggregations().get("cardinality_agg");
assertNotNull(cardinalityAgg);
// field1 has 3 unique values: value1, value2, value3
assertTrue("Expected cardinality around 3, got " + cardinalityAgg.getValue(), cardinalityAgg.getValue() >= 2);
assertTrue("Expected cardinality around 3, got " + cardinalityAgg.getValue(), cardinalityAgg.getValue() <= 4);
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamingCardinalityAggregation() throws Exception {
// Test cardinality of field1 which has 3 unique values (value1, value2, value3)
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
.addAggregation(AggregationBuilders.cardinality("cardinality_agg").field("field1").precisionThreshold(1000))
.setSize(0)
.setRequestCache(false)
.execute();
SearchResponse resp = future.actionGet();

assertNotNull(resp);
assertEquals(NUM_SHARDS, resp.getTotalShards());
assertEquals(90, resp.getHits().getTotalHits().value());

Cardinality cardinalityAgg = resp.getAggregations().get("cardinality_agg");
assertNotNull("Cardinality aggregation should not be null", cardinalityAgg);
// field1 has 3 unique values: value1, value2, value3
// HyperLogLog is approximate, so we allow some tolerance
assertTrue("Expected cardinality around 3, got " + cardinalityAgg.getValue(), cardinalityAgg.getValue() >= 2);
assertTrue("Expected cardinality around 3, got " + cardinalityAgg.getValue(), cardinalityAgg.getValue() <= 4);
}

@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamingCardinalityWithPrecisionThreshold() throws Exception {
// Test cardinality with different precision thresholds
ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
.addAggregation(AggregationBuilders.cardinality("cardinality_low").field("field1").precisionThreshold(10))
.addAggregation(AggregationBuilders.cardinality("cardinality_high").field("field1").precisionThreshold(1000))
.setSize(0)
.setRequestCache(false)
.execute();
SearchResponse resp = future.actionGet();

assertNotNull(resp);
assertEquals(NUM_SHARDS, resp.getTotalShards());
assertEquals(90, resp.getHits().getTotalHits().value());

Cardinality lowPrecision = resp.getAggregations().get("cardinality_low");
assertNotNull(lowPrecision);
assertEquals(3, lowPrecision.getValue(), 0.0);

Cardinality highPrecision = resp.getAggregations().get("cardinality_high");
assertNotNull(highPrecision);
assertEquals(3, highPrecision.getValue(), 0.0);

// Both should give the same result for small cardinality
assertEquals(lowPrecision.getValue(), highPrecision.getValue(), 0.0);
}
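The [2, 4] tolerance window used above is conservative. For cardinalities far below the precision threshold, HLL++ is expected to be near-exact; a back-of-envelope check of the standard HyperLogLog error bound, under the assumption (not taken from this PR) that precisionThreshold(1000) yields at least 2^10 registers:

```java
// Back-of-envelope sketch of the standard HyperLogLog error bound.
// Assumption: precisionThreshold(1000) maps to at least 2^10 registers;
// the exact mapping is internal to HyperLogLogPlusPlus and not shown here.
public class HllErrorBoundSketch {
    public static void main(String[] args) {
        int registers = 1 << 10;
        double relativeError = 1.04 / Math.sqrt(registers); // ~0.0325 (3.25%)
        // At a true cardinality of 3 this is a tiny fraction of one unique
        // value, so asserting a result in [2, 4] is a very safe window.
        System.out.printf("expected relative error: %.4f%n", relativeError);
    }
}
```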

@LockFeatureFlag(STREAM_TRANSPORT)
public void testStreamingCardinalityAsSubAggregation() throws Exception {
// Test cardinality as a sub-aggregation under terms aggregation
// Using field3 (keyword field) for cardinality since StreamCardinalityAggregator only supports ordinal value sources
TermsAggregationBuilder agg = terms("terms_agg").field("field1")
.subAggregation(AggregationBuilders.cardinality("cardinality_subagg").field("field3").precisionThreshold(1000));

ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
.addAggregation(agg)
.setSize(0)
.setRequestCache(false)
.execute();
SearchResponse resp = future.actionGet();

assertNotNull(resp);
assertEquals(NUM_SHARDS, resp.getTotalShards());
assertEquals(90, resp.getHits().getTotalHits().value());

StringTerms termsAgg = resp.getAggregations().get("terms_agg");
assertNotNull(termsAgg);
List<StringTerms.Bucket> buckets = termsAgg.getBuckets();
assertEquals(3, buckets.size());

buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

// Each bucket should have cardinality of 3 (each field1 value appears with 3 different field3 values)
// Based on the data, each field1 value appears with field3 values {type1, type2, type3}
for (StringTerms.Bucket bucket : buckets) {
assertEquals(30, bucket.getDocCount());
Cardinality cardinalitySubAgg = bucket.getAggregations().get("cardinality_subagg");
assertNotNull(cardinalitySubAgg);
// Each field1 value appears with exactly 3 field3 values
// HyperLogLog is approximate, allow some tolerance
assertTrue(
"Expected cardinality around 3 for bucket " + bucket.getKeyAsString() + ", got " + cardinalitySubAgg.getValue(),
cardinalitySubAgg.getValue() >= 2 && cardinalitySubAgg.getValue() <= 4
);
}
}
}
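A note on the "ordinal value sources" constraint called out in testStreamingCardinalityAsSubAggregation above: keyword fields are backed by per-segment sorted-set doc values, so each document exposes small integer ordinals rather than raw terms. A minimal sketch of reading those ordinals with Lucene's doc-values API (illustrative only; the helper and its arguments are hypothetical):

```java
import java.io.IOException;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;

// Keyword fields such as "field3" surface per-segment ordinals. Ordinal-based
// cardinality collectors (see ordinalsCollectorsUsed in CardinalityAggregator
// below) record these integers cheaply during collect() and defer resolving
// them until postCollect() -- which is why streaming must run postCollection()
// before each batch build.
final class OrdinalsSketch {
    static void visitOrdinals(LeafReader leafReader, int docId, String field) throws IOException {
        SortedSetDocValues dv = DocValues.getSortedSet(leafReader, field);
        if (dv.advanceExact(docId)) {
            for (int i = 0; i < dv.docValueCount(); i++) {
                BytesRef term = dv.lookupOrd(dv.nextOrd()); // ordinal -> term bytes
            }
        }
    }
}
```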
@@ -305,7 +305,7 @@ public void reset() {
collectableSubAggregators.reset();
}

-protected void doReset() {}
+public void doReset() {}

/** Called upon release of the aggregator. */
@Override
@@ -108,6 +108,9 @@ public List<InternalAggregation> buildAggBatch(Collector collectorTree) throws IOException {
} else if (currentCollector instanceof BucketCollector) {
// Perform build aggregation during post collection
if (currentCollector instanceof Aggregator) {
// Call postCollection() before building to ensure collectors finalize their data
// This is critical for aggregators like CardinalityAggregator that defer processing until postCollect()
((BucketCollector) currentCollector).postCollection();
aggregations.add(((Aggregator) currentCollector).buildTopLevelBatch());
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
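The change above wires postCollection() in front of buildTopLevelBatch(). A minimal sketch of the per-batch lifecycle this ordering implies, using only methods visible in this diff; the driver method itself is hypothetical, not the actual OpenSearch code:

```java
import java.io.IOException;
import java.util.List;

import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.InternalAggregation;

// Hypothetical per-batch driver: finalize deferred per-segment work, build a
// partial aggregation for this batch, then reset so the next batch starts clean.
final class BatchLifecycleSketch {
    static List<InternalAggregation> emitBatch(Aggregator aggregator) throws IOException {
        aggregator.postCollection();   // e.g. CardinalityAggregator resolves ordinals here
        InternalAggregation partial = aggregator.buildTopLevelBatch();
        aggregator.reset();            // runs doReset() (made public in this PR) and resets sub-aggregators
        return List.of(partial);
    }
}
```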
@@ -88,24 +88,24 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue

private static final Logger logger = LogManager.getLogger(CardinalityAggregator.class);

-private final CardinalityAggregatorFactory.ExecutionMode executionMode;
-private final int precision;
-private final ValuesSource valuesSource;
+final CardinalityAggregatorFactory.ExecutionMode executionMode;
+final int precision;
+final ValuesSource valuesSource;

private final ValuesSourceConfig valuesSourceConfig;

// Expensive to initialize, so we only initialize it when we have an actual value source
@Nullable
-private HyperLogLogPlusPlus counts;
+HyperLogLogPlusPlus counts;

-private Collector collector;
+Collector collector;

-private int emptyCollectorsUsed;
-private int numericCollectorsUsed;
-private int ordinalsCollectorsUsed;
-private int ordinalsCollectorsOverheadTooHigh;
-private int stringHashingCollectorsUsed;
-private int dynamicPrunedSegments;
+int emptyCollectorsUsed;
+int numericCollectorsUsed;
+int ordinalsCollectorsUsed;
+int ordinalsCollectorsOverheadTooHigh;
+int stringHashingCollectorsUsed;
+int dynamicPrunedSegments;

public CardinalityAggregator(
String name,
@@ -349,7 +349,7 @@ public void collectDebugInfo(BiConsumer<String, Object> add) {
*
* @opensearch.internal
*/
-private abstract static class Collector extends LeafBucketCollector implements Releasable {
+abstract static class Collector extends LeafBucketCollector implements Releasable {

public abstract void postCollect() throws IOException;

@@ -488,7 +488,7 @@ public long cost() {
*
* @opensearch.internal
*/
-private static class EmptyCollector extends Collector {
+static class EmptyCollector extends Collector {

@Override
public void collect(int doc, long bucketOrd) {
@@ -104,6 +104,9 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {

@Override
protected Aggregator createUnmapped(SearchContext searchContext, Aggregator parent, Map<String, Object> metadata) throws IOException {
if (searchContext.isStreamSearch()) {
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}
return new CardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}

@@ -114,6 +117,9 @@ protected Aggregator doCreateInternal(
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
if (searchContext.isStreamSearch()) {
return new StreamCardinalityAggregator(name, config, precision(), searchContext, parent, metadata, executionMode);
}
return queryShardContext.getValuesSourceRegistry()
.getAggregator(CardinalityAggregationBuilder.REGISTRY_KEY, config)
.build(name, config, precision(), searchContext, parent, metadata, executionMode);
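The factory changes route stream searches to StreamCardinalityAggregator, whose source is not included in this diff. Given the private-to-package-private relaxations in CardinalityAggregator, the matching constructor arguments, and the streaming_enabled / streaming_precision debug keys asserted by the integration test, a plausible shape is sketched below. Every detail is an inference from this diff, not the actual implementation:

```java
import java.io.IOException;
import java.util.Map;
import java.util.function.BiConsumer;

import org.opensearch.common.lease.Releasables;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;

// HYPOTHETICAL sketch of StreamCardinalityAggregator, inferred from this diff.
// It lives in the same package as CardinalityAggregator (which is why fields
// like counts and collector were widened to package-private) and clears its
// per-batch HLL state in doReset() so each streamed batch emits an independent sketch.
public class StreamCardinalityAggregator extends CardinalityAggregator {

    public StreamCardinalityAggregator(String name, ValuesSourceConfig config, int precision,
                                       SearchContext context, Aggregator parent, Map<String, Object> metadata,
                                       CardinalityAggregatorFactory.ExecutionMode executionMode) throws IOException {
        super(name, config, precision, context, parent, metadata, executionMode);
    }

    @Override
    public void doReset() {
        // Release the sketch and collector built for the previous batch.
        Releasables.close(counts, collector);
        counts = null;
        collector = null;
    }

    @Override
    public void collectDebugInfo(BiConsumer<String, Object> add) {
        super.collectDebugInfo(add);
        add.accept("streaming_enabled", true);        // asserted by the integration test
        add.accept("streaming_precision", precision); // asserted to be > 0
    }
}
```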