Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
licensei

go-migrate

sqlc
];

env = {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
github.com/spf13/cobra v1.9.1
github.com/spf13/pflag v1.0.6
github.com/spf13/viper v1.20.1
github.com/sqlc-dev/pqtype v0.3.0
github.com/stretchr/testify v1.10.0
github.com/stripe/stripe-go/v80 v80.2.1
github.com/svix/svix-webhooks v1.66.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2199,6 +2199,8 @@ github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqj
github.com/spiffe/go-spiffe/v2 v2.5.0 h1:N2I01KCUkv1FAjZXJMwh95KK1ZIQLYbPfhaxw8WS0hE=
github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/sqlc-dev/pqtype v0.3.0 h1:b09TewZ3cSnO5+M1Kqq05y0+OjqIptxELaSayg7bmqk=
github.com/sqlc-dev/pqtype v0.3.0/go.mod h1:oyUjp5981ctiL9UYvj1bVvCKi8OXkCa0u645hce7CAs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
Expand Down
147 changes: 147 additions & 0 deletions tools/migrate/billing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package migrate_test

import (
"context"
"database/sql"
"encoding/json"
"os"
"strings"
"testing"

"github.com/oklog/ulid/v2"
"github.com/stretchr/testify/require"

flatfeetoubpflatfeedb "github.com/openmeterio/openmeter/tools/migrate/testdata/billing/flatfeetoubpflatfee/db"
)

func TestMigrateFlatFeesToUBPFlatFees(t *testing.T) {
runner{stops{
{
version: 20250605102416,
direction: directionUp,
action: func(t *testing.T, db *sql.DB) {
loadFixture(t, db, "testdata/billing/flatfeetoubpflatfee/fixture.sql")
},
},
{
version: 20250605131637,
direction: directionUp,
action: func(t *testing.T, db *sql.DB) {
const (
validFlatFeeLineID = "01JWB2ND43KPCHMXHKD0KETF5M"
validFlatFeeLineIDWithInvoicingExternalID = "01JWB2ND43KPCHMXHKCYEQ13WY"
deletedFlatFeeLineID = "01JWB2ND43KPCHMXHKCW3PFW6W"
)

q := flatfeetoubpflatfeedb.New(db)
t.Run("valid flat fee line", func(t *testing.T) {
// Case 1: Valid flat fee line

lines, err := q.GetLineHierarchyByDetailedLineID(t.Context(), validFlatFeeLineID)
require.NoError(t, err)

require.Len(t, lines.DetailedLines, 1)
detailedLine := lines.DetailedLines[0]
ubpLine := lines.Line

// the ulid generated by pgsql is a valid one
parsedID, err := ulid.Parse(ubpLine.ID)
require.NoError(t, err)
require.Equal(t, parsedID.String(), ubpLine.ID)

// Let's fetch the child lines
require.NoError(t, err)

// Let's assert the ubp line
require.Equal(t, detailedLine.ID, validFlatFeeLineID)
require.Equal(t, ubpLine.Namespace, detailedLine.Namespace)

// Field validations (no ID missmatch)
require.NotEqual(t, detailedLine.ID, ubpLine.ID)

// Lines are not deleted
require.False(t, ubpLine.DeletedAt.Valid)

// Detailed line changes
require.Equal(t, detailedLine.ChildUniqueReferenceID.String, "flat-price")
require.False(t, detailedLine.DeletedAt.Valid, "detailed line should not be deleted")
require.Equal(t, detailedLine.ManagedBy, "system")
require.False(t, detailedLine.SubscriptionID.Valid, "detailed line should not have a subscription")
require.False(t, detailedLine.SubscriptionItemID.Valid, "detailed line should not have a subscription item")
require.False(t, detailedLine.SubscriptionPhaseID.Valid, "detailed line should not have a subscription phase")
require.False(t, detailedLine.Metadata.Valid, "detailed line should not have metadata")

// UBP line changes
require.True(t, strings.HasSuffix(ubpLine.ChildUniqueReferenceID.String, "/first-phase/in-advance/v[0]/period[2]"))

// metadata
metadata := map[string]string{}
err = json.Unmarshal(ubpLine.Metadata.RawMessage, &metadata)
require.NoError(t, err)
require.Equal(t, metadata["/openmeter-line-reason"], "add-line-wrapping", "metadata should be added signifying why we have added this line")

// price
price := map[string]string{}
err = json.Unmarshal(ubpLine.Price, &price)
require.NoError(t, err)
require.Equal(t, price["type"], "flat", "price type should be flat")
require.Equal(t, price["amount"], "6", "price amount should be 6")
require.Equal(t, price["paymentTerm"], "in_advance", "price payment term should be in_advance")

// other fields
require.Equal(t, ubpLine.PriceType, "flat", "price type should be flat")
require.Equal(t, ubpLine.Quantity.String, "1", "quantity should be 1")
require.Equal(t, ubpLine.PreLinePeriodQuantity.String, "0", "pre line period quantity should be 0")
require.Equal(t, ubpLine.MeteredQuantity.String, "1", "metered quantity should be 1")
require.Equal(t, ubpLine.MeteredPreLinePeriodQuantity.String, "0", "metered pre line period quantity should be 0")
require.False(t, ubpLine.ParentLineID.Valid, "there should be no parent line for the ubp one")
require.NotEmpty(t, ubpLine.InvoiceID, "the invoice id should be valid")

require.Equal(t, ubpLine.InvoiceID, detailedLine.InvoiceID, "the invoice id should be the same")
require.Equal(t, ubpLine.Amount, detailedLine.Amount)
require.Equal(t, ubpLine.Total, detailedLine.Total)
require.Equal(t, "6", ubpLine.Total)
require.Equal(t, "6", ubpLine.Amount)
})

t.Run("valid flat fee line with invoicing external id", func(t *testing.T) {
lines, err := q.GetLineHierarchyByDetailedLineID(t.Context(), validFlatFeeLineIDWithInvoicingExternalID)
require.NoError(t, err)

require.Len(t, lines.DetailedLines, 1)
detailedLine := lines.DetailedLines[0]
ubpLine := lines.Line

// detailed lines must have invoicing external id set, while the ubp line should not
require.Equal(t, detailedLine.InvoicingAppExternalID.String, "invoicing-external-id", "detailed line should have invoicing external id set")
require.False(t, ubpLine.InvoicingAppExternalID.Valid, "ubp line should not have invoicing external id set")
})

t.Run("deleted flat fee line", func(t *testing.T) {
lines, err := q.GetLineHierarchyByDetailedLineID(t.Context(), deletedFlatFeeLineID)
require.NoError(t, err)

require.Len(t, lines.DetailedLines, 1)
detailedLine := lines.DetailedLines[0]
ubpLine := lines.Line

require.True(t, ubpLine.DeletedAt.Valid, "ubp line should be deleted")
require.True(t, detailedLine.DeletedAt.Valid, "detailed line should be deleted")
require.Equal(t, detailedLine.DeletedAt.Time, ubpLine.DeletedAt.Time, "detailed line and ubp line should have the same deleted at")
})
},
},
}}.Test(t)
}

// loadFixture loads a fixture SQL file into the database
//
// If you find a better alternative, please replace this function, as it does not understand the SQL syntax, and
// it's barely enough to load the fixture.
func loadFixture(t *testing.T, db *sql.DB, fixturePath string) {
sqlBytes, err := os.ReadFile(fixturePath)
require.NoError(t, err)

_, err = db.ExecContext(context.Background(), string(sqlBytes))
require.NoError(t, err)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- requires manual revert as we might have created additional lines in this format.
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@

--- Source: https://github.com/geckoboard/pgulid/blob/master/pgulid.sql

CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- Note: pg_temp only exists for the current session, so we don't need to clean up the function after the migration is done
CREATE OR REPLACE FUNCTION pg_temp.generate_ulid()
RETURNS TEXT
AS $$
DECLARE
-- Crockford's Base32
encoding BYTEA = '0123456789ABCDEFGHJKMNPQRSTVWXYZ';
timestamp BYTEA = E'\\000\\000\\000\\000\\000\\000';
output TEXT = '';

unix_time BIGINT;
ulid BYTEA;
BEGIN
-- 6 timestamp bytes
unix_time = (EXTRACT(EPOCH FROM CLOCK_TIMESTAMP()) * 1000)::BIGINT;
timestamp = SET_BYTE(timestamp, 0, (unix_time >> 40)::BIT(8)::INTEGER);
timestamp = SET_BYTE(timestamp, 1, (unix_time >> 32)::BIT(8)::INTEGER);
timestamp = SET_BYTE(timestamp, 2, (unix_time >> 24)::BIT(8)::INTEGER);
timestamp = SET_BYTE(timestamp, 3, (unix_time >> 16)::BIT(8)::INTEGER);
timestamp = SET_BYTE(timestamp, 4, (unix_time >> 8)::BIT(8)::INTEGER);
timestamp = SET_BYTE(timestamp, 5, unix_time::BIT(8)::INTEGER);

-- 10 entropy bytes
ulid = timestamp || gen_random_bytes(10);

-- Encode the timestamp
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 224) >> 5));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 0) & 31)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 1) & 248) >> 3));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 1) & 7) << 2) | ((GET_BYTE(ulid, 2) & 192) >> 6)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 2) & 62) >> 1));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 2) & 1) << 4) | ((GET_BYTE(ulid, 3) & 240) >> 4)));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 3) & 15) << 1) | ((GET_BYTE(ulid, 4) & 128) >> 7)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 4) & 124) >> 2));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 4) & 3) << 3) | ((GET_BYTE(ulid, 5) & 224) >> 5)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 5) & 31)));

