Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions openmeter/billing/invoicelinesplitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,14 @@ func (i LineOrHierarchy) ChildUniqueReferenceID() *string {

return nil
}

func (i LineOrHierarchy) ServicePeriod() Period {
switch i.t {
case LineOrHierarchyTypeLine:
return i.line.Period
case LineOrHierarchyTypeHierarchy:
return i.splitLineHierarchy.Group.ServicePeriod
}

return Period{}
}
15 changes: 13 additions & 2 deletions openmeter/billing/worker/subscription/phaseiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ type subscriptionItemWithPeriods struct {
subscription.SubscriptionItemView
// References
UniqueID string

PhaseID string
PhaseKey string

PeriodIndex int
ItemVersion int

// Period Information

Expand Down Expand Up @@ -374,7 +379,10 @@ func (it *PhaseIterator) generateForAlignedItemVersionPeriod(ctx context.Context
fmt.Sprintf("v[%d]", version),
fmt.Sprintf("period[%d]", periodIdx),
}, "/"),
PhaseID: it.phase.SubscriptionPhase.ID,
PhaseID: it.phase.SubscriptionPhase.ID,
PhaseKey: it.phase.Spec.PhaseKey,
PeriodIndex: periodIdx,
ItemVersion: version,

ServicePeriod: billing.Period{
Start: servicePeriod.From,
Expand Down Expand Up @@ -465,7 +473,10 @@ func (it *PhaseIterator) generateOneTimeAlignedItem(item subscription.Subscripti
item.Spec.ItemKey,
fmt.Sprintf("v[%d]", versionID),
}, "/"),
PhaseID: it.phase.SubscriptionPhase.ID,
PhaseID: it.phase.SubscriptionPhase.ID,
PhaseKey: it.phase.Spec.PhaseKey,
PeriodIndex: 0,
ItemVersion: versionID,

