Skip to content

Commit

Permalink
Fix and refactor subscriber batch fetching in campaign processing.
Browse files Browse the repository at this point in the history
This has been a hair-pulling rabbit hole of an issue. knadh#1931 and others.
When the `next-campaign-subscribers` query that fetches $n subscribers
per batch for a campaign returns no results, the manager assumes
that the campaign is done and marks as finished.

Marathon debugging revealed fundamental flaws in the query's logic that
would incorrectly return 0 rows under certain conditions.
- Based on the "layout" of subscribers, e.g., a series of blocklisted
  subscribers between confirmed subscribers, or a series of unconfirmed
  subscribers in a batch belonging to a double opt-in list.
- Bulk import blocklisting users, but not marking their subscriptions
  as 'unsubscribed'.
- Conditions spread across multiple CTEs resulted in returning an
  arbitrary number of rows instead of $N per batch, as the selected $N rows
  would get filtered out elsewhere, possibly even becoming 0.

After fixing this and testing it on our prod instance that has
15 million subscribers and ~70 million subscriptions in the
`subscriber_lists` table, ended up discovering significant inefficiencies
in Postgres query planning. When `subscriber_lists` and campaign list IDs
are joined dynamically (CTE or ANY() or any kind of JOIN that involves
a query), the Postgres query planner is unable to use the right indexes.

After testing dozens of approaches, discovered that when statically passing
the values to join on (hardcoding or passing via parametrized $1 vars),
the query uses the right indexes. The difference is staggering.
For the particular scenario on our large prod DB to pull a batch,
~15 seconds vs. ~50ms, a whopping 300x improvement!

This patch splits `next-campaign-subscribers` into two separate queries,
one which fetches campaign metadata and list_ids, whose values are then
passed statically to the next query to fetch subscribers by batch.

In addition, it fixes and refactors broken filtering and counting logic
in `create-campaign` and `next-campaign` queries.

Closes knadh#1931, knadh#1993, knadh#1986.
  • Loading branch information
knadh committed Oct 13, 2024
1 parent ee119b0 commit cea65c0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 76 deletions.
24 changes: 23 additions & 1 deletion cmd/manager_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ type store struct {
h *http.Client
}

// runningCamp holds one row returned by the get-running-campaign query.
// The query joins a campaign to its lists, so a campaign attached to several
// lists yields several rows that differ only in ListID; NextSubscribers
// collects those ListIDs and reads the remaining fields from the first row.
type runningCamp struct {
// CampaignID is the campaign's ID (campaigns.id).
CampaignID int `db:"campaign_id"`
// CampaignType is the campaign's type (campaigns.type); the subscriber
// batch query compares it against 'optin'.
CampaignType string `db:"campaign_type"`
// LastSubscriberID is the exclusive lower bound for the next batch
// (subscriber id > last_subscriber_id).
LastSubscriberID int `db:"last_subscriber_id"`
// MaxSubscriberID is the inclusive upper bound for the next batch
// (subscriber id <= max_subscriber_id).
MaxSubscriberID int `db:"max_subscriber_id"`
// ListID is the ID of one list attached to the campaign.
// NOTE(review): the query uses LEFT JOINs, so this column is presumably
// NULL for a campaign with no lists — confirm how that scans into an int.
ListID int `db:"list_id"`
}

