
Conversation

@arov00
Contributor

@arov00 arov00 commented May 25, 2025

Some notes for reviewers: In production, we have noticed consistently high memory usage when OLAP is enabled. After a detailed look into DuckDB's memory usage, we noticed that almost all the memory allocated to DuckDB was used by an ART index generated for a primary key constraint.

The reason we even have this primary key in place is to enable us to do upserts on the aggregated_data table. I.e. whenever we insert rows into the table, we want the most recent row with the same ID (which is generated from a combination of start/end timestamps, context, metric name, etc.) to overwrite the previous value for that ID. This way, we have only a single aggregation for every time window. The ART index is necessary to be able to perform upserts efficiently.

In this PR, we remove the primary key and opt to not upsert. Instead, we admit duplicate IDs and then simply return the most recent row for every ID. This drastically reduces memory usage at the expense of potentially slower queries. Since during normal operations, there are way more writes than reads, this seems to be an acceptable trade-off.

In the future, it may be worth exploring automatic cleanup functionality to limit DB table growth. A more immediate task would be the creation of a migration script that moves all data from the table with the primary key constraint to an identical table without one (we cannot drop primary key constraints and their associated indexes in DuckDB).
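
For reference, a minimal sketch of what such a migration could look like in DuckDB, assuming the existing aggregated_data table is copied into a constraint-free replacement (the exact DDL may differ from the real schema):

-- Sketch only: CREATE TABLE ... AS SELECT copies the rows but not the
-- primary key constraint or its ART index.
CREATE TABLE aggregated_data_new AS SELECT * FROM aggregated_data;

-- Swap the tables; the old index is dropped together with the old table.
DROP TABLE aggregated_data;
ALTER TABLE aggregated_data_new RENAME TO aggregated_data;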

Alexander Rovner added 5 commits April 15, 2025 14:56
ART index generated for the primary key in the aggregated data table seems to be the reason for the consistently high memory usage and the database's inability to do larger-than-memory processing. Changed all queries to deal with the ID no longer being unique.
@arov00 arov00 requested a review from a team as a code owner May 25, 2025 10:22
@sonarqubecloud

Quality Gate failed

Failed conditions
1 Security Hotspot
6.7% Coverage on New Code (required ≥ 80%)
E Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud


@gaetancollaud
Member

gaetancollaud commented May 25, 2025

Huge memory for a primary key sounds like bad design...

Also, this means we're not idempotent anymore. Windows can be regenerated many times. IMO we should support "at-least-once" semantics and not duplicate data. I guess the storage can grow quickly.

Could we use a unique index or something? I guess it's the same, I'm just asking.

Or better, could we try to force a different index type and not use ART?

@arov00
Contributor Author

arov00 commented May 26, 2025

Also, this means we're not idempotent anymore. Windows can be regenerated many times. IMO we should support "at-least-once" semantics and not duplicate data. I guess the storage can grow quickly.

Could you elaborate? Supporting "at-least-once" while avoiding duplicates sounds like a contradiction to me.

Could we use a unique index or something? I guess it's the same, I'm just asking.

Or better, could we try to force a different index type and not use ART?

DuckDB has only two built-in index types. The ART index is used for highly selective queries, which is what we need for upserts. It is created automatically for uniqueness constraints and primary key constraints.
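
For context, the upsert that depends on this index is roughly the following (a sketch with an abbreviated column list; DuckDB's ON CONFLICT clause requires a primary key or unique constraint on the conflict target, which is what creates the ART index):

-- Sketch: upsert on id, only possible while id is a PRIMARY KEY (ART-indexed).
INSERT INTO aggregated_data (id, start_time, end_time, value)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET value = excluded.value;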

SELECT * FROM aggregated_data
WHERE start_time >= ? AND end_time <= ?
""" + nameFilter)) {
SELECT aggdata.id,
       ANY_VALUE(aggdata.start_time) AS start_time,
       ANY_VALUE(aggdata.end_time) AS end_time,
       ANY_VALUE(aggdata.initial_metric_name) AS initial_metric_name,
       ANY_VALUE(aggdata.entity_type) AS entity_type,
       ANY_VALUE(aggdata.name) AS name,
       ANY_VALUE(aggdata.tags) AS tags,
       ANY_VALUE(aggdata.context) AS context,
       ANY_VALUE(aggdata.target) AS target,
       MAX(aggdata.value) AS value
Contributor

That this is correct was initially pretty confusing to me.
Since the id encodes start_time and end_time, we can take MAX(aggdata.value): all rows in a group belong to the same tumbling window, where we aggregate with SUM or MAX.
ANY_VALUE(start_time) is also correct, since grouping by id effectively groups by all the fields used for the hash.

Maybe it's clearer what's going on here if we replace GROUP BY aggdata.id with

GROUP BY start_time, end_time, initial_metric_name, entity_type, name, tags, context

I feel like the current approach of creating an ID by hashing other fields, then grouping by that ID and implicitly knowing that we also group by those fields, is an indirection we can avoid.
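
For illustration, a sketch of the read query with the explicit GROUP BY, assuming the grouped columns are exactly the fields that go into the id hash (target stays wrapped in ANY_VALUE here since it is not part of the suggested GROUP BY):

SELECT start_time, end_time, initial_metric_name, entity_type,
       name, tags, context, ANY_VALUE(target) AS target,
       MAX(value) AS value
FROM aggregated_data
WHERE start_time >= ? AND end_time <= ?
GROUP BY start_time, end_time, initial_metric_name, entity_type, name, tags, context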

message_count += 1

# Move to next 15-minute step
current_time += timedelta(seconds=30)
Contributor

nitpick: comment doesn't seem to match what's happening
suggestion: delete comments on lines 91 and 77

Contributor

@lukasgisi lukasgisi left a comment

as discussed: refactoring / removing ID can be in a separate PR

@gaetancollaud
Member

Also, this means we're not idempotent anymore. Windows can be regenerated many times. IMO we should support "at-least-once" semantics and not duplicate data. I guess the storage can grow quickly.

Could you elaborate? Supporting "at-least-once" while avoiding duplicates sounds like a contradiction to me.

Maybe I misunderstood. I understood that you want to remove the PK index and do manual clean-up. This sounds like a huge overhead to me. I have to admit it's been a while since I worked on this project, but the way we use windowing means that we will have a lot of Kafka messages with the same PK (that we have to upsert). So removing this PK will mean tons of duplicates, am I wrong?

Could we use a unique index or something? I guess it's the same, I'm just asking.
Or better, could we try to force a different index type and not use ART?

DuckDB has only two built-in index types. The ART index is used for highly selective queries, which is what we need for upserts. It is created automatically for uniqueness constraints and primary key constraints.

Ok too bad

@arov00
Contributor Author

arov00 commented May 26, 2025

Maybe I misunderstood. I understood that you want to remove the PK index and do manual clean-up. This sounds like a huge overhead to me. I have to admit it's been a while since I worked on this project, but the way we use windowing means that we will have a lot of Kafka messages with the same PK (that we have to upsert). So removing this PK will mean tons of duplicates, am I wrong?

You're right, we will have duplicates with this approach. Assuming a 1h window and 6 measurements per hour (as is the case in the Strimzi setup), we would probably have 6 values with the same ID, so roughly a 6x data increase. I think that is somewhat manageable, but not ideal.

I agree that constant manual deduplication would be a huge overhead. A more lightweight solution could be to do time-based retention (e.g. once per day, delete all data older than N weeks).
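
Such a retention job could boil down to a periodically executed statement like the following sketch (assuming end_time is a timestamp column; the 28-day cutoff is just an example):

-- Sketch: run e.g. once per day to drop aggregations older than ~4 weeks.
DELETE FROM aggregated_data
WHERE end_time < now() - INTERVAL 28 DAY;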

An alternative option (or additional enhancement), that I have not explored yet, would be to suppress aggregations in the stream topology until the time window closes. This way, we would not have these intermediate results in DB. Do you have experience with that?

@gaetancollaud
Member

An alternative option (or additional enhancement), that I have not explored yet, would be to suppress aggregations in the stream topology until the time window closes. This way, we would not have these intermediate results in DB. Do you have experience with that?

Unfortunately I don't.

Logically speaking, I still think that removing the PK is a bad idea, but if it's the only way... I'm starting to doubt DuckDB 😄

@schocco
Contributor

schocco commented May 27, 2025

An alternative option (or additional enhancement), that I have not explored yet, would be to suppress aggregations in the stream topology until the time window closes. This way, we would not have these intermediate results in DB. Do you have experience with that?

This works fine as long as there is a constant flow of data (which should be the case with the way metrics are ingested) - the window closes when Kafka Streams sees an event with a timestamp belonging to the next window.
It will hence introduce a small delay, and "incomplete buckets" (i.e. data for the currently running time window) will not be visible anymore.

Rather than disabling the constraint (which is very useful), could we use multiple tables/collections in DuckDB and only have the index/constraint for the current one? E.g. introduce a _{{weeknumber}} suffix, then only have the index for the latest one?
In the code some additional logic would be required (see the sketch after this list):

  • when writing: inspect record timestamp to get the correct tablename for inserting
  • when querying: 1. list tables with common prefix, 2. dynamically construct a union query over all of them (or only the ones that fit the desired time range for better performance)
  • creation of new tables: either as part of a batch job that also handles the index mgmt, or as part of the writing logic to auto-create missing tables
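
A rough sketch of the query side with weekly tables (the table names and the week-number suffix scheme here are just assumptions):

-- Sketch: discover the weekly partitions sharing a common prefix...
SELECT table_name
FROM information_schema.tables
WHERE table_name LIKE 'aggregated_data_%';

-- ...then build a UNION ALL over the partitions that overlap the requested range:
SELECT * FROM aggregated_data_2025_21 WHERE start_time >= ? AND end_time <= ?
UNION ALL
SELECT * FROM aggregated_data_2025_22 WHERE start_time >= ? AND end_time <= ?;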

@lukasgisi
Contributor

A new DuckDB feature might be worth exploring regarding its performance characteristics: basically an upsert without the need for an (indexed) primary key:
https://duckdb.org/2025/09/16/announcing-duckdb-140.html#merge-statement
https://duckdb.org/docs/stable/sql/statements/merge_into.html
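
A sketch of what the upsert could look like with MERGE INTO, assuming a staging relation new_rows with matching columns (untested; column list abbreviated):

-- Sketch: upsert without a primary key / ART index (DuckDB >= 1.4).
MERGE INTO aggregated_data AS target
USING new_rows AS src
ON target.id = src.id
WHEN MATCHED THEN UPDATE SET value = src.value
WHEN NOT MATCHED THEN INSERT (id, start_time, end_time, value)
    VALUES (src.id, src.start_time, src.end_time, src.value);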