ServicePeriod: billing.Period{
Start: servicePeriod.From,
Expand Down
129 changes: 122 additions & 7 deletions openmeter/billing/worker/subscription/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"slices"
"strings"
"time"

"github.com/alpacahq/alpacadecimal"
Expand Down Expand Up @@ -271,13 +272,6 @@ func (h *Handler) compareSubscriptionWithExistingLines(ctx context.Context, subs
return nil, nil
}

inScopeLinesByUniqueID, unique := slicesx.UniqueGroupBy(inScopeLines, func(i subscriptionItemWithPeriods) string {
return i.UniqueID
})
if !unique {
return nil, fmt.Errorf("duplicate unique ids in the upcoming lines")
}

existingLinesByUniqueID, unique := slicesx.UniqueGroupBy(
lo.Filter(existingLines, func(l billing.LineOrHierarchy, _ int) bool {
return l.ChildUniqueReferenceID() != nil
Expand All @@ -289,6 +283,19 @@ func (h *Handler) compareSubscriptionWithExistingLines(ctx context.Context, subs
return nil, fmt.Errorf("duplicate unique ids in the existing lines")
}

// let's correct the period start (+invoiceAt) for any upcoming lines if needed
inScopeLines, err = h.correctPeriodStartForUpcomingLines(ctx, subs.Subscription.ID, inScopeLines, existingLinesByUniqueID)
if err != nil {
return nil, fmt.Errorf("correcting period start for upcoming lines: %w", err)
}

inScopeLinesByUniqueID, unique := slicesx.UniqueGroupBy(inScopeLines, func(i subscriptionItemWithPeriods) string {
return i.UniqueID
})
if !unique {
return nil, fmt.Errorf("duplicate unique ids in the upcoming lines")
}

existingLineUniqueIDs := lo.Keys(existingLinesByUniqueID)
inScopeLineUniqueIDs := lo.Keys(inScopeLinesByUniqueID)
// Let's execute the synchronization
Expand Down Expand Up @@ -332,6 +339,114 @@ func (h *Handler) compareSubscriptionWithExistingLines(ctx context.Context, subs
})
}

// correctPeriodStartForUpcomingLines corrects the period start for the upcoming lines, it will adjust the period start for the lines.
//
// The adjustment only happens if the line is subscription managed and has billing.subscription.sync.ignore annotation. This esentially
// allows for reanchoring if the period calculation changes.
func (h *Handler) correctPeriodStartForUpcomingLines(ctx context.Context, subscriptionID string, inScopeLines []subscriptionItemWithPeriods, existingLinesByUniqueID map[string]billing.LineOrHierarchy) ([]subscriptionItemWithPeriods, error) {
for idx, line := range inScopeLines {
if line.PeriodIndex == 0 {
// This is the first period, so we don't need to correct the period start
continue
}

previousPeriodUniqueID := strings.Join([]string{
subscriptionID,
line.PhaseKey,
line.Spec.ItemKey,
fmt.Sprintf("v[%d]", line.ItemVersion),
fmt.Sprintf("period[%d]", line.PeriodIndex-1),
}, "/")

existingPreviousLine, ok := existingLinesByUniqueID[previousPeriodUniqueID]
if !ok {
// This is a new line, so we don't need to correct the period start
continue
}

switch existingPreviousLine.Type() {
case billing.LineOrHierarchyTypeLine:
previousLine, err := existingPreviousLine.AsLine()
if err != nil {
return nil, fmt.Errorf("getting previous line: %w", err)
}

if !h.isLineInScopeForPeriodCorrection(previousLine) {
continue
}
case billing.LineOrHierarchyTypeHierarchy:
hierarchy, err := existingPreviousLine.AsHierarchy()
if err != nil {
return nil, fmt.Errorf("getting previous hierarchy: %w", err)
}

if !h.isHierarchyInScopeForPeriodCorrection(hierarchy) {
continue
}

default:
continue
}

previousServicePeriod := existingPreviousLine.ServicePeriod()

// If the lines are continuous we are fine
if line.ServicePeriod.Start.Equal(previousServicePeriod.End) {
continue
}

// Should not happen as this line is never the first line
if !line.ServicePeriod.Start.Equal(line.BillingPeriod.Start) || !line.FullServicePeriod.Start.Equal(line.BillingPeriod.Start) {
return nil, fmt.Errorf("line[%s] service period start does not match billing period start or full service period start", line.UniqueID)
}

inScopeLines[idx].ServicePeriod.Start = previousServicePeriod.End
inScopeLines[idx].BillingPeriod.Start = previousServicePeriod.End
inScopeLines[idx].FullServicePeriod.Start = previousServicePeriod.End
}

return inScopeLines, nil
}

func (h *Handler) isLineInScopeForPeriodCorrection(line *billing.Line) bool {
if line.ManagedBy != billing.SubscriptionManagedLine {
// We only correct the period start for subscription managed lines, for manual edits
// we should not apply this logic, as the user might have created a setup where the period start
// is no longer valid.
return false
}

if line.Annotations == nil {
// If the previous line is not annotated to be frozen we should not correct the period start
return false
}

val, ok := line.Annotations[billing.AnnotationSubscriptionSyncIgnore]
if !ok {
return false
}

boolVal, ok := val.(bool)
if !ok {
return false
}

return boolVal
}

func (h *Handler) isHierarchyInScopeForPeriodCorrection(hierarchy *billing.SplitLineHierarchy) bool {
servicePeriod := hierarchy.Group.ServicePeriod

// The correction can only happen if the last line the progressively billed group is in scope for the period correction
for _, line := range hierarchy.Lines {
if line.Line.Period.End.Equal(servicePeriod.End) {
return h.isLineInScopeForPeriodCorrection(line.Line)
}
}

return false
}

func (h *Handler) getPatchesFromPlan(p *subscriptionSyncPlan, subs subscription.SubscriptionView, currency currencyx.Calculator, invoiceByID InvoiceByID) ([]linePatch, error) {
patches := make([]linePatch, 0, len(p.LinesToDelete)+len(p.LinesToUpsert))

Expand Down
112 changes: 112 additions & 0 deletions openmeter/billing/worker/subscription/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4462,6 +4462,118 @@ func (s *SubscriptionHandlerTestSuite) TestAlignedSubscriptionProratingBehavior(
})
}

func (s *SubscriptionHandlerTestSuite) TestSyncronizeSubscriptionPeriodAlgorithmChange() {
ctx := s.Context
clock.FreezeTime(s.mustParseTime("2025-01-31T00:00:00Z"))
defer clock.UnFreeze()

// Given
// a subscription started with a monthly in advance flat fee
// the first month is already synced
// When we change the algorithm we use to calculate the period (emulated by an invoice change)
// Then
// The next line will be automatically adjusted to start at the end of the previous period's end

subsView := s.createSubscriptionFromPlanPhases([]productcatalog.Phase{
{
PhaseMeta: s.phaseMeta("first-phase", ""),
RateCards: productcatalog.RateCards{
&productcatalog.UsageBasedRateCard{
RateCardMeta: productcatalog.RateCardMeta{
Key: "in-advance",
Name: "in-advance",
Price: productcatalog.NewPriceFrom(productcatalog.FlatPrice{
Amount: alpacadecimal.NewFromFloat(6),
PaymentTerm: productcatalog.InAdvancePaymentTerm,
}),
},
BillingCadence: datetime.MustParse(s.T(), "P1M"),
},
},
},
})

s.NoError(s.Handler.SyncronizeSubscription(ctx, subsView, clock.Now()))

invoice := s.gatheringInvoice(ctx, s.Namespace, s.Customer.ID)
s.DebugDumpInvoice("gathering invoice", invoice)

invoice, err := s.BillingService.UpdateInvoice(ctx, billing.UpdateInvoiceInput{
Invoice: invoice.InvoiceID(),
EditFn: func(invoice *billing.Invoice) error {
line := invoice.Lines.OrEmpty()[0]
// simulate some faulty behavior (the old algo would have set the end to 03-03, but this way we can test this with both the old and new alog)
line.Period.Start = s.mustParseTime("2025-01-31T00:00:00Z")
line.Period.End = s.mustParseTime("2025-03-02T00:00:00Z")
line.Annotations = models.Annotations{
billing.AnnotationSubscriptionSyncIgnore: true,
}

invoice.Lines = billing.NewLineChildren([]*billing.Line{
line,
})
return nil
},
})
s.NoError(err)

s.DebugDumpInvoice("gathering invoice - updated", invoice)
s.expectLines(invoice, subsView.Subscription.ID, []expectedLine{
{
Matcher: recurringLineMatcher{
PhaseKey: "first-phase",
ItemKey: "in-advance",
},
Qty: mo.Some(1.0),
UnitPrice: mo.Some(6.0),
Periods: []billing.Period{
{
Start: s.mustParseTime("2025-01-31T00:00:00Z"),
End: s.mustParseTime("2025-03-02T00:00:00Z"),
},
},
},
})

// Let's generate the next set of items
clock.FreezeTime(s.mustParseTime("2025-02-28T00:00:00Z"))

s.NoError(s.Handler.SyncronizeSubscription(ctx, subsView, clock.Now()))

invoice = s.gatheringInvoice(ctx, s.Namespace, s.Customer.ID)
s.DebugDumpInvoice("gathering invoice - updated", invoice)

s.expectLines(invoice, subsView.Subscription.ID, []expectedLine{
{
Matcher: recurringLineMatcher{
PhaseKey: "first-phase",
ItemKey: "in-advance",
PeriodMin: 0,
PeriodMax: 1,
},
Qty: mo.Some(1.0),
UnitPrice: mo.Some(6.0),
Periods: []billing.Period{
{
Start: s.mustParseTime("2025-01-31T00:00:00Z"),
End: s.mustParseTime("2025-03-02T00:00:00Z"),
},
{
Start: s.mustParseTime("2025-03-02T00:00:00Z"),
// TODO: Once the period fix is there, this should be 03-31
End: s.mustParseTime("2025-04-03T00:00:00Z"),
},
},

InvoiceAt: mo.Some([]time.Time{
s.mustParseTime("2025-01-31T00:00:00Z"),
// TODO: Once the period fix is there, this should be 03-31
s.mustParseTime("2025-03-02T00:00:00Z"),
}),
},
})
}

type expectedLine struct {
Matcher lineMatcher
Qty mo.Option[float64]
Expand Down