Skip to content

Commit

Permalink
fix: improve delay before subscriber receives event (#3237)
Browse files Browse the repository at this point in the history
Before the change, the new integration test was showing delays ranging
from 0.2s - 1.2s from when an event is published to when the async call
is created.
After the change we are consistently under 0.2s.
  • Loading branch information
matt2e authored Oct 30, 2024
1 parent 4806b03 commit 4681a10
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 38 deletions.
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

pubSub := pubsub.New(conn, encryption, svc.tasks, optional.Some[pubsub.AsyncCallListener](svc))
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.registry = artefacts.New(conn)
Expand Down
5 changes: 1 addition & 4 deletions backend/controller/cronjobs/internal/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
Expand Down Expand Up @@ -49,8 +47,7 @@ func TestNewCronJobsForModule(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

scheduler := scheduledtask.New(ctx, key, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute
Expand Down
5 changes: 1 addition & 4 deletions backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/async"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -24,8 +22,7 @@ func TestNoCallToAcquire(t *testing.T) {
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
Expand Down
8 changes: 2 additions & 6 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (

dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -35,8 +33,7 @@ func TestDAL(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
Expand Down Expand Up @@ -198,8 +195,7 @@ func TestCreateArtefactConflict(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

timelineSrv := timeline.New(ctx, conn, encryption)
key := model.NewControllerKey("localhost", "8081")
Expand Down
46 changes: 44 additions & 2 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/async"
in "github.com/TBD54566975/ftl/internal/integration"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/alecthomas/assert/v2"
)

func TestPubSub(t *testing.T) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestConsumptionDelay(t *testing.T) {
in.Sleep(time.Second*2),

// Get all event created ats, and all async call created ats
// Compare each, make sure none are less than 0.2s of each other
// Compare each, make sure none are less than 0.1s of each other
in.QueryRow("ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
Expand All @@ -79,7 +80,7 @@ func TestConsumptionDelay(t *testing.T) {
SELECT COUNT(*)
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.2;
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) < 0.1;
`, 0),
)
}
Expand Down Expand Up @@ -211,3 +212,44 @@ func TestLeaseFailure(t *testing.T) {
),
)
}

// TestIdlePerformance tests that async calls are created quickly after an event is published
func TestIdlePerformance(t *testing.T) {
in.Run(t,
in.WithLanguages("go"),
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

// publish a number of events with a delay between each
in.Repeat(5, func(t testing.TB, ic in.TestContext) {
in.Call("publisher", "publishOne", in.Obj{}, func(t testing.TB, resp in.Obj) {})(t, ic)
in.Sleep(time.Millisecond*1200)(t, ic)
}),

// compare publication date and consumption date of each event
in.ExpectError(func(t testing.TB, ic in.TestContext) {
badResult := in.GetRow(t, ic, "ftl", `
WITH event_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from topic_events order by created_at
) AS sub_event_times
),
async_call_times AS (
SELECT created_at, ROW_NUMBER() OVER (ORDER BY created_at) AS row_num
FROM (
select * from async_calls ac order by created_at
) AS sub_async_calls
)
SELECT ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at)))
FROM event_times
JOIN async_call_times ON event_times.row_num = async_call_times.row_num
WHERE ABS(EXTRACT(EPOCH FROM (event_times.created_at - async_call_times.created_at))) > 0.2
LIMIT 1;
`, 1)
assert.True(t, false, "async calls should be created quickly after an event is published, but it took %vs", badResult[0])
}, "sql: no rows in result set"), // no rows found means that all events were consumed quickly
)
}
65 changes: 50 additions & 15 deletions backend/controller/pubsub/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubsub
import (
"context"
"fmt"
"math/rand/v2"
"time"

"github.com/alecthomas/types/optional"
Expand All @@ -22,7 +23,7 @@ const (
// Events can be added simultaneously, which can cause events with out of order create_at values
// By adding a delay, we ensure that by the time we read the events, no new events will be added
// with earlier created_at values.
eventConsumptionDelay = 200 * time.Millisecond
eventConsumptionDelay = 100 * time.Millisecond
)

type Scheduler interface {
Expand All @@ -36,44 +37,78 @@ type AsyncCallListener interface {

type Service struct {
dal *dal.DAL
scheduler Scheduler
asyncCallListener optional.Option[AsyncCallListener]
eventPublished chan struct{}
}

func New(conn libdal.Connection, encryption *encryption.Service, scheduler Scheduler, asyncCallListener optional.Option[AsyncCallListener]) *Service {
func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service, asyncCallListener optional.Option[AsyncCallListener]) *Service {
m := &Service{
dal: dal.New(conn, encryption),
scheduler: scheduler,
asyncCallListener: asyncCallListener,
eventPublished: make(chan struct{}),
}
m.scheduler.Parallel("progress-subs", backoff.Backoff{
Min: 1 * time.Second,
Max: 5 * time.Second,
Jitter: true,
Factor: 1.5,
}, m.progressSubscriptions)
go m.poll(ctx)
return m
}

func (s *Service) progressSubscriptions(ctx context.Context) (time.Duration, error) {
// poll waits for an event to be published (incl eventConsumptionDelay) or for a poll interval to pass
func (s *Service) poll(ctx context.Context) {
logger := log.FromContext(ctx).Scope("pubsub")
var publishedAt optional.Option[time.Time]
for {
var publishTrigger <-chan time.Time
if pub, ok := publishedAt.Get(); ok {
publishTrigger = time.After(time.Until(pub.Add(eventConsumptionDelay)))
}

// poll interval with jitter (1s - 1.1s)
poll := time.Millisecond * (time.Duration(rand.Float64())*(100.0) + 1000.0) //nolint:gosec

select {
case <-ctx.Done():
return

case <-s.eventPublished:
// published an event, so now we wait for eventConsumptionDelay before trying to progress subscriptions
if !publishedAt.Ok() {
publishedAt = optional.Some(time.Now())
}

case <-publishTrigger:
// an event has been published and we have waited for eventConsumptionDelay
if err := s.progressSubscriptions(ctx); err != nil {
logger.Warnf("%s", err)
}
publishedAt = optional.None[time.Time]()

case <-time.After(poll):
if err := s.progressSubscriptions(ctx); err != nil {
logger.Warnf("%s", err)
}
}
}
}

func (s *Service) progressSubscriptions(ctx context.Context) error {
count, err := s.dal.ProgressSubscriptions(ctx, eventConsumptionDelay)
if err != nil {
return 0, fmt.Errorf("progress subscriptions: %w", err)
return fmt.Errorf("progress subscriptions: %w", err)
}
if count > 0 {
// notify controller that we added an async call
if listener, ok := s.asyncCallListener.Get(); ok {
listener.AsyncCallWasAdded(ctx)
}
}
return time.Second, nil
return nil
}

func (s *Service) PublishEventForTopic(ctx context.Context, module, topic, caller string, payload []byte) error {
err := s.dal.PublishEventForTopic(ctx, module, topic, caller, payload)
if err != nil {
return fmt.Errorf("%s.%s: publish: %w", module, topic, err)
}
s.eventPublished <- struct{}{}
return nil
}

Expand All @@ -100,8 +135,8 @@ func (s *Service) OnCallCompletion(ctx context.Context, tx libdal.Connection, or

// AsyncCallDidCommit is called after a subscription's async call has been completed and committed to the database.
func (s *Service) AsyncCallDidCommit(ctx context.Context, origin async.AsyncOriginPubSub) {
if _, err := s.progressSubscriptions(ctx); err != nil {
log.FromContext(ctx).Errorf(err, "failed to progress subscriptions")
if err := s.progressSubscriptions(ctx); err != nil {
log.FromContext(ctx).Scope("pubsub").Errorf(err, "failed to progress subscriptions")
}
}

Expand Down
8 changes: 2 additions & 6 deletions backend/controller/timeline/internal/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
controllerdal "github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -38,8 +36,7 @@ func TestTimeline(t *testing.T) {

timeline := timeline2.New(ctx, conn, encryption)
registry := artefacts.New(conn)
scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

key := model.NewControllerKey("localhost", strconv.Itoa(8080+1))
cjs := cronjobs.New(ctx, key, "test.com", encryption, timeline, conn)
Expand Down Expand Up @@ -248,8 +245,7 @@ func TestDeleteOldEvents(t *testing.T) {

timeline := timeline2.New(ctx, conn, encryption)
registry := artefacts.New(conn)
scheduler := scheduledtask.New(ctx, model.ControllerKey{}, leases.NewFakeLeaser())
pubSub := pubsub.New(conn, encryption, scheduler, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
controllerDAL := controllerdal.New(ctx, conn, encryption, pubSub, nil)

var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100)
Expand Down

0 comments on commit 4681a10

Please sign in to comment.