Skip to content

Commit f38748e

Browse files
authored
feat: conditional where on selections (#1100)
* feat: add utils function to centralize the creation of select queries * feat: optimize queries on ledgers alone in their buckets
1 parent 1dabc08 commit f38748e

File tree

6 files changed

+57
-53
lines changed

6 files changed

+57
-53
lines changed

internal/storage/ledger/resource_accounts.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,18 @@ func (h accountsResourceHandler) Schema() common.EntitySchema {
2626
}
2727

2828
func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
29-
ret := h.store.db.NewSelect().
29+
ret := h.store.newScopedSelect().
3030
ModelTableExpr(h.store.GetPrefixedRelationName("accounts")).
31-
Column("address", "address_array", "first_usage", "insertion_date", "updated_at").
32-
Where("ledger = ?", h.store.ledger.Name)
31+
Column("address", "address_array", "first_usage", "insertion_date", "updated_at")
3332

3433
if opts.PIT != nil && !opts.PIT.IsZero() {
3534
ret = ret.Where("accounts.first_usage <= ?", opts.PIT)
3635
}
3736

3837
if h.store.ledger.HasFeature(features.FeatureAccountMetadataHistory, "SYNC") && opts.PIT != nil && !opts.PIT.IsZero() {
39-
selectDistinctAccountMetadataHistories := h.store.db.NewSelect().
38+
selectDistinctAccountMetadataHistories := h.store.newScopedSelect().
4039
DistinctOn("accounts_address").
4140
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
42-
Where("ledger = ?", h.store.ledger.Name).
4341
Column("accounts_address").
4442
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
4543
Where("date <= ?", opts.PIT)
@@ -65,9 +63,8 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o
6563
return fmt.Sprintf("%s %s ?", property, common.ConvertOperatorToSQL(operator)), []any{value}, nil
6664
case balanceRegex.MatchString(property) || property == "balance":
6765

68-
selectBalance := h.store.db.NewSelect().
69-
Where("accounts_address = dataset.address").
70-
Where("ledger = ?", h.store.ledger.Name)
66+
selectBalance := h.store.newScopedSelect().
67+
Where("accounts_address = dataset.address")
7168

7269
if opts.PIT != nil && !opts.PIT.IsZero() {
7370
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
@@ -122,14 +119,13 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
122119
}
123120
}
124121

125-
selectRowsQuery := h.store.db.NewSelect().
122+
selectRowsQuery := h.store.newScopedSelect().
126123
Where("accounts_address in (select address from dataset)")
127124
if opts.UsePIT() {
128125
selectRowsQuery = selectRowsQuery.
129126
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
130127
DistinctOn("accounts_address, asset").
131-
Column("accounts_address", "asset").
132-
Where("ledger = ?", h.store.ledger.Name)
128+
Column("accounts_address", "asset")
133129
if property == "volumes" {
134130
selectRowsQuery = selectRowsQuery.
135131
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
@@ -143,16 +139,15 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
143139
selectRowsQuery = selectRowsQuery.
144140
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
145141
Column("asset", "accounts_address").
146-
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
147-
Where("ledger = ?", h.store.ledger.Name)
142+
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
148143
}
149144

150145
return h.store.db.NewSelect().
151-
With("rows", selectRowsQuery).
152-
ModelTableExpr("rows").
153-
Column("accounts_address").
154-
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + strcase.SnakeCase(property)).
155-
Group("accounts_address"), &common.JoinCondition{
146+
With("rows", selectRowsQuery).
147+
ModelTableExpr("rows").
148+
Column("accounts_address").
149+
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + strcase.SnakeCase(property)).
150+
Group("accounts_address"), &common.JoinCondition{
156151
Left: "address",
157152
Right: "accounts_address",
158153
}, nil

internal/storage/ledger/resource_aggregated_balances.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,10 @@ func (h aggregatedBalancesResourceRepositoryHandler) Schema() common.EntitySchem
2323
func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.RepositoryHandlerBuildContext[GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) {
2424

2525
if query.UsePIT() {
26-
ret := h.store.db.NewSelect().
26+
ret := h.store.newScopedSelect().
2727
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
2828
DistinctOn("accounts_address, asset").
29-
Column("accounts_address", "asset").
30-
Where("ledger = ?", h.store.ledger.Name)
29+
Column("accounts_address", "asset")
3130
if query.Opts.UseInsertionDate {
3231
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
3332
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
@@ -49,23 +48,21 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
4948
if query.UseFilter("address", func(value any) bool {
5049
return isPartialAddress(value.(string))
5150
}) {
52-
subQuery := h.store.db.NewSelect().
51+
subQuery := h.store.newScopedSelect().
5352
TableExpr(h.store.GetPrefixedRelationName("accounts")).
5453
Column("address_array").
55-
Where("accounts.address = accounts_address").
56-
Where("ledger = ?", h.store.ledger.Name)
54+
Where("accounts.address = accounts_address")
5755

5856
ret = ret.
5957
ColumnExpr("accounts.address_array as accounts_address_array").
6058
Join(`join lateral (?) accounts on true`, subQuery)
6159
}
6260

6361
if query.UseFilter("metadata") {
64-
subQuery := h.store.db.NewSelect().
62+
subQuery := h.store.newScopedSelect().
6563
DistinctOn("accounts_address").
6664
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
6765
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
68-
Where("ledger = ?", h.store.ledger.Name).
6966
Where("accounts_metadata.accounts_address = moves.accounts_address").
7067
Where("date <= ?", query.PIT)
7168

@@ -76,19 +73,17 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
7673

7774
return ret, nil
7875
} else {
79-
ret := h.store.db.NewSelect().
76+
ret := h.store.newScopedSelect().
8077
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
8178
Column("asset", "accounts_address").
82-
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
83-
Where("ledger = ?", h.store.ledger.Name)
79+
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
8480

8581
if query.UseFilter("metadata") || query.UseFilter("address", func(value any) bool {
8682
return isPartialAddress(value.(string))
8783
}) {
88-
subQuery := h.store.db.NewSelect().
84+
subQuery := h.store.newScopedSelect().
8985
TableExpr(h.store.GetPrefixedRelationName("accounts")).
9086
Column("address").
91-
Where("ledger = ?", h.store.ledger.Name).
9287
Where("accounts.address = accounts_address")
9388

9489
if query.UseFilter("address") {

internal/storage/ledger/resource_logs.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,9 @@ func (h logsResourceHandler) Schema() common.EntitySchema {
2121
}
2222

2323
func (h logsResourceHandler) BuildDataset(_ common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
24-
return h.store.db.NewSelect().
24+
return h.store.newScopedSelect().
2525
ModelTableExpr(h.store.GetPrefixedRelationName("logs")).
26-
ColumnExpr("*").
27-
Where("ledger = ?", h.store.ledger.Name), nil
26+
ColumnExpr("*"), nil
2827
}
2928

3029
func (h logsResourceHandler) ResolveFilter(_ common.ResourceQuery[any], operator, property string, value any) (string, []any, error) {

internal/storage/ledger/resource_transactions.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (h transactionsResourceHandler) Schema() common.EntitySchema {
3131
}
3232

3333
func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
34-
ret := h.store.db.NewSelect().
34+
ret := h.store.newScopedSelect().
3535
ModelTableExpr(h.store.GetPrefixedRelationName("transactions")).
3636
Column(
3737
"ledger",
@@ -45,8 +45,7 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
4545
"destinations",
4646
"sources_arrays",
4747
"destinations_arrays",
48-
).
49-
Where("ledger = ?", h.store.ledger.Name)
48+
)
5049

5150
if slices.Contains(opts.Expand, "volumes") {
5251
ret = ret.Column("post_commit_volumes")
@@ -57,10 +56,9 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
5756
}
5857

5958
if h.store.ledger.HasFeature(features.FeatureAccountMetadataHistory, "SYNC") && opts.PIT != nil && !opts.PIT.IsZero() {
60-
selectDistinctTransactionMetadataHistories := h.store.db.NewSelect().
59+
selectDistinctTransactionMetadataHistories := h.store.newScopedSelect().
6160
DistinctOn("transactions_id").
6261
ModelTableExpr(h.store.GetPrefixedRelationName("transactions_metadata")).
63-
Where("ledger = ?", h.store.ledger.Name).
6462
Column("transactions_id", "metadata").
6563
Order("transactions_id", "revision desc").
6664
Where("date <= ?", opts.PIT)
@@ -134,12 +132,11 @@ func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], propert
134132
h.store.db.NewSelect().
135133
TableExpr(
136134
"(?) moves",
137-
h.store.db.NewSelect().
135+
h.store.newScopedSelect().
138136
DistinctOn("transactions_id, accounts_address, asset").
139137
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
140138
Column("transactions_id", "accounts_address", "asset").
141139
ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`).
142-
Where("ledger = ?", h.store.ledger.Name).
143140
Where("transactions_id in (select id from dataset)"),
144141
).
145142
Column("transactions_id", "accounts_address").

internal/storage/ledger/resource_volumes.go

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,17 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
3434
return isPartialAddress(value.(string))
3535
})
3636
if !query.UsePIT() && !query.UseOOT() {
37-
selectVolumes = h.store.db.NewSelect().
37+
selectVolumes = h.store.newScopedSelect().
3838
Column("asset", "input", "output").
3939
ColumnExpr("input - output as balance").
4040
ColumnExpr("accounts_address as account").
4141
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
42-
Where("ledger = ?", h.store.ledger.Name).
4342
Order("accounts_address", "asset")
4443

4544
if query.UseFilter("metadata") || query.UseFilter("first_usage") || needAddressSegments {
46-
accountsQuery := h.store.db.NewSelect().
45+
accountsQuery := h.store.newScopedSelect().
4746
TableExpr(h.store.GetPrefixedRelationName("accounts")).
4847
Column("address").
49-
Where("ledger = ?", h.store.ledger.Name).
5048
Where("accounts.address = accounts_address")
5149

5250
if needAddressSegments {
@@ -70,14 +68,13 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
7068
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
7169
}
7270

73-
selectVolumes = h.store.db.NewSelect().
71+
selectVolumes = h.store.newScopedSelect().
7472
Column("asset").
7573
ColumnExpr("accounts_address as account").
7674
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
7775
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
7876
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
7977
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
80-
Where("ledger = ?", h.store.ledger.Name).
8178
GroupExpr("accounts_address, asset").
8279
Order("accounts_address", "asset")
8380

@@ -95,10 +92,9 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
9592
}
9693

9794
if needAddressSegments || query.UseFilter("first_usage") {
98-
accountsQuery := h.store.db.NewSelect().
95+
accountsQuery := h.store.newScopedSelect().
9996
TableExpr(h.store.GetPrefixedRelationName("accounts")).
100-
Where("accounts.address = accounts_address").
101-
Where("ledger = ?", h.store.ledger.Name)
97+
Where("accounts.address = accounts_address")
10298

10399
if needAddressSegments {
104100
accountsQuery = accountsQuery.ColumnExpr("address_array")
@@ -112,11 +108,10 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
112108
}
113109

114110
if query.UseFilter("metadata") {
115-
subQuery := h.store.db.NewSelect().
111+
subQuery := h.store.newScopedSelect().
116112
DistinctOn("accounts_address").
117113
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
118114
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
119-
Where("ledger = ?", h.store.ledger.Name).
120115
Where("accounts_metadata.accounts_address = moves.accounts_address")
121116

122117
selectVolumes = selectVolumes.

internal/storage/ledger/store.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,29 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
163163
}
164164
}
165165

166+
// newScopedSelect creates a new select query scoped to the current ledger.
167+
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
168+
// To avoid that, we use a WHERE OR to separate the two cases:
169+
// 1. Check if the ledger is the only one in the bucket
170+
// 2. Otherwise, filter by ledger name
171+
func (store *Store) newScopedSelect() *bun.SelectQuery {
172+
q := store.db.NewSelect()
173+
checkLedgerAlone := store.db.NewSelect().
174+
TableExpr("_system.ledgers").
175+
ColumnExpr("count = 1").
176+
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
177+
store.db.NewSelect().
178+
TableExpr("_system.ledgers").
179+
ColumnExpr("bucket").
180+
ColumnExpr("COUNT(*) AS count").
181+
Group("bucket"),
182+
).
183+
Where("_system.ledgers.name = ?", store.ledger.Name)
184+
185+
return q.
186+
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
187+
}
188+
166189
func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
167190
ret := &Store{
168191
db: db,
@@ -174,7 +197,7 @@ func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Sto
174197
}
175198

176199
var err error
177-
ret.checkBucketSchemaHistogram, err = ret.meter.Int64Histogram("store.check_bucket_schema")
200+
ret.checkBucketSchemaHistogram, err = ret.meter.Int64Histogram("store.check_bucket_schema")
178201
if err != nil {
179202
panic(err)
180203
}

0 commit comments

Comments
 (0)