Skip to content

Commit 0e14ff9

Browse files
committed
Tweaks and fixes to background tasks (#4447)
* adjustments * chore: query cache, clippy, fmt
1 parent db738ff commit 0e14ff9

13 files changed

+59
-42
lines changed

apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json

Lines changed: 0 additions & 16 deletions
This file was deleted.

apps/labrinth/.sqlx/query-7910f8c982de4c9c8f3b40cecda741d6d43c3084b7f347e5da31c353cc9bae53.json

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/labrinth/src/database/models/charge_item.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,15 @@ impl DBCharge {
260260
let charge_type = ChargeType::Subscription.as_str();
261261
let res = select_charges_with_predicate!(
262262
r#"
263+
INNER JOIN users_subscriptions us ON us.id = charges.subscription_id
263264
WHERE
264-
charge_type = $1 AND
265+
charges.charge_type = $1 AND
265266
(
266-
(status = 'cancelled' AND due < NOW()) OR
267-
(status = 'expiring' AND due < NOW()) OR
268-
(status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')
267+
(charges.status = 'cancelled' AND charges.due < NOW()) OR
268+
(charges.status = 'expiring' AND charges.due < NOW()) OR
269+
(charges.status = 'failed' AND charges.last_attempt < NOW() - INTERVAL '2 days')
269270
)
271+
AND us.status = 'provisioned'
270272
"#,
271273
charge_type
272274
)
@@ -321,6 +323,7 @@ impl DBCharge {
321323
AND COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) < NOW() - INTERVAL '1 day'
322324
AND u.email IS NOT NULL
323325
AND due - INTERVAL '7 days' > NOW()
326+
AND due - INTERVAL '14 days' < NOW() -- Due between 7 and 14 days from now
324327
ORDER BY COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) ASC
325328
FOR NO KEY UPDATE SKIP LOCKED
326329
LIMIT $1

apps/labrinth/src/database/models/notification_item.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl NotificationBuilder {
6464
ids.notification_id,
6565
ids.user_id,
6666
ids.date_available,
67-
COALESCE(SUM(pv.amount), 0.0) sum
67+
FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents
6868
FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)
6969
LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available
7070
GROUP BY ids.user_id, ids.notification_id, ids.date_available
@@ -81,6 +81,7 @@ impl NotificationBuilder {
8181
'amount', to_jsonb(sum)
8282
) body
8383
FROM period_payouts
84+
WHERE sum > 0
8485
",
8586
&notification_ids[..],
8687
&users_raw_ids[..],

apps/labrinth/src/database/models/notifications_template_item.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ const TEMPLATES_NAMESPACE: &str = "notifications_templates";
77
const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data";
88
const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes
99

10-
#[derive(Clone, Serialize, Deserialize)]
10+
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
1112
pub struct NotificationTemplate {
1213
pub id: i64,
1314
pub channel: NotificationChannel,

apps/labrinth/src/models/v2/notifications.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ pub enum LegacyNotificationBody {
136136
new_owner_organization_id: Option<OrganizationId>,
137137
},
138138
PayoutAvailable {
139-
amount: f64,
139+
amount: u64,
140140
date_available: DateTime<Utc>,
141141
},
142142
Unknown,

apps/labrinth/src/models/v3/notifications.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ pub struct Notification {
2727
pub actions: Vec<NotificationAction>,
2828
}
2929

30-
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
30+
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
3131
#[serde(rename_all = "snake_case")]
3232
pub enum NotificationType {
3333
// If adding a notification type, add a variant in `NotificationBody` of the same name!
@@ -114,6 +114,17 @@ impl NotificationType {
114114
"email_changed" => NotificationType::EmailChanged,
115115
"payment_failed" => NotificationType::PaymentFailed,
116116
"tax_notification" => NotificationType::TaxNotification,
117+
"payout_available" => NotificationType::PayoutAvailable,
118+
"moderation_message_received" => {
119+
NotificationType::ModerationMessageReceived
120+
}
121+
"report_status_updated" => NotificationType::ReportStatusUpdated,
122+
"report_submitted" => NotificationType::ReportSubmitted,
123+
"project_status_approved" => {
124+
NotificationType::ProjectStatusApproved
125+
}
126+
"project_status_neutral" => NotificationType::ProjectStatusNeutral,
127+
"project_transferred" => NotificationType::ProjectTransferred,
117128
"unknown" => NotificationType::Unknown,
118129
_ => NotificationType::Unknown,
119130
}
@@ -223,7 +234,7 @@ pub enum NotificationBody {
223234
},
224235
PayoutAvailable {
225236
date_available: DateTime<Utc>,
226-
amount: f64,
237+
amount: u64,
227238
},
228239
Unknown,
229240
}

apps/labrinth/src/queue/billing.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,8 @@ pub async fn index_subscriptions(
343343
}
344344

345345
for mut c in charges {
346+
processed_charges += 1;
347+
346348
let payment_intent_id = c
347349
.payment_platform_id
348350
.as_deref()
@@ -465,8 +467,6 @@ pub async fn index_subscriptions(
465467
c.tax_drift_loss = Some(drift);
466468
c.tax_platform_id = Some(tax_platform_id);
467469
c.upsert(&mut txn).await?;
468-
469-
processed_charges += 1;
470470
}
471471

472472
txn.commit().await?;
@@ -477,14 +477,17 @@ pub async fn index_subscriptions(
477477
}
478478
}
479479

480-
let tax_charges_index_handle = tokio::spawn(anrok_api_operations(
480+
anrok_api_operations(
481481
pool.clone(),
482482
redis.clone(),
483483
stripe_client.clone(),
484484
anrok_client.clone(),
485-
));
485+
)
486+
.await;
486487

487488
let res = async {
489+
info!("Gathering charges to unprovision");
490+
488491
let mut transaction = pool.begin().await?;
489492
let mut clear_cache_users = Vec::new();
490493

@@ -539,6 +542,8 @@ pub async fn index_subscriptions(
539542
.await?;
540543

541544
for charge in all_charges {
545+
info!("Indexing charge '{}'", to_base62(charge.id.0 as u64));
546+
542547
let Some(subscription) = all_subscriptions
543548
.iter_mut()
544549
.find(|x| Some(x.id) == charge.subscription_id)
@@ -664,12 +669,6 @@ pub async fn index_subscriptions(
664669
warn!("Error indexing subscriptions: {:?}", e);
665670
}
666671

667-
if let Err(error) = tax_charges_index_handle.await
668-
&& error.is_panic()
669-
{
670-
std::panic::resume_unwind(error.into_panic());
671-
}
672-
673672
info!("Done indexing subscriptions");
674673
}
675674

0 commit comments

Comments
 (0)