Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions internal/storage/ledger/resource_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,18 @@ func (h accountsResourceHandler) Schema() common.EntitySchema {
}

func (h accountsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
ret := h.store.db.NewSelect().
ret := h.store.newScopedSelect().
ModelTableExpr(h.store.GetPrefixedRelationName("accounts")).
Column("address", "address_array", "first_usage", "insertion_date", "updated_at").
Where("ledger = ?", h.store.ledger.Name)
Column("address", "address_array", "first_usage", "insertion_date", "updated_at")

if opts.PIT != nil && !opts.PIT.IsZero() {
ret = ret.Where("accounts.first_usage <= ?", opts.PIT)
}

if h.store.ledger.HasFeature(features.FeatureAccountMetadataHistory, "SYNC") && opts.PIT != nil && !opts.PIT.IsZero() {
selectDistinctAccountMetadataHistories := h.store.db.NewSelect().
selectDistinctAccountMetadataHistories := h.store.newScopedSelect().
DistinctOn("accounts_address").
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
Where("ledger = ?", h.store.ledger.Name).
Column("accounts_address").
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
Where("date <= ?", opts.PIT)
Expand All @@ -65,9 +63,8 @@ func (h accountsResourceHandler) ResolveFilter(opts common.ResourceQuery[any], o
return fmt.Sprintf("%s %s ?", property, common.ConvertOperatorToSQL(operator)), []any{value}, nil
case balanceRegex.MatchString(property) || property == "balance":

selectBalance := h.store.db.NewSelect().
Where("accounts_address = dataset.address").
Where("ledger = ?", h.store.ledger.Name)
selectBalance := h.store.newScopedSelect().
Where("accounts_address = dataset.address")

if opts.PIT != nil && !opts.PIT.IsZero() {
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
Expand Down Expand Up @@ -122,14 +119,13 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
}
}

selectRowsQuery := h.store.db.NewSelect().
selectRowsQuery := h.store.newScopedSelect().
Where("accounts_address in (select address from dataset)")
if opts.UsePIT() {
selectRowsQuery = selectRowsQuery.
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
Where("ledger = ?", h.store.ledger.Name)
Column("accounts_address", "asset")
if property == "volumes" {
selectRowsQuery = selectRowsQuery.
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
Expand All @@ -143,16 +139,15 @@ func (h accountsResourceHandler) Expand(opts common.ResourceQuery[any], property
selectRowsQuery = selectRowsQuery.
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_address").
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", h.store.ledger.Name)
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")
}

return h.store.db.NewSelect().
With("rows", selectRowsQuery).
ModelTableExpr("rows").
Column("accounts_address").
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + strcase.SnakeCase(property)).
Group("accounts_address"), &common.JoinCondition{
With("rows", selectRowsQuery).
ModelTableExpr("rows").
Column("accounts_address").
ColumnExpr("public.aggregate_objects(json_build_object(asset, json_build_object('input', (volumes).inputs, 'output', (volumes).outputs))::jsonb) as " + strcase.SnakeCase(property)).
Group("accounts_address"), &common.JoinCondition{
Left: "address",
Right: "accounts_address",
}, nil
Expand Down
21 changes: 8 additions & 13 deletions internal/storage/ledger/resource_aggregated_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ func (h aggregatedBalancesResourceRepositoryHandler) Schema() common.EntitySchem
func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.RepositoryHandlerBuildContext[GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) {

if query.UsePIT() {
ret := h.store.db.NewSelect().
ret := h.store.newScopedSelect().
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
Where("ledger = ?", h.store.ledger.Name)
Column("accounts_address", "asset")
if query.Opts.UseInsertionDate {
if !h.store.ledger.HasFeature(features.FeatureMovesHistory, "ON") {
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
Expand All @@ -49,23 +48,21 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R
if query.UseFilter("address", func(value any) bool {
return isPartialAddress(value.(string))
}) {
subQuery := h.store.db.NewSelect().
subQuery := h.store.newScopedSelect().
TableExpr(h.store.GetPrefixedRelationName("accounts")).
Column("address_array").
Where("accounts.address = accounts_address").
Where("ledger = ?", h.store.ledger.Name)
Where("accounts.address = accounts_address")

ret = ret.
ColumnExpr("accounts.address_array as accounts_address_array").
Join(`join lateral (?) accounts on true`, subQuery)
}

if query.UseFilter("metadata") {
subQuery := h.store.db.NewSelect().
subQuery := h.store.newScopedSelect().
DistinctOn("accounts_address").
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
Where("ledger = ?", h.store.ledger.Name).
Where("accounts_metadata.accounts_address = moves.accounts_address").
Where("date <= ?", query.PIT)

Expand All @@ -76,19 +73,17 @@ func (h aggregatedBalancesResourceRepositoryHandler) BuildDataset(query common.R

return ret, nil
} else {
ret := h.store.db.NewSelect().
ret := h.store.newScopedSelect().
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_address").
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", h.store.ledger.Name)
ColumnExpr("(input, output)::"+h.store.GetPrefixedRelationName("volumes")+" as volumes")

if query.UseFilter("metadata") || query.UseFilter("address", func(value any) bool {
return isPartialAddress(value.(string))
}) {
subQuery := h.store.db.NewSelect().
subQuery := h.store.newScopedSelect().
TableExpr(h.store.GetPrefixedRelationName("accounts")).
Column("address").
Where("ledger = ?", h.store.ledger.Name).
Where("accounts.address = accounts_address")

if query.UseFilter("address") {
Expand Down
5 changes: 2 additions & 3 deletions internal/storage/ledger/resource_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ func (h logsResourceHandler) Schema() common.EntitySchema {
}

func (h logsResourceHandler) BuildDataset(_ common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
return h.store.db.NewSelect().
return h.store.newScopedSelect().
ModelTableExpr(h.store.GetPrefixedRelationName("logs")).
ColumnExpr("*").
Where("ledger = ?", h.store.ledger.Name), nil
ColumnExpr("*"), nil
}

func (h logsResourceHandler) ResolveFilter(_ common.ResourceQuery[any], operator, property string, value any) (string, []any, error) {
Expand Down
11 changes: 4 additions & 7 deletions internal/storage/ledger/resource_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (h transactionsResourceHandler) Schema() common.EntitySchema {
}

func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerBuildContext[any]) (*bun.SelectQuery, error) {
ret := h.store.db.NewSelect().
ret := h.store.newScopedSelect().
ModelTableExpr(h.store.GetPrefixedRelationName("transactions")).
Column(
"ledger",
Expand All @@ -44,8 +44,7 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
"destinations",
"sources_arrays",
"destinations_arrays",
).
Where("ledger = ?", h.store.ledger.Name)
)

if slices.Contains(opts.Expand, "volumes") {
ret = ret.Column("post_commit_volumes")
Expand All @@ -56,10 +55,9 @@ func (h transactionsResourceHandler) BuildDataset(opts common.RepositoryHandlerB
}

if h.store.ledger.HasFeature(features.FeatureAccountMetadataHistory, "SYNC") && opts.PIT != nil && !opts.PIT.IsZero() {
selectDistinctTransactionMetadataHistories := h.store.db.NewSelect().
selectDistinctTransactionMetadataHistories := h.store.newScopedSelect().
DistinctOn("transactions_id").
ModelTableExpr(h.store.GetPrefixedRelationName("transactions_metadata")).
Where("ledger = ?", h.store.ledger.Name).
Column("transactions_id", "metadata").
Order("transactions_id", "revision desc").
Where("date <= ?", opts.PIT)
Expand Down Expand Up @@ -130,12 +128,11 @@ func (h transactionsResourceHandler) Expand(_ common.ResourceQuery[any], propert
h.store.db.NewSelect().
TableExpr(
"(?) moves",
h.store.db.NewSelect().
h.store.newScopedSelect().
DistinctOn("transactions_id, accounts_address, asset").
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
Column("transactions_id", "accounts_address", "asset").
ColumnExpr(`first_value(moves.post_commit_effective_volumes) over (partition by (transactions_id, accounts_address, asset) order by seq desc) as post_commit_effective_volumes`).
Where("ledger = ?", h.store.ledger.Name).
Where("transactions_id in (select id from dataset)"),
).
Column("transactions_id", "accounts_address").
Expand Down
17 changes: 6 additions & 11 deletions internal/storage/ledger/resource_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,17 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
return isPartialAddress(value.(string))
})
if !query.UsePIT() && !query.UseOOT() {
selectVolumes = h.store.db.NewSelect().
selectVolumes = h.store.newScopedSelect().
Column("asset", "input", "output").
ColumnExpr("input - output as balance").
ColumnExpr("accounts_address as account").
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_volumes")).
Where("ledger = ?", h.store.ledger.Name).
Order("accounts_address", "asset")

if query.UseFilter("metadata") || query.UseFilter("first_usage") || needAddressSegments {
accountsQuery := h.store.db.NewSelect().
accountsQuery := h.store.newScopedSelect().
TableExpr(h.store.GetPrefixedRelationName("accounts")).
Column("address").
Where("ledger = ?", h.store.ledger.Name).
Where("accounts.address = accounts_address")

if needAddressSegments {
Expand All @@ -70,14 +68,13 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
return nil, NewErrMissingFeature(features.FeatureMovesHistory)
}

selectVolumes = h.store.db.NewSelect().
selectVolumes = h.store.newScopedSelect().
Column("asset").
ColumnExpr("accounts_address as account").
ColumnExpr("sum(case when not is_source then amount else 0 end) as input").
ColumnExpr("sum(case when is_source then amount else 0 end) as output").
ColumnExpr("sum(case when not is_source then amount else -amount end) as balance").
ModelTableExpr(h.store.GetPrefixedRelationName("moves")).
Where("ledger = ?", h.store.ledger.Name).
GroupExpr("accounts_address, asset").
Order("accounts_address", "asset")

Expand All @@ -95,10 +92,9 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
}

if needAddressSegments || query.UseFilter("first_usage") {
accountsQuery := h.store.db.NewSelect().
accountsQuery := h.store.newScopedSelect().
TableExpr(h.store.GetPrefixedRelationName("accounts")).
Where("accounts.address = accounts_address").
Where("ledger = ?", h.store.ledger.Name)
Where("accounts.address = accounts_address")

if needAddressSegments {
accountsQuery = accountsQuery.ColumnExpr("address_array")
Expand All @@ -112,11 +108,10 @@ func (h volumesResourceHandler) BuildDataset(query common.RepositoryHandlerBuild
}

if query.UseFilter("metadata") {
subQuery := h.store.db.NewSelect().
subQuery := h.store.newScopedSelect().
DistinctOn("accounts_address").
ModelTableExpr(h.store.GetPrefixedRelationName("accounts_metadata")).
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
Where("ledger = ?", h.store.ledger.Name).
Where("accounts_metadata.accounts_address = moves.accounts_address")

selectVolumes = selectVolumes.
Expand Down
25 changes: 24 additions & 1 deletion internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,29 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
}
}

// newScopedSelect creates a new select query scoped to the current ledger.
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
// To avoid that, we use a WHERE OR to separate the two cases:
// 1. Check if the ledger is the only one in the bucket
// 2. Otherwise, filter by ledger name
func (store *Store) newScopedSelect() *bun.SelectQuery {
q := store.db.NewSelect()
checkLedgerAlone := store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("count = 1").
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("bucket").
ColumnExpr("COUNT(*) AS count").
Group("bucket"),
).
Where("_system.ledgers.name = ?", store.ledger.Name)

return q.
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
}

func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
ret := &Store{
db: db,
Expand All @@ -174,7 +197,7 @@ func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Sto
}

var err error
ret.checkBucketSchemaHistogram, err = ret.meter.Int64Histogram("store.check_bucket_schema")
ret.checkBucketSchemaHistogram, err = ret.meter.Int64Histogram("store.check_bucket_schema")
if err != nil {
panic(err)
}
Expand Down
Loading