-- Encode the entropy
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 6) & 248) >> 3));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 6) & 7) << 2) | ((GET_BYTE(ulid, 7) & 192) >> 6)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 7) & 62) >> 1));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 7) & 1) << 4) | ((GET_BYTE(ulid, 8) & 240) >> 4)));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 8) & 15) << 1) | ((GET_BYTE(ulid, 9) & 128) >> 7)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 9) & 124) >> 2));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 9) & 3) << 3) | ((GET_BYTE(ulid, 10) & 224) >> 5)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 10) & 31)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 11) & 248) >> 3));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 11) & 7) << 2) | ((GET_BYTE(ulid, 12) & 192) >> 6)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 12) & 62) >> 1));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 12) & 1) << 4) | ((GET_BYTE(ulid, 13) & 240) >> 4)));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 13) & 15) << 1) | ((GET_BYTE(ulid, 14) & 128) >> 7)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 14) & 124) >> 2));
output = output || CHR(GET_BYTE(encoding, ((GET_BYTE(ulid, 14) & 3) << 3) | ((GET_BYTE(ulid, 15) & 224) >> 5)));
output = output || CHR(GET_BYTE(encoding, (GET_BYTE(ulid, 15) & 31)));

RETURN output;
END
$$
LANGUAGE plpgsql
VOLATILE;