func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store {
return &store{
queries: q,
Expand All @@ -42,8 +50,22 @@ func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models
// and every batch takes the last ID of the last batch and fetches the next
// batch above that.
func (s *store) NextSubscribers(campID, limit int) ([]models.Subscriber, error) {
var camps []runningCamp
if err := s.queries.GetRunningCampaign.Select(&camps, campID); err != nil {
return nil, err
}

var listIDs []int
for _, c := range camps {
listIDs = append(listIDs, c.ListID)
}

if len(listIDs) == 0 {
return nil, nil
}

var out []models.Subscriber
err := s.queries.NextCampaignSubscribers.Select(&out, campID, limit)
err := s.queries.NextCampaignSubscribers.Select(&out, camps[0].CampaignID, camps[0].CampaignType, camps[0].LastSubscriberID, camps[0].MaxSubscriberID, pq.Array(listIDs), limit)
return out, err
}

Expand Down
6 changes: 6 additions & 0 deletions internal/migrations/v4.0.0.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (

// V4_0_0 performs the DB migrations.
func V4_0_0(db *sqlx.DB, fs stuffbin.FileSystem, ko *koanf.Koanf, lo *log.Logger) error {
lo.Println("IMPORTANT: this upgrade might take a while if you have a large database. Please be patient ...")

if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_subs_id_status ON subscribers(id, status);`); err != nil {
return err
}

if _, err := db.Exec(`
CREATE EXTENSION IF NOT EXISTS pgcrypto;
Expand Down
1 change: 1 addition & 0 deletions models/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type Queries struct {
DeleteCampaignLinkClicks *sqlx.Stmt `query:"delete-campaign-link-clicks"`

NextCampaigns *sqlx.Stmt `query:"next-campaigns"`
GetRunningCampaign *sqlx.Stmt `query:"get-running-campaign"`
NextCampaignSubscribers *sqlx.Stmt `query:"next-campaign-subscribers"`
GetOneCampaignSubscriber *sqlx.Stmt `query:"get-one-campaign-subscriber"`
UpdateCampaign *sqlx.Stmt `query:"update-campaign"`
Expand Down
148 changes: 73 additions & 75 deletions queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -488,31 +488,23 @@ DELETE FROM lists WHERE id = ALL($1);
-- campaigns
-- name: create-campaign
-- This creates the campaign and inserts campaign_lists relationships.
WITH campLists AS (
-- Get the list_ids and their optin statuses for the campaigns found in the previous step.
SELECT lists.id AS list_id, campaign_id, optin FROM lists
INNER JOIN campaign_lists ON (campaign_lists.list_id = lists.id)
WHERE lists.id = ANY($14::INT[])
),
tpl AS (
WITH tpl AS (
-- If there's no template_id given, use the default template.
SELECT (CASE WHEN $13 = 0 THEN id ELSE $13 END) AS id FROM templates WHERE is_default IS TRUE
),
counts AS (
SELECT COALESCE(COUNT(id), 0) as to_send, COALESCE(MAX(id), 0) as max_sub_id
FROM subscribers
LEFT JOIN campLists ON (campLists.campaign_id = ANY($14::INT[]))
LEFT JOIN subscriber_lists ON (
subscriber_lists.status != 'unsubscribed' AND
subscribers.id = subscriber_lists.subscriber_id AND
subscriber_lists.list_id = campLists.list_id AND

-- For double opt-in lists, consider only 'confirmed' subscriptions. For single opt-ins,
-- any status except for 'unsubscribed' (already excluded above) works.
(CASE WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed' ELSE true END)
)
WHERE subscriber_lists.list_id=ANY($14::INT[])
AND subscribers.status='enabled'
-- This is going to be slow on large databases.
SELECT
COALESCE(COUNT(DISTINCT sl.subscriber_id), 0) AS to_send, COALESCE(MAX(s.id), 0) AS max_sub_id
FROM subscriber_lists sl
JOIN lists l ON sl.list_id = l.id
JOIN subscribers s ON sl.subscriber_id = s.id
WHERE sl.list_id = ANY($14::INT[])
AND s.status != 'blocklisted'
AND (
(l.optin = 'double' AND sl.status = 'confirmed') OR
(l.optin != 'double' AND sl.status != 'unsubscribed')
)
),
camp AS (
INSERT INTO campaigns (uuid, type, name, subject, from_email, body, altbody, content_type, send_at, headers, tags, messenger, template_id, to_send, max_subscriber_id, archive, archive_slug, archive_template_id, archive_meta)
Expand Down Expand Up @@ -672,27 +664,18 @@ campMedia AS (
GROUP BY campaign_id
),
counts AS (
-- For each campaign above, get the total number of subscribers and the max_subscriber_id
-- across all its lists.
SELECT id AS campaign_id,
COUNT(DISTINCT(subscriber_lists.subscriber_id)) AS to_send,
COALESCE(MAX(subscriber_lists.subscriber_id), 0) AS max_subscriber_id
SELECT camps.id AS campaign_id, COUNT(DISTINCT sl.subscriber_id) AS to_send, COALESCE(MAX(sl.subscriber_id), 0) AS max_subscriber_id
FROM camps
LEFT JOIN campLists ON (campLists.campaign_id = camps.id)
LEFT JOIN subscriber_lists ON (
subscriber_lists.list_id = campLists.list_id AND
(CASE
-- For optin campaigns, only e-mail 'unconfirmed' subscribers belonging to 'double' optin lists.
WHEN camps.type = 'optin' THEN subscriber_lists.status = 'unconfirmed' AND campLists.optin = 'double'

-- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers.
WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed'

-- For regular campaigns with non-double optin lists, e-mail everyone
-- except unsubscribed subscribers.
ELSE subscriber_lists.status != 'unsubscribed'
END)
)
JOIN campLists cl ON cl.campaign_id = camps.id
JOIN subscriber_lists sl ON sl.list_id = cl.list_id
AND (
CASE
WHEN camps.type = 'optin' THEN sl.status = 'unconfirmed' AND cl.optin = 'double'
WHEN cl.optin = 'double' THEN sl.status = 'confirmed'
ELSE sl.status != 'unsubscribed'
END
)
JOIN subscribers s ON (s.id = sl.subscriber_id AND s.status != 'blocklisted')
GROUP BY camps.id
),
updateCounts AS (
Expand Down Expand Up @@ -757,48 +740,63 @@ SELECT COUNT(%s) AS "count", url
WHERE campaign_id=ANY($1) AND link_clicks.created_at >= $2 AND link_clicks.created_at <= $3
GROUP BY links.url ORDER BY "count" DESC LIMIT 50;

-- name: get-running-campaign
-- Returns the metadata for a running campaign that is required by next-campaign-subscribers to retrieve
-- a batch of campaign subscribers for processing.
SELECT campaigns.id AS campaign_id, campaigns.type as campaign_type, last_subscriber_id, max_subscriber_id, lists.id AS list_id
FROM campaigns
LEFT JOIN campaign_lists ON (campaign_lists.campaign_id = campaigns.id)
LEFT JOIN lists ON (lists.id = campaign_lists.list_id)
WHERE campaigns.id = $1 AND status='running';

-- name: next-campaign-subscribers
-- Returns a batch of subscribers in a given campaign starting from the last checkpoint
-- (last_subscriber_id). Every fetch updates the checkpoint and the sent count, which means
-- every fetch returns a new batch of subscribers until all rows are exhausted.
WITH camps AS (
SELECT last_subscriber_id, max_subscriber_id, type FROM campaigns WHERE id = $1 AND status='running'
),
campLists AS (
--
-- In previous versions, get-running-campaign + this was a single query spread across multiple
-- CTEs, but despite numerous permutations and combinations, Postgres query planner simply would not use
-- the right indexes on subscriber_lists when the JOIN or ids were referenced dynamically from campLists
-- (be it a CTE or various kinds of joins). However, statically providing the list IDs to JOIN on ($5::INT[])
-- the query planner works as expected. The difference is staggering. ~15 seconds on a subscribers table with 15m
-- rows and a subscriber_lists table with 70 million rows when fetching subscribers for a campaign with a single list,
-- vs. a few milliseconds using this current approach.
WITH campLists AS (
SELECT lists.id AS list_id, optin FROM lists
LEFT JOIN campaign_lists ON (campaign_lists.list_id = lists.id)
LEFT JOIN campaign_lists ON campaign_lists.list_id = lists.id
WHERE campaign_lists.campaign_id = $1
),
subIDs AS (
SELECT DISTINCT ON (subscriber_lists.subscriber_id) subscriber_id, list_id, status FROM subscriber_lists
WHERE
-- ARRAY_AGG is 20x faster instead of a simple SELECT because the query planner
-- understands the CTE's cardinality after the scalar array conversion. Huh.
list_id = ANY((SELECT ARRAY_AGG(list_id) FROM campLists)::INT[]) AND
status != 'unsubscribed' AND
subscriber_id > (SELECT last_subscriber_id FROM camps) AND
subscriber_id <= (SELECT max_subscriber_id FROM camps)
ORDER BY subscriber_id LIMIT $2
),
subs AS (
SELECT subscribers.* FROM subIDs
LEFT JOIN campLists ON (campLists.list_id = subIDs.list_id)
INNER JOIN subscribers ON (
subscribers.status != 'blocklisted' AND
subscribers.id = subIDs.subscriber_id AND

(CASE
-- For optin campaigns, only e-mail 'unconfirmed' subscribers.
WHEN (SELECT type FROM camps) = 'optin' THEN subIDs.status = 'unconfirmed' AND campLists.optin = 'double'

-- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers.
WHEN campLists.optin = 'double' THEN subIDs.status = 'confirmed'

-- For regular campaigns with non-double optin lists, e-mail everyone
-- except unsubscribed subscribers.
ELSE subIDs.status != 'unsubscribed'
END)
)
SELECT s.*
FROM (
SELECT DISTINCT s.id
FROM subscriber_lists sl
JOIN campLists ON sl.list_id = campLists.list_id
JOIN subscribers s ON s.id = sl.subscriber_id
WHERE
sl.list_id = ANY($5::INT[])
-- last_subscriber_id
AND s.id > $3
-- max_subscriber_id
AND s.id <= $4
-- Subscriber should not be blocklisted.
AND s.status != 'blocklisted'
AND (
-- If it's an optin campaign and the list is double-optin, only pick unconfirmed subscribers.
($2 = 'optin' AND sl.status = 'unconfirmed' AND campLists.optin = 'double')
OR (
-- It is a regular campaign.
$2 != 'optin' AND (
-- It is a double optin list. Only pick confirmed subscribers.
(campLists.optin = 'double' AND sl.status = 'confirmed') OR

-- It is a single optin list. Pick all non-unsubscribed subscribers.
(campLists.optin != 'double' AND sl.status != 'unsubscribed')
)
)
)
ORDER BY s.id LIMIT $6
) subIDs JOIN subscribers s ON (s.id = subIDs.id) ORDER BY s.id
),
u AS (
UPDATE campaigns
Expand Down
1 change: 1 addition & 0 deletions schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ CREATE TABLE subscribers (
);
DROP INDEX IF EXISTS idx_subs_email; CREATE UNIQUE INDEX idx_subs_email ON subscribers(LOWER(email));
DROP INDEX IF EXISTS idx_subs_status; CREATE INDEX idx_subs_status ON subscribers(status);
DROP INDEX IF EXISTS idx_subs_id_status; CREATE INDEX idx_subs_id_status ON subscribers(id, status);
DROP INDEX IF EXISTS idx_subs_created_at; CREATE INDEX idx_subs_created_at ON subscribers(created_at);
DROP INDEX IF EXISTS idx_subs_updated_at; CREATE INDEX idx_subs_updated_at ON subscribers(updated_at);

Expand Down

0 comments on commit cea65c0

Please sign in to comment.