Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions openmeter/customer/adapter/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (a *adapter) ListCustomers(ctx context.Context, input customer.ListCustomer

query := repo.db.Customer.Query().Where(customerdb.Namespace(input.Namespace))
query = WithSubjects(query, now)
query = withSubscription(query, now)
query = WithActiveSubscriptions(query, now)

// Do not return deleted customers by default
if !input.IncludeDeleted {
Expand Down Expand Up @@ -313,7 +313,7 @@ func (a *adapter) GetCustomer(ctx context.Context, input customer.GetCustomerInp

query := repo.db.Customer.Query()
query = WithSubjects(query, now)
query = withSubscription(query, now)
query = WithActiveSubscriptions(query, now)

if input.CustomerID != nil {
query = query.Where(customerdb.Namespace(input.CustomerID.Namespace))
Expand Down Expand Up @@ -389,7 +389,7 @@ func (a *adapter) GetCustomerByUsageAttribution(ctx context.Context, input custo
)).
Where(customerdb.DeletedAtIsNil())
query = WithSubjects(query, now)
query = withSubscription(query, now)
query = WithActiveSubscriptions(query, now)

customerEntity, err := query.First(ctx)
if err != nil {
Expand Down Expand Up @@ -625,8 +625,8 @@ func WithSubjects(q *entdb.CustomerQuery, at time.Time) *entdb.CustomerQuery {
})
}

// withSubscription returns a query with the subscription
func withSubscription(query *entdb.CustomerQuery, at time.Time) *entdb.CustomerQuery {
// WithActiveSubscriptions returns a query with the subscription
func WithActiveSubscriptions(query *entdb.CustomerQuery, at time.Time) *entdb.CustomerQuery {
return query.WithSubscription(func(query *entdb.SubscriptionQuery) {
applyActiveSubscriptionFilter(query, at)
query.WithPlan()
Expand Down
45 changes: 36 additions & 9 deletions openmeter/customer/adapter/entitymapping.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package adapter

import (
"errors"

"github.com/samber/lo"

"github.com/openmeterio/openmeter/openmeter/customer"
Expand All @@ -9,17 +11,40 @@ import (
)

func CustomerFromDBEntity(e db.Customer) (*customer.Customer, error) {
var subjectKeys []string

if e.Edges.Subjects != nil {
subjectKeys = lo.Map(
e.Edges.Subjects,
func(item *db.CustomerSubjects, _ int) string {
return item.SubjectKey
},
)
subjects, err := e.Edges.SubjectsOrErr()
if err != nil {
if db.IsNotLoaded(err) {
return nil, errors.New("subjects must be loaded for customer")
}

return nil, err
}

subjectKeys := lo.FilterMap(subjects, func(item *db.CustomerSubjects, _ int) (string, bool) {
if item == nil {
return "", false
}

return item.SubjectKey, true
})

subscriptions, err := e.Edges.SubscriptionOrErr()
if err != nil {
if db.IsNotLoaded(err) {
return nil, errors.New("subscriptions must be loaded for customer")
}

return nil, err
}

subscriptionIDs := lo.FilterMap(subscriptions, func(item *db.Subscription, _ int) (string, bool) {
if item == nil {
return "", false
}

return item.ID, true
})

var metadata *models.Metadata

if len(e.Metadata) > 0 {
Expand Down Expand Up @@ -49,6 +74,8 @@ func CustomerFromDBEntity(e db.Customer) (*customer.Customer, error) {
Currency: e.Currency,
Metadata: metadata,
Annotation: annotations,

ActiveSubscriptionIDs: subscriptionIDs,
}

if e.Key != "" {
Expand Down
2 changes: 2 additions & 0 deletions openmeter/customer/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Customer struct {
BillingAddress *models.Address `json:"billingAddress,omitempty"`
Metadata *models.Metadata `json:"metadata,omitempty"`
Annotation *models.Annotations `json:"annotations,omitempty"`

ActiveSubscriptionIDs []string
}

// GetUsageAttribution returns the customer usage attribution
Expand Down
11 changes: 11 additions & 0 deletions openmeter/customer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,14 @@ var ErrCustomerSubjectKeyNotSingular = models.NewValidationIssue(
func NewErrCustomerSubjectKeyNotSingular(subjectKeys []string) error {
return ErrCustomerSubjectKeyNotSingular.WithAttr("subject_keys", subjectKeys)
}

const ErrCodeDeletingCustomerWithActiveSubscriptions models.ErrorCode = "deleting_customer_with_active_subscriptions"

var ErrDeletingCustomerWithActiveSubscriptions = models.NewValidationIssue(
ErrCodeDeletingCustomerWithActiveSubscriptions,
"cannot delete customer with active subscriptions",
)

func NewErrDeletingCustomerWithActiveSubscriptions(subscriptionIDs []string) error {
return ErrDeletingCustomerWithActiveSubscriptions.WithAttr("subscriptions", subscriptionIDs)
}
6 changes: 6 additions & 0 deletions openmeter/customer/service/customer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func (s *Service) DeleteCustomer(ctx context.Context, input customer.DeleteCusto
return nil
}

if len(cus.ActiveSubscriptionIDs) > 0 {
return models.NewGenericPreConditionFailedError(
customer.NewErrDeletingCustomerWithActiveSubscriptions(cus.ActiveSubscriptionIDs),
)
}

// Run pre delete hooks
if err = s.hooks.PreDelete(ctx, cus); err != nil {
return err
Expand Down
82 changes: 56 additions & 26 deletions openmeter/entitlement/adapter/entitlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (a *entitlementDBAdapter) GetEntitlement(ctx context.Context, entitlementID
res, err := withAllUsageResets(repo.db.Entitlement.Query(), []string{entitlementID.Namespace}).
WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithActiveSubscriptions(q, clock.Now())
}).
WithSubject().
Where(
Expand Down Expand Up @@ -89,7 +90,8 @@ func (a *entitlementDBAdapter) GetActiveEntitlementOfCustomerAt(ctx context.Cont
func(ctx context.Context, repo *entitlementDBAdapter) (*entitlement.Entitlement, error) {
res, err := withAllUsageResets(repo.db.Entitlement.Query(), []string{namespace}).
WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, at)
customeradapter.WithActiveSubscriptions(q, at)
}).
WithSubject().
Where(EntitlementActiveAt(at)...).
Expand Down Expand Up @@ -126,14 +128,18 @@ func (a *entitlementDBAdapter) CreateEntitlement(ctx context.Context, ent entitl
ctx,
a,
func(ctx context.Context, repo *entitlementDBAdapter) (*entitlement.Entitlement, error) {
cust, err := repo.db.Customer.Query().
WithSubjects(func(csq *db.CustomerSubjectsQuery) {
csq.Where(customersubjectsdb.DeletedAtIsNil())
}).
Where(customerdb.Namespace(ent.Namespace)).
Where(customerdb.ID(ent.UsageAttribution.ID)).
Where(customerdb.DeletedAtIsNil()).
Only(ctx)
now := clock.Now().UTC()

query := repo.db.Customer.Query().Where(
customerdb.Namespace(ent.Namespace),
customerdb.ID(ent.UsageAttribution.ID),
customerdb.DeletedAtIsNil(),
)

query = customeradapter.WithSubjects(query, now)
query = customeradapter.WithActiveSubscriptions(query, now)

cus, err := query.Only(ctx)
if err != nil {
if db.IsNotFound(err) {
return nil, models.NewGenericNotFoundError(
Expand All @@ -144,12 +150,12 @@ func (a *entitlementDBAdapter) CreateEntitlement(ctx context.Context, ent entitl
return nil, fmt.Errorf("failed to resolve customer: %w", err)
}

custEnt, err := customeradapter.CustomerFromDBEntity(*cust)
entitlementCustomer, err := customeradapter.CustomerFromDBEntity(*cus)
if err != nil {
return nil, fmt.Errorf("failed to map customer: %w", err)
}

subjectKey, err := custEnt.UsageAttribution.GetSubjectKey()
subjectKey, err := entitlementCustomer.UsageAttribution.GetSubjectKey()
if err != nil {
return nil, fmt.Errorf("failed to get subject key: %w", err)
}
Expand All @@ -176,7 +182,7 @@ func (a *entitlementDBAdapter) CreateEntitlement(ctx context.Context, ent entitl
SetNamespace(ent.Namespace).
SetFeatureID(ent.FeatureID).
SetMetadata(ent.Metadata).
SetCustomerID(cust.ID).
SetCustomerID(cus.ID).
SetSubjectID(subj.ID).
SetSubjectKey(subjectKey).
SetFeatureKey(ent.FeatureKey).
Expand Down Expand Up @@ -216,7 +222,8 @@ func (a *entitlementDBAdapter) CreateEntitlement(ctx context.Context, ent entitl
// Query the created entitlement back with customer and subject edges loaded
entWithEdges, err := repo.db.Entitlement.Query().
WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
}).
WithSubject().
Where(db_entitlement.ID(res.ID)).
Expand Down Expand Up @@ -309,7 +316,12 @@ func (a *entitlementDBAdapter) ListEntitlementsAffectedByIngestEvents(ctx contex
db_entitlement.HasFeatureWith(db_feature.MeterSlugIn(pair.MeterSlugs...)),
).
WithFeature().
WithCustomer().
WithCustomer(
func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithActiveSubscriptions(q, clock.Now())
},
).
All(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -339,9 +351,12 @@ func (a *entitlementDBAdapter) GetActiveEntitlementsOfCustomer(ctx context.Conte
ctx,
a,
func(ctx context.Context, repo *entitlementDBAdapter) ([]entitlement.Entitlement, error) {
now := clock.Now().UTC()

res, err := withAllUsageResets(repo.db.Entitlement.Query(), []string{namespace}).
WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
}).
WithSubject().
Where(EntitlementActiveAt(at)...).
Expand Down Expand Up @@ -378,8 +393,11 @@ func (a *entitlementDBAdapter) ListEntitlements(ctx context.Context, params enti
ctx,
a,
func(ctx context.Context, repo *entitlementDBAdapter) (pagination.PagedResponse[entitlement.Entitlement], error) {
now := clock.Now().UTC()

query := repo.db.Entitlement.Query().WithSubject().WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
})

if len(params.Namespaces) > 0 {
Expand All @@ -395,7 +413,7 @@ func (a *entitlementDBAdapter) ListEntitlements(ctx context.Context, params enti
customersubjectsdb.SubjectKeyIn(params.SubjectKeys...),
customersubjectsdb.DeletedAtIsNil(),
),
customerNotDeletedAt(clock.Now()),
customerNotDeletedAt(now),
),
)
}
Expand Down Expand Up @@ -432,15 +450,15 @@ func (a *entitlementDBAdapter) ListEntitlements(ctx context.Context, params enti
}

if !params.IncludeDeleted {
query = query.Where(db_entitlement.Or(db_entitlement.DeletedAtGT(clock.Now()), db_entitlement.DeletedAtIsNil()))
query = query.Where(db_entitlement.Or(db_entitlement.DeletedAtGT(now), db_entitlement.DeletedAtIsNil()))
}

if !params.IncludeDeletedAfter.IsZero() {
query = query.Where(db_entitlement.Or(db_entitlement.DeletedAtGT(params.IncludeDeletedAfter), db_entitlement.DeletedAtIsNil()))
}

if params.ExcludeInactive {
query = query.Where(EntitlementActiveAt(clock.Now())...)
query = query.Where(EntitlementActiveAt(now)...)
}

if params.OrderBy != "" {
Expand Down Expand Up @@ -693,14 +711,21 @@ func (a *entitlementDBAdapter) UpsertEntitlementCurrentPeriods(ctx context.Conte
return fmt.Errorf("%d duplicate entitlement updates provided", len(updates)-len(uniqueUpdates))
}

now := clock.Now()

// We will check that all provided entitlements exist as we don't want to create any, just update the current usage period
entitlements, err := repo.db.Entitlement.Query().
// We're ignoring namespace here but as IDs are globally unique this should be fine
Where(db_entitlement.IDIn(slicesx.Map(updates, func(u entitlement.UpsertEntitlementCurrentPeriodElement) string {
return u.ID
})...),
).
WithCustomer().
WithCustomer(
func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
},
).
All(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -773,16 +798,19 @@ func (a *entitlementDBAdapter) ListActiveEntitlementsWithExpiredUsagePeriod(ctx
ctx,
a,
func(ctx context.Context, repo *entitlementDBAdapter) ([]entitlement.Entitlement, error) {
now := clock.Now()

query := withAllUsageResets(repo.db.Entitlement.Query(), params.Namespaces).
WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
}).
WithSubject().
Where(EntitlementActiveAt(params.Highwatermark)...).
Where(
db_entitlement.CurrentUsagePeriodEndNotNil(),
db_entitlement.CurrentUsagePeriodEndLTE(params.Highwatermark),
db_entitlement.Or(db_entitlement.DeletedAtIsNil(), db_entitlement.DeletedAtGT(clock.Now())),
db_entitlement.Or(db_entitlement.DeletedAtIsNil(), db_entitlement.DeletedAtGT(now)),
)

if len(params.Namespaces) > 0 {
Expand Down Expand Up @@ -917,9 +945,11 @@ func (a *entitlementDBAdapter) GetScheduledEntitlements(ctx context.Context, nam
ctx,
a,
func(ctx context.Context, repo *entitlementDBAdapter) (*[]entitlement.Entitlement, error) {
nowTS := clock.Now()
now := clock.Now()

query := repo.db.Entitlement.Query().WithSubject().WithCustomer(func(q *db.CustomerQuery) {
customeradapter.WithSubjects(q, clock.Now())
customeradapter.WithSubjects(q, now)
customeradapter.WithActiveSubscriptions(q, now)
})
query = withAllUsageResets(query, []string{namespace})
res, err := query.
Expand All @@ -930,11 +960,11 @@ func (a *entitlementDBAdapter) GetScheduledEntitlements(ctx context.Context, nam
),
).
Where(
db_entitlement.Or(db_entitlement.DeletedAtIsNil(), db_entitlement.DeletedAtGT(nowTS)),
db_entitlement.Or(db_entitlement.DeletedAtIsNil(), db_entitlement.DeletedAtGT(now)),
db_entitlement.Namespace(namespace),
db_entitlement.HasCustomerWith(
customerdb.Namespace(namespace),
customerNotDeletedAt(nowTS),
customerNotDeletedAt(now),
customerdb.ID(customerID),
),
db_entitlement.FeatureKey(featureKey),
Expand Down
Loading