-- Recommended to create a backup table, but we don't have a way to revert this migration.
--
-- CREATE TABLE IF NOT EXISTS billing_backup_migrated_flat_fees
-- AS
-- SELECT l.*, c.per_unit_amount, c.category, c.payment_term, c.index
-- FROM billing_invoice_lines l JOIN billing_invoice_flat_fee_line_configs c ON (l.fee_line_config_id = c.id)
-- WHERE type = 'flat_fee' AND status = 'valid';

CREATE OR REPLACE FUNCTION pg_temp.migrate_flat_fees_to_ubp_flat_fees(idToMigrate TEXT)
RETURNS TEXT
AS $$
DECLARE
existing_flat_fee RECORD;
new_usage_based_line billing_invoice_lines%ROWTYPE;
ubp_flat_fee_config_id TEXT;
ubp_line_id TEXT;
updated_metadata jsonb;
metadata_in jsonb;
BEGIN
SELECT
l.*,
c.per_unit_amount, c.payment_term
INTO existing_flat_fee
FROM
billing_invoice_lines l JOIN billing_invoice_flat_fee_line_configs c ON (l.fee_line_config_id = c.id)
WHERE
l.id = idToMigrate;

IF existing_flat_fee IS NULL THEN
RAISE EXCEPTION 'Flat fee line with id % not found', idToMigrate;
END IF;

-- let's generate a new usagebased line config first

ubp_flat_fee_config_id = pg_temp.generate_ulid();

INSERT INTO
billing_invoice_usage_based_line_configs (id, namespace, price_type, price, pre_line_period_quantity, metered_quantity, metered_pre_line_period_quantity)
VALUES (
ubp_flat_fee_config_id,
existing_flat_fee.namespace,
'flat',
format('{"type": "flat", "amount": "%s", "paymentTerm": "%s"}', existing_flat_fee.per_unit_amount, existing_flat_fee.payment_term)::jsonb,
0, -- pre_line_period_quantity
1, -- metered_quantity
0 -- metered_pre_line_period_quantity
);

-- let's create a new usagebased line

ubp_line_id = pg_temp.generate_ulid();
IF existing_flat_fee.metadata = 'null'::jsonb THEN
metadata_in = '{}'::jsonb;
ELSE
metadata_in = coalesce(existing_flat_fee.metadata, '{}'::jsonb);
END IF;

updated_metadata = jsonb_insert(metadata_in, '{/openmeter-line-reason}', '"add-line-wrapping"');
INSERT INTO billing_invoice_lines (
id,
namespace,
metadata,
created_at,
updated_at,
deleted_at,
name,
description,
period_start,
period_end,
invoice_at,
type,
status,
currency,
quantity,
tax_config,
invoice_id,
fee_line_config_id,
usage_based_line_config_id,
parent_line_id,
child_unique_reference_id,
amount,
taxes_total,
taxes_inclusive_total,
taxes_exclusive_total,
charges_total,
discounts_total,
total,
invoicing_app_external_id,
subscription_id,
subscription_item_id,
subscription_phase_id,
line_ids,
managed_by,
ratecard_discounts)
VALUES (
ubp_line_id,
existing_flat_fee.namespace,
updated_metadata,
existing_flat_fee.created_at,
existing_flat_fee.updated_at,
existing_flat_fee.deleted_at,
existing_flat_fee.name,
existing_flat_fee.description,
existing_flat_fee.period_start,
existing_flat_fee.period_end,
existing_flat_fee.invoice_at,
'usage_based',
'valid',
existing_flat_fee.currency,
existing_flat_fee.quantity,
existing_flat_fee.tax_config,
existing_flat_fee.invoice_id,
null, -- fee_line_config_id
ubp_flat_fee_config_id,
existing_flat_fee.parent_line_id, -- parent_line_id
existing_flat_fee.child_unique_reference_id,
existing_flat_fee.amount,
existing_flat_fee.taxes_total,
existing_flat_fee.taxes_inclusive_total,
existing_flat_fee.taxes_exclusive_total,
existing_flat_fee.charges_total,
existing_flat_fee.discounts_total,
existing_flat_fee.total,
NULL, -- invoicing_app_external_id (the flat_fee detailed line is syncronized to external systems not this specific line)
existing_flat_fee.subscription_id,
existing_flat_fee.subscription_item_id,
existing_flat_fee.subscription_phase_id,
existing_flat_fee.line_ids,
existing_flat_fee.managed_by,
existing_flat_fee.ratecard_discounts
);

-- let's convert the flat fee line into a detailed line

UPDATE billing_invoice_lines SET
status = 'detailed',
parent_line_id = ubp_line_id,
child_unique_reference_id = 'flat-price', -- FlatPriceChildUniqueReferenceID
subscription_id = NULL,
subscription_item_id = NULL,
subscription_phase_id = NULL,
metadata = NULL,
managed_by = 'system'
WHERE
id = idToMigrate;

RETURN ubp_line_id;
END
$$
LANGUAGE plpgsql
VOLATILE;


--- Let's do the migraiton

SELECT pg_temp.migrate_flat_fees_to_ubp_flat_fees(id) FROM billing_invoice_lines WHERE type = 'flat_fee' AND status = 'valid';
Loading
Loading