diff --git a/cmd/manager_store.go b/cmd/manager_store.go index 200eddf93..b1d67ef8c 100644 --- a/cmd/manager_store.go +++ b/cmd/manager_store.go @@ -20,6 +20,14 @@ type store struct { h *http.Client } +type runningCamp struct { + CampaignID int `db:"campaign_id"` + CampaignType string `db:"campaign_type"` + LastSubscriberID int `db:"last_subscriber_id"` + MaxSubscriberID int `db:"max_subscriber_id"` + ListID int `db:"list_id"` +} + func newManagerStore(q *models.Queries, c *core.Core, m media.Store) *store { return &store{ queries: q, @@ -42,8 +50,22 @@ func (s *store) NextCampaigns(currentIDs []int64, sentCounts []int64) ([]*models // and every batch takes the last ID of the last batch and fetches the next // batch above that. func (s *store) NextSubscribers(campID, limit int) ([]models.Subscriber, error) { + var camps []runningCamp + if err := s.queries.GetRunningCampaign.Select(&camps, campID); err != nil { + return nil, err + } + + var listIDs []int + for _, c := range camps { + listIDs = append(listIDs, c.ListID) + } + + if len(listIDs) == 0 { + return nil, nil + } + var out []models.Subscriber - err := s.queries.NextCampaignSubscribers.Select(&out, campID, limit) + err := s.queries.NextCampaignSubscribers.Select(&out, camps[0].CampaignID, camps[0].CampaignType, camps[0].LastSubscriberID, camps[0].MaxSubscriberID, pq.Array(listIDs), limit) return out, err } diff --git a/internal/migrations/v4.0.0.go b/internal/migrations/v4.0.0.go index 9cf492088..800e692a2 100644 --- a/internal/migrations/v4.0.0.go +++ b/internal/migrations/v4.0.0.go @@ -15,6 +15,12 @@ import ( // V4_0_0 performs the DB migrations. func V4_0_0(db *sqlx.DB, fs stuffbin.FileSystem, ko *koanf.Koanf, lo *log.Logger) error { + lo.Println("IMPORTANT: this upgrade might take a while if you have a large database. 
Please be patient ...") + + if _, err := db.Exec(`CREATE INDEX IF NOT EXISTS idx_subs_id_status ON subscribers(id, status);`); err != nil { + return err + } + if _, err := db.Exec(` CREATE EXTENSION IF NOT EXISTS pgcrypto; diff --git a/models/queries.go b/models/queries.go index 4594d76ee..0bce69741 100644 --- a/models/queries.go +++ b/models/queries.go @@ -76,6 +76,7 @@ type Queries struct { DeleteCampaignLinkClicks *sqlx.Stmt `query:"delete-campaign-link-clicks"` NextCampaigns *sqlx.Stmt `query:"next-campaigns"` + GetRunningCampaign *sqlx.Stmt `query:"get-running-campaign"` NextCampaignSubscribers *sqlx.Stmt `query:"next-campaign-subscribers"` GetOneCampaignSubscriber *sqlx.Stmt `query:"get-one-campaign-subscriber"` UpdateCampaign *sqlx.Stmt `query:"update-campaign"` diff --git a/queries.sql b/queries.sql index 96dcf87ec..a0d91bb32 100644 --- a/queries.sql +++ b/queries.sql @@ -488,31 +488,23 @@ DELETE FROM lists WHERE id = ALL($1); -- campaigns -- name: create-campaign -- This creates the campaign and inserts campaign_lists relationships. -WITH campLists AS ( - -- Get the list_ids and their optin statuses for the campaigns found in the previous step. - SELECT lists.id AS list_id, campaign_id, optin FROM lists - INNER JOIN campaign_lists ON (campaign_lists.list_id = lists.id) - WHERE lists.id = ANY($14::INT[]) -), -tpl AS ( +WITH tpl AS ( -- If there's no template_id given, use the default template. SELECT (CASE WHEN $13 = 0 THEN id ELSE $13 END) AS id FROM templates WHERE is_default IS TRUE ), counts AS ( - SELECT COALESCE(COUNT(id), 0) as to_send, COALESCE(MAX(id), 0) as max_sub_id - FROM subscribers - LEFT JOIN campLists ON (campLists.campaign_id = ANY($14::INT[])) - LEFT JOIN subscriber_lists ON ( - subscriber_lists.status != 'unsubscribed' AND - subscribers.id = subscriber_lists.subscriber_id AND - subscriber_lists.list_id = campLists.list_id AND - - -- For double opt-in lists, consider only 'confirmed' subscriptions. 
For single opt-ins, - -- any status except for 'unsubscribed' (already excluded above) works. - (CASE WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed' ELSE true END) - ) - WHERE subscriber_lists.list_id=ANY($14::INT[]) - AND subscribers.status='enabled' + -- This is going to be slow on large databases. + SELECT + COALESCE(COUNT(DISTINCT sl.subscriber_id), 0) AS to_send, COALESCE(MAX(s.id), 0) AS max_sub_id + FROM subscriber_lists sl + JOIN lists l ON sl.list_id = l.id + JOIN subscribers s ON sl.subscriber_id = s.id + WHERE sl.list_id = ANY($14::INT[]) + AND s.status != 'blocklisted' + AND ( + (l.optin = 'double' AND sl.status = 'confirmed') OR + (l.optin != 'double' AND sl.status != 'unsubscribed') + ) ), camp AS ( INSERT INTO campaigns (uuid, type, name, subject, from_email, body, altbody, content_type, send_at, headers, tags, messenger, template_id, to_send, max_subscriber_id, archive, archive_slug, archive_template_id, archive_meta) @@ -672,27 +664,18 @@ campMedia AS ( GROUP BY campaign_id ), counts AS ( - -- For each campaign above, get the total number of subscribers and the max_subscriber_id - -- across all its lists. - SELECT id AS campaign_id, - COUNT(DISTINCT(subscriber_lists.subscriber_id)) AS to_send, - COALESCE(MAX(subscriber_lists.subscriber_id), 0) AS max_subscriber_id + SELECT camps.id AS campaign_id, COUNT(DISTINCT sl.subscriber_id) AS to_send, COALESCE(MAX(sl.subscriber_id), 0) AS max_subscriber_id FROM camps - LEFT JOIN campLists ON (campLists.campaign_id = camps.id) - LEFT JOIN subscriber_lists ON ( - subscriber_lists.list_id = campLists.list_id AND - (CASE - -- For optin campaigns, only e-mail 'unconfirmed' subscribers belonging to 'double' optin lists. - WHEN camps.type = 'optin' THEN subscriber_lists.status = 'unconfirmed' AND campLists.optin = 'double' - - -- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers. 
- WHEN campLists.optin = 'double' THEN subscriber_lists.status = 'confirmed' - - -- For regular campaigns with non-double optin lists, e-mail everyone - -- except unsubscribed subscribers. - ELSE subscriber_lists.status != 'unsubscribed' - END) - ) + JOIN campLists cl ON cl.campaign_id = camps.id + JOIN subscriber_lists sl ON sl.list_id = cl.list_id + AND ( + CASE + WHEN camps.type = 'optin' THEN sl.status = 'unconfirmed' AND cl.optin = 'double' + WHEN cl.optin = 'double' THEN sl.status = 'confirmed' + ELSE sl.status != 'unsubscribed' + END + ) + JOIN subscribers s ON (s.id = sl.subscriber_id AND s.status != 'blocklisted') GROUP BY camps.id ), updateCounts AS ( @@ -757,48 +740,63 @@ SELECT COUNT(%s) AS "count", url WHERE campaign_id=ANY($1) AND link_clicks.created_at >= $2 AND link_clicks.created_at <= $3 GROUP BY links.url ORDER BY "count" DESC LIMIT 50; +-- name: get-running-campaign +-- Returns the metadata for a running campaign that is required by next-campaign-subscribers to retrieve +-- a batch of campaign subscribers for processing. +SELECT campaigns.id AS campaign_id, campaigns.type as campaign_type, last_subscriber_id, max_subscriber_id, lists.id AS list_id + FROM campaigns + LEFT JOIN campaign_lists ON (campaign_lists.campaign_id = campaigns.id) + LEFT JOIN lists ON (lists.id = campaign_lists.list_id) + WHERE campaigns.id = $1 AND status='running'; + -- name: next-campaign-subscribers -- Returns a batch of subscribers in a given campaign starting from the last checkpoint -- (last_subscriber_id). Every fetch updates the checkpoint and the sent count, which means -- every fetch returns a new batch of subscribers until all rows are exhausted. 
-WITH camps AS ( - SELECT last_subscriber_id, max_subscriber_id, type FROM campaigns WHERE id = $1 AND status='running' -), -campLists AS ( +-- +-- In previous versions, get-running-campaign + this was a single query spread across multiple +-- CTEs, but despite numerous permutations and combinations, Postgres query planner simply would not use +-- the right indexes on subscriber_lists when the JOIN or ids were referenced dynamically from campLists +-- (be it a CTE or various kinds of joins). However, statically providing the list IDs to JOIN on ($5::INT[]), +-- the query planner works as expected. The difference is staggering. ~15 seconds on a subscribers table with 15m +-- rows and a subscriber_lists table with 70 million rows when fetching subscribers for a campaign with a single list, +-- vs. a few milliseconds using this current approach. +WITH campLists AS ( SELECT lists.id AS list_id, optin FROM lists - LEFT JOIN campaign_lists ON (campaign_lists.list_id = lists.id) + LEFT JOIN campaign_lists ON campaign_lists.list_id = lists.id WHERE campaign_lists.campaign_id = $1 ), -subIDs AS ( - SELECT DISTINCT ON (subscriber_lists.subscriber_id) subscriber_id, list_id, status FROM subscriber_lists - WHERE - -- ARRAY_AGG is 20x faster instead of a simple SELECT because the query planner - -- understands the CTE's cardinality after the scalar array conversion. Huh. - list_id = ANY((SELECT ARRAY_AGG(list_id) FROM campLists)::INT[]) AND - status != 'unsubscribed' AND - subscriber_id > (SELECT last_subscriber_id FROM camps) AND - subscriber_id <= (SELECT max_subscriber_id FROM camps) - ORDER BY subscriber_id LIMIT $2 -), subs AS ( - SELECT subscribers.* FROM subIDs - LEFT JOIN campLists ON (campLists.list_id = subIDs.list_id) - INNER JOIN subscribers ON ( - subscribers.status != 'blocklisted' AND - subscribers.id = subIDs.subscriber_id AND - - (CASE - -- For optin campaigns, only e-mail 'unconfirmed' subscribers. 
- WHEN (SELECT type FROM camps) = 'optin' THEN subIDs.status = 'unconfirmed' AND campLists.optin = 'double' - - -- For regular campaigns with double optin lists, only e-mail 'confirmed' subscribers. - WHEN campLists.optin = 'double' THEN subIDs.status = 'confirmed' - - -- For regular campaigns with non-double optin lists, e-mail everyone - -- except unsubscribed subscribers. - ELSE subIDs.status != 'unsubscribed' - END) - ) + SELECT s.* + FROM ( + SELECT DISTINCT s.id + FROM subscriber_lists sl + JOIN campLists ON sl.list_id = campLists.list_id + JOIN subscribers s ON s.id = sl.subscriber_id + WHERE + sl.list_id = ANY($5::INT[]) + -- last_subscriber_id + AND s.id > $3 + -- max_subscriber_id + AND s.id <= $4 + -- Subscriber should not be blocklisted. + AND s.status != 'blocklisted' + AND ( + -- If it's an optin campaign and the list is double-optin, only pick unconfirmed subscribers. + ($2 = 'optin' AND sl.status = 'unconfirmed' AND campLists.optin = 'double') + OR ( + -- It is a regular campaign. + $2 != 'optin' AND ( + -- It is a double optin list. Only pick confirmed subscribers. + (campLists.optin = 'double' AND sl.status = 'confirmed') OR + + -- It is a single optin list. Pick all non-unsubscribed subscribers. 
+ (campLists.optin != 'double' AND sl.status != 'unsubscribed') + ) + ) + ) + ORDER BY s.id LIMIT $6 + ) subIDs JOIN subscribers s ON (s.id = subIDs.id) ORDER BY s.id ), u AS ( UPDATE campaigns diff --git a/schema.sql b/schema.sql index 6862e46cf..1f354e413 100644 --- a/schema.sql +++ b/schema.sql @@ -28,6 +28,7 @@ CREATE TABLE subscribers ( ); DROP INDEX IF EXISTS idx_subs_email; CREATE UNIQUE INDEX idx_subs_email ON subscribers(LOWER(email)); DROP INDEX IF EXISTS idx_subs_status; CREATE INDEX idx_subs_status ON subscribers(status); +DROP INDEX IF EXISTS idx_subs_id_status; CREATE INDEX idx_subs_id_status ON subscribers(id, status); DROP INDEX IF EXISTS idx_subs_created_at; CREATE INDEX idx_subs_created_at ON subscribers(created_at); DROP INDEX IF EXISTS idx_subs_updated_at; CREATE INDEX idx_subs_updated_at ON subscribers(updated_at);