From 8cda5574ef4305045a32ce2d5e3ab098681a14a4 Mon Sep 17 00:00:00 2001 From: Noah Stride Date: Tue, 15 Aug 2023 18:40:14 +0100 Subject: [PATCH] Finish up support --- go.mod | 2 +- go.sum | 6 ++++++ ingester/handle_actor_profile.go | 3 +-- ingester/ingester.go | 6 +++--- store/gen/candidate_actors.sql.go | 10 +++++++--- store/gen/candidate_posts.sql.go | 2 +- ...down.sql => 000018_self_labelling_support.down.sql} | 0 ...ort.up.sql => 000018_self_labelling_support.up.sql} | 0 store/queries/candidate_actors.sql | 8 +++++--- store/queries/candidate_posts.sql | 2 +- 10 files changed, 25 insertions(+), 14 deletions(-) rename store/migrations/{000015_self_labelling_support.down.sql => 000018_self_labelling_support.down.sql} (100%) rename store/migrations/{000015_self_labelling_support.up.sql => 000018_self_labelling_support.up.sql} (100%) diff --git a/go.mod b/go.mod index c55b75ea..b3711c25 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( cloud.google.com/go/cloudsqlconn v1.2.4 connectrpc.com/connect v1.11.0 connectrpc.com/otelconnect v0.5.0 - github.com/bluesky-social/indigo v0.0.0-20230810233313-ca1f9da51f09 + github.com/bluesky-social/indigo v0.0.0-20230815172301-d9efabf04191 github.com/golang-migrate/migrate/v4 v4.16.2 github.com/google/go-cmp v0.5.9 github.com/gorilla/websocket v1.5.0 diff --git a/go.sum b/go.sum index c2ff84bb..158ddcc0 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,12 @@ github.com/bluesky-social/indigo v0.0.0-20230728184334-c2ae70612145 h1:La+KOOafH github.com/bluesky-social/indigo v0.0.0-20230728184334-c2ae70612145/go.mod h1:R4lTJrJ8FkabyT9oqn3nIB+49xqcGPNdlHLq2huREjY= github.com/bluesky-social/indigo v0.0.0-20230810233313-ca1f9da51f09 h1:9Jh/ez4Ie3+YCGhI9agCQcB97AIHjwJvXAu/gEzdcP4= github.com/bluesky-social/indigo v0.0.0-20230810233313-ca1f9da51f09/go.mod h1:mvd5qjdsWSCYMu8r0Lc+M7FNbBi63mmTMJnPh+F1ghI= +github.com/bluesky-social/indigo v0.0.0-20230811212902-2bdfb5bc4fdc h1:DTZVw4dC40fPdFtQho+++q6B21X58qMMeJKrq2HLwIg= +github.com/bluesky-social/indigo v0.0.0-20230811212902-2bdfb5bc4fdc/go.mod h1:mvd5qjdsWSCYMu8r0Lc+M7FNbBi63mmTMJnPh+F1ghI= +github.com/bluesky-social/indigo v0.0.0-20230815171710-dcd896ab2eb2 h1:gyUHE69cDQc+3DaBfjraoJSH47nIux2z6xt0jRdsSTM= +github.com/bluesky-social/indigo v0.0.0-20230815171710-dcd896ab2eb2/go.mod h1:mvd5qjdsWSCYMu8r0Lc+M7FNbBi63mmTMJnPh+F1ghI= +github.com/bluesky-social/indigo v0.0.0-20230815172301-d9efabf04191 h1:1ecZfDDYbqXLHbbCkLYRSZf2c3rpreqyK/bjb5qKlUY= +github.com/bluesky-social/indigo v0.0.0-20230815172301-d9efabf04191/go.mod h1:mvd5qjdsWSCYMu8r0Lc+M7FNbBi63mmTMJnPh+F1ghI= github.com/brianvoe/gofakeit/v6 v6.20.2 h1:FLloufuC7NcbHqDzVQ42CG9AKryS1gAGCRt8nQRsW+Y= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= diff --git a/ingester/handle_actor_profile.go b/ingester/handle_actor_profile.go index 59763c55..2bce38d7 100644 --- a/ingester/handle_actor_profile.go +++ b/ingester/handle_actor_profile.go @@ -49,8 +49,7 @@ func (fi *FirehoseIngester) handleActorProfileUpdate( IndexedAt: time.Now(), DisplayName: displayName, Description: description, - // TODO: Extract SelfLabels from post - SelfLabels: []string{}, + SelfLabels: selfLabels, }, ) if err != nil { diff --git a/ingester/ingester.go b/ingester/ingester.go index b312e42e..ada0fe12 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/bluesky-social/indigo/events/schedulers/sequential" "strconv" "github.com/bluesky-social/indigo/util" @@ -259,9 +260,8 @@ func (fi *FirehoseIngester) Start(ctx context.Context) error { return nil }, } - return events.HandleRepoStream(ctx, con, &events.SequentialScheduler{ - Do: callbacks.EventHandler, - }) + scheduler := sequential.NewScheduler("main", callbacks.EventHandler) + return events.HandleRepoStream(ctx, con, scheduler) // TODO: sometimes stream exits of own accord, we should attempt to // reconnect and enter an "error state". }) diff --git a/store/gen/candidate_actors.sql.go b/store/gen/candidate_actors.sql.go index 82e0096c..16a61dc3 100644 --- a/store/gen/candidate_actors.sql.go +++ b/store/gen/candidate_actors.sql.go @@ -55,17 +55,19 @@ WITH ap as ( INSERT INTO actor_profiles (actor_did, commit_cid, created_at, indexed_at, display_name, - description) + description, self_labels) VALUES ($1, $2, $3, $4, - $5, $6) + $5, $6, + $7) ON CONFLICT (actor_did, commit_cid) DO UPDATE SET created_at = EXCLUDED.created_at, indexed_at = EXCLUDED.indexed_at, display_name = EXCLUDED.display_name, - description = EXCLUDED.description + description = EXCLUDED.description, + self_labels = EXCLUDED.self_labels RETURNING actor_did, commit_cid) UPDATE candidate_actors ca SET @@ -81,6 +83,7 @@ type CreateLatestActorProfileParams struct { IndexedAt pgtype.Timestamptz DisplayName pgtype.Text Description pgtype.Text + SelfLabels []string } func (q *Queries) CreateLatestActorProfile(ctx context.Context, arg CreateLatestActorProfileParams) error { @@ -91,6 +94,7 @@ func (q *Queries) CreateLatestActorProfile(ctx context.Context, arg CreateLatest arg.IndexedAt, arg.DisplayName, arg.Description, + arg.SelfLabels, ) return err } diff --git a/store/gen/candidate_posts.sql.go b/store/gen/candidate_posts.sql.go index 22188e3a..d4b903ab 100644 --- a/store/gen/candidate_posts.sql.go +++ b/store/gen/candidate_posts.sql.go @@ -67,7 +67,7 @@ WHERE -- Filter by NSFW status. If unspecified, do not filter. AND ($3::BOOLEAN IS NULL OR ((ARRAY ['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) OR - ARRAY ['TODO-TEMPORARY'] && cp.self_labels) = $3) + (ARRAY ['porn', 'nudity', 'suggestive'] && cp.self_labels)) = $3) -- Remove posts newer than the cursor timestamp AND (cp.indexed_at < $4) diff --git a/store/migrations/000015_self_labelling_support.down.sql b/store/migrations/000018_self_labelling_support.down.sql similarity index 100% rename from store/migrations/000015_self_labelling_support.down.sql rename to store/migrations/000018_self_labelling_support.down.sql diff --git a/store/migrations/000015_self_labelling_support.up.sql b/store/migrations/000018_self_labelling_support.up.sql similarity index 100% rename from store/migrations/000015_self_labelling_support.up.sql rename to store/migrations/000018_self_labelling_support.up.sql diff --git a/store/queries/candidate_actors.sql b/store/queries/candidate_actors.sql index 16150a6d..3e610849 100644 --- a/store/queries/candidate_actors.sql +++ b/store/queries/candidate_actors.sql @@ -30,17 +30,19 @@ WITH ap as ( INSERT INTO actor_profiles (actor_did, commit_cid, created_at, indexed_at, display_name, - description) + description, self_labels) VALUES (sqlc.arg(actor_did), sqlc.arg(commit_cid), sqlc.arg(created_at), sqlc.arg(indexed_at), - sqlc.arg(display_name), sqlc.arg(description)) + sqlc.arg(display_name), sqlc.arg(description), + sqlc.arg(self_labels)) ON CONFLICT (actor_did, commit_cid) DO UPDATE SET created_at = EXCLUDED.created_at, indexed_at = EXCLUDED.indexed_at, display_name = EXCLUDED.display_name, - description = EXCLUDED.description + description = EXCLUDED.description, + self_labels = EXCLUDED.self_labels RETURNING actor_did, commit_cid) UPDATE candidate_actors ca SET diff --git a/store/queries/candidate_posts.sql b/store/queries/candidate_posts.sql index 6b515077..fe4440ec 100644 --- a/store/queries/candidate_posts.sql +++ b/store/queries/candidate_posts.sql @@ -35,7 +35,7 @@ WHERE -- Filter by NSFW status. If unspecified, do not filter. AND (sqlc.narg(is_nsfw)::BOOLEAN IS NULL OR ((ARRAY ['nsfw', 'mursuit', 'murrsuit'] && cp.hashtags) OR - ARRAY ['TODO-TEMPORARY'] && cp.self_labels) = @is_nsfw) + (ARRAY ['porn', 'nudity', 'suggestive'] && cp.self_labels)) = @is_nsfw) -- Remove posts newer than the cursor timestamp AND (cp.indexed_at < @cursor_timestamp)