
feat: Implement bloom_filter_agg #987

Merged · 28 commits into apache:main · Oct 18, 2024

Conversation

Contributor

@mbutrovich mbutrovich commented Sep 30, 2024

Which issue does this PR close?

Closes #846.

Rationale for this change

What changes are included in this PR?

  • Native implementation (bloom_filter_agg.rs) built on DataFusion's Accumulator trait; a sketch of its shape follows this list. We do not have a GroupsAccumulator implementation and leave it as a possible future optimization.
  • Serde logic (planner.rs, QueryPlanSerde.scala)
  • Serialization and merging logic for underlying data structures (spark_bloom_filter.rs, spark_bit_array.rs)
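
For orientation, here is a minimal sketch of the shape of an Accumulator-based bloom filter aggregate. The SparkBloomFilter methods here are illustrative stand-ins, not the PR's actual API:

use arrow::array::{Array, ArrayRef, BinaryArray, Int64Array};
use datafusion::common::{DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::Accumulator;

// Stand-in for the real SparkBloomFilter in spark_bloom_filter.rs.
struct SparkBloomFilter;

impl SparkBloomFilter {
    fn put_long(&mut self, _item: i64) { /* set k bit positions via hashing */ }
    fn merge_bytes(&mut self, _other: &[u8]) { /* OR in another filter's bits */ }
    fn state_as_bytes(&self) -> Vec<u8> { Vec::new() }
}

struct BloomFilterAccumulator {
    filter: SparkBloomFilter,
}

impl Accumulator for BloomFilterAccumulator {
    // Partial aggregation: hash each non-null input value into the filter.
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        let arr = values[0]
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| DataFusionError::Internal("expected Int64 input".into()))?;
        arr.iter().flatten().for_each(|v| self.filter.put_long(v));
        Ok(())
    }

    // Final aggregation: OR together serialized partial filters.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        let arr = states[0]
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| DataFusionError::Internal("expected Binary state".into()))?;
        arr.iter().flatten().for_each(|b| self.filter.merge_bytes(b));
        Ok(())
    }

    // Both the intermediate state and the final result are the serialized filter.
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::Binary(Some(self.filter.state_as_bytes()))])
    }

    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::Binary(Some(self.filter.state_as_bytes())))
    }

    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}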

How are these changes tested?

  • New test in CometExecSuite
  • Spark tests in CI exercise this aggregation
  • Scala benchmark to compare against Spark code path
  • Native benchmark for partial and final aggregation modes
  • Native tests for the new bit array logic in spark_bit_array.rs

@mbutrovich mbutrovich marked this pull request as draft September 30, 2024 19:39
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    // TODO(Matt): There might be a more efficient way to do this by transmuting since calling
    // state() on an Accumulator is considered destructive.
    let state_sv = ScalarValue::Binary(Some(self.state_as_bytes()));
Contributor

One way to avoid the copy, which may be too ugly, would be to store the bloom filter data as an Option<>.

So instead of

pub struct SparkBloomFilter {
    bits: SparkBitArray,
    num_hash_functions: u32,
}

Something like

pub struct SparkBloomFilter {
    bits: Option<SparkBitArray>,
    num_hash_functions: u32,
}

And then you could basically use Option::take to take the value and leave a None in its place

let Some(bits) = self.bits.take() else {
    return Err(DataFusionError::Internal("invalid state".to_string()));
};

// do whatever you want now you have the owned `bits`
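
A minimal sketch of state() under that design, assuming a to_bytes() method on SparkBitArray (an illustrative name):

fn state(&mut self) -> Result<Vec<ScalarValue>> {
    // take() moves the bit array out and leaves None behind, so the bits are
    // not cloned; calling state() a second time would hit the error branch.
    let bits = self.bits.take().ok_or_else(|| {
        DataFusionError::Internal("invalid state: bloom filter already consumed".to_string())
    })?;
    Ok(vec![ScalarValue::Binary(Some(bits.to_bytes()))])
}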

@codecov-commenter

codecov-commenter commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 76.47059% with 4 lines in your changes missing coverage. Please review.

Project coverage is 34.41%. Comparing base (c3023c5) to head (bf22902).
Report is 19 commits behind head on main.

Files with missing lines                                 Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala    76.47%    2 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #987      +/-   ##
============================================
+ Coverage     34.03%   34.41%   +0.38%     
- Complexity      875      889      +14     
============================================
  Files           112      112              
  Lines         43289    43428     +139     
  Branches       9572     9627      +55     
============================================
+ Hits          14734    14947     +213     
+ Misses        25521    25437      -84     
- Partials       3034     3044      +10     

@mbutrovich
Contributor Author

Results from the benchmark I just added:

BloomFilterAggregate Exec (cardinality 100):       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         117            136          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  117            134          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             71             78           9        148.3           6.7       1.7X

BloomFilterAggregate Exec (cardinality 1024):      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         111            128          11         94.7          10.6       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  110            135          15         95.7          10.4       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             69             78          12        152.9           6.5       1.6X

BloomFilterAggregate Exec (cardinality 1048576):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         165            183          14         63.6          15.7       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  169            184          11         62.0          16.1       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)            117            126           9         89.2          11.2       1.4X

@mbutrovich mbutrovich marked this pull request as ready for review October 2, 2024 15:49
@alamb
Contributor

alamb commented Oct 2, 2024

Results from the benchmark I just added:

Looks like a 40% improvement.


Contributor

@kazuyukitanimura kazuyukitanimura left a comment

Still looking

@mbutrovich
Contributor Author

Just putting down notes on the test failure. It's failing one Spark test in InjectRuntimeFilterSuite, "Merge runtime bloom filters". It fails a check in CometArrayImporter when it brings Arrow data from native code back over to the JVM.

The plan is a bit of a monster, but I'll provide it below:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=739]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#610].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#678].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#610]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :           +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=605]
   :        :  :              +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#678]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :              +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=673]
   :        :                 +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=742]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>

This is what it looks like on the main branch:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=729]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#605].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#668].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#605]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=601]
   :        :  :              +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#668]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=664]
   :        :                 +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=732]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>

Contributor

@kazuyukitanimura kazuyukitanimura left a comment

I feel we can merge this as soon as we pass all the tests. We can work on optimizations separately if necessary.

@mbutrovich
Contributor Author

mbutrovich commented Oct 4, 2024

Debugger output from the failing state in CometArrayImporter.

this = {CometArrayImporter@18451} 
snapshot = {ArrowArray$Snapshot@18448} 
 length = 1
 null_count = 0
 offset = 0
 n_buffers = 1
 n_children = 2
 buffers = 105553139081104
 children = 105553139081088
 dictionary = 0
 release = 6002611972
 private_data = 105553130135808
children = {long[2]@18449} [105553174820048, 105553174820208]
 0 = 105553174820048
 1 = 105553174820208
childVectors = {ArrayList@18450}  size = 1
 0 = {VarBinaryVector@22881} "[]"
vector = {StructVector@18452} "[]"
 reader = {NullableStructReaderImpl@22883} 
 writer = {NullableStructWriter@22884} "org.apache.comet.shaded.arrow.vector.complex.impl.NullableStructWriter@8a493b[index = 0]"
 validityBuffer = {ArrowBuf@22885} "ArrowBuf[1], address:0, capacity:0"
 validityAllocationSizeInBytes = 497
 NonNullableStructVector.reader = {SingleStructReaderImpl@22886} 
 field = {Field@22887} "Struct not null"
 valueCount = 0
 ephPair = null
 vectors = {PromotableMultiMapWithOrdinal@22888} 
 allowConflictPolicyChanges = true
 conflictPolicy = {AbstractStructVector$ConflictPolicy@22889} "CONFLICT_REPLACE"
 name = null
 allocator = {RootAllocator@18453} "Allocator(ROOT) 0/2176/4196904/9223372036854775807 (res/actual/peak/limit)\n"
 callBack = null
children.length = 2

The entire subquery runs in native code now, so my guess is that the output from that projection, which looks like it should be a struct with two binary values in it, is wrong. I'm not sure if it's a bug in the projection, or something further downstream.

@viirya
Member

viirya commented Oct 9, 2024

I do not have time to look at this error yet. I may take a look after the conference.

@mbutrovich
Contributor Author

Can't say I see a huge difference in TPC-H or TPC-DS locally, but the plans I looked at were typically building filters over very small relations.

@kazuyukitanimura
Contributor

@mbutrovich Is it possible to trace back where children and childVectors are populated?

@viirya
Member

viirya commented Oct 12, 2024

The Spark SQL test failure can be fixed by #1016.

@viirya
Member

viirya commented Oct 13, 2024

I merged the fix. You can rebase and re-trigger CI now.

@mbutrovich
Contributor Author

mbutrovich commented Oct 13, 2024

Merged in updated main, thanks for the fix!

(0..arr.len()).try_for_each(|index| {
    let v = ScalarValue::try_from_array(arr, index)?;

    if let ScalarValue::Int64(Some(value)) = v {
Member

It only supports Int64? Spark BloomFilterAggregate supports Byte, Short, Int, Long, and String. If Comet BloomFilterAggregate only supports Int64 for now, we need to fall back to Spark for the other cases in QueryPlanSerde.

Contributor Author

I think I was going off of their docs, which say it only supports Long.

In their implementation, however, it looks like they can cast the fixed-width types directly to Long:
https://github.com/apache/spark/blob/b078c0d6e2adf7eb0ee7d4742a6c52864440226e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L238

and for strings, their bloom filter implementation has a putBinary method that ours doesn't currently support. The casts should be easy; I'll look at what adding putBinary to our bloom filter implementation will take.
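
A hedged sketch of what that widening could look like on the native side; the function and its error handling are illustrative, not this PR's code:

use arrow::array::{Array, ArrayRef, Int16Array, Int32Array, Int64Array, Int8Array};
use datafusion::common::{DataFusionError, Result};

// Widen Spark's fixed-width integer types to i64 before hashing into the
// filter, mirroring Spark 3.5's BloomFilterAggregate update path (assumption).
fn values_as_i64(arr: &ArrayRef) -> Result<Vec<i64>> {
    let any = arr.as_any();
    if let Some(a) = any.downcast_ref::<Int64Array>() {
        Ok(a.iter().flatten().collect())
    } else if let Some(a) = any.downcast_ref::<Int32Array>() {
        Ok(a.iter().flatten().map(i64::from).collect())
    } else if let Some(a) = any.downcast_ref::<Int16Array>() {
        Ok(a.iter().flatten().map(i64::from).collect())
    } else if let Some(a) = any.downcast_ref::<Int8Array>() {
        Ok(a.iter().flatten().map(i64::from).collect())
    } else {
        Err(DataFusionError::Internal(format!(
            "unsupported bloom_filter_agg input type: {}",
            arr.data_type()
        )))
    }
}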

Contributor Author

@mbutrovich mbutrovich Oct 17, 2024

Ah I see what happened. 3.4 only supports Long, which was the Spark source I was working off of. 3.5 added support for other types.

Contributor Author

I modified it to only generate a native BloomFilterAgg if the child has LongType. I'll open an issue to support more types in the future.

Comment on lines +95 to +96
// Does it make sense to do a std::mem::take of filter_state here? Unclear to me if a deep
// copy of filter_state as a Vec<u64> to a Vec<u8> is happening here.
Member

You mean whether std::mem::take also does a copy?

Contributor Author

filter_state isn't needed after this function call, so ideally I'd be able to move its contents out instead of copying into the byte slice. But because the underlying type of filter_state is a u64 vector and I'm stuffing it into a u8 vector, I think the alignment requirements won't be the same and I won't get the clean buffer transfer that I want.

Member

u64 vector alignment should be acceptable for a u8 vector. At least locally, I saw that their alignments are the same.
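
For context, a sketch of the copying conversion under discussion. The big-endian byte order here mirrors Java's DataOutputStream, which Spark's filter serialization uses (an assumption worth checking against spark_bloom_filter.rs):

// Copies the u64 words into a fresh byte buffer. A zero-copy reinterpretation
// of Vec<u64> as Vec<u8> is not straightforward: a Vec must be deallocated
// with the same layout it was allocated with, so rebuilding it with a
// different element type via Vec::from_raw_parts is undefined behavior even
// though u8 has the weaker alignment requirement.
fn words_to_bytes(words: &[u64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(words.len() * 8);
    for w in words {
        out.extend_from_slice(&w.to_be_bytes());
    }
    out
}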

Comment on lines 772 to 777
if (childExpr.isDefined &&
child.dataType
.isInstanceOf[LongType] && // Spark 3.4 only supports Long, 3.5+ adds more types.
numItemsExpr.isDefined &&
numBitsExpr.isDefined &&
dataType.isDefined) {
Member

One minor nit (no need to address in this PR, because this is a general issue we already have): we are testing multiple preconditions here and fall back if any of them are false (which is correct), but we do not let the user know the specific reason for the fallback, which makes debugging more difficult.

I would eventually like to explore refactoring how we approach this and see if we can add some utilities to make it easier to report fallback reasons, but this is much lower priority than the performance & stability work for now.
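
As a rough illustration of the kind of utility this could be — everything here is hypothetical: the helper, the use of logWarning via Spark's Logging mixin, and buildBloomFilterAgg are illustrative, not this PR's code:

// Hypothetical: evaluate preconditions in order and surface the first failing
// reason instead of silently falling back.
def firstUnsupportedReason(conditions: (Boolean, String)*): Option[String] =
  conditions.collectFirst { case (ok, reason) if !ok => reason }

firstUnsupportedReason(
  childExpr.isDefined -> "child expression is not supported",
  child.dataType.isInstanceOf[LongType] -> "only LongType is supported (Spark 3.4 semantics)",
  numItemsExpr.isDefined -> "numItems is not defined",
  numBitsExpr.isDefined -> "numBits is not defined",
  dataType.isDefined -> "dataType is not defined"
) match {
  case Some(reason) =>
    logWarning(s"Falling back to Spark for bloom_filter_agg: $reason")
    None
  case None =>
    Some(buildBloomFilterAgg()) // build the native aggregate as the PR does
}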

@andygrove
Member

I tested with TPC-H q5 and see that we are now running the bloom filter agg natively.

Member

@andygrove andygrove left a comment

Thanks @mbutrovich

@andygrove andygrove merged commit e3ac6cf into apache:main Oct 18, 2024
74 checks passed