Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions runtime/drivers/sqlite/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path"
"strconv"
"strings"
"time"
)

// Embed migrations directory in the binary
Expand All @@ -17,6 +18,9 @@ var migrationsFS embed.FS
// Name of the table that tracks migrations.
var migrationVersionTable = "runtime_migration_version"

// TTL for AI sessions; sessions with no messages newer than this are deleted on startup.
var aiSessionTTL = 90 * 24 * time.Hour // 3 months

// Migrate implements drivers.Connection.
// Migrate for SQLite may not be safe for concurrent use.
func (c *connection) Migrate(_ context.Context) (err error) {
Expand Down Expand Up @@ -71,6 +75,14 @@ func (c *connection) Migrate(_ context.Context) (err error) {
return err
}
}

// Apply TTL to AI sessions: delete sessions whose newest message is older than aiSessionTTL.
// Related ai_messages rows are removed automatically via ON DELETE CASCADE.
err = c.deleteExpiredAISessions(ctx)
if err != nil {
return fmt.Errorf("failed to delete expired AI sessions: %w", err)
}

return nil
}

Expand Down Expand Up @@ -133,3 +145,32 @@ func (c *connection) MigrationStatus(_ context.Context) (current, desired int, e
func migrationFilenameToVersion(name string) (int, error) {
return strconv.Atoi(strings.TrimSuffix(name, ".sql"))
}

// deleteExpiredAISessions deletes AI sessions that have no messages newer than aiSessionTTL.
// Messages are deleted explicitly before the sessions to avoid reliance on ON DELETE CASCADE.
func (c *connection) deleteExpiredAISessions(ctx context.Context) error {
cutoff := time.Now().UTC().Add(-aiSessionTTL)

// Identify expired sessions: those older than the cutoff with no messages newer than the cutoff.
expiredSessionsQuery := `
SELECT id FROM ai_sessions
WHERE ai_sessions.created_on < ?
AND NOT EXISTS (
SELECT 1 FROM ai_messages
WHERE ai_messages.session_id = ai_sessions.id
AND ai_messages.created_on >= ?
)
`

// Delete messages first, then sessions.
_, err := c.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_messages WHERE session_id IN (%s)`, expiredSessionsQuery), cutoff, cutoff)
if err != nil {
return fmt.Errorf("failed to delete expired AI messages: %w", err)
}
_, err = c.db.ExecContext(ctx, fmt.Sprintf(`DELETE FROM ai_sessions WHERE id IN (%s)`, expiredSessionsQuery), cutoff, cutoff)
if err != nil {
return fmt.Errorf("failed to delete expired AI sessions: %w", err)
}

return nil
}
63 changes: 63 additions & 0 deletions runtime/drivers/sqlite/migrate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sqlite

import (
"path/filepath"
"testing"
"time"

"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/storage"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestDeleteExpiredAISessions(t *testing.T) {
now := time.Now().UTC()
old := now.Add(-aiSessionTTL - 24*time.Hour)

tmpDir := t.TempDir()
dbPath := filepath.Join(tmpDir, "test.db")
storageDir := filepath.Join(tmpDir, "storage")
cfg := map[string]any{"dsn": dbPath}

// Open the database, run migrations, and seed test data.
h, err := driver{}.Open("", cfg, storage.MustNew(storageDir, nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
require.NoError(t, h.Migrate(t.Context()))

catalog, ok := h.AsCatalogStore("inst")
require.True(t, ok)

// Session 1: old session with only old messages (should be deleted).
require.NoError(t, catalog.InsertAISession(t.Context(), &drivers.AISession{ID: "s1", CreatedOn: old, UpdatedOn: old}))
require.NoError(t, catalog.InsertAIMessage(t.Context(), &drivers.AIMessage{ID: "m1", SessionID: "s1", CreatedOn: old, UpdatedOn: old}))

// Session 2: old session with no messages (should be deleted).
require.NoError(t, catalog.InsertAISession(t.Context(), &drivers.AISession{ID: "s2", CreatedOn: old, UpdatedOn: old}))

// Session 3: old session with a recent message (should be kept).
require.NoError(t, catalog.InsertAISession(t.Context(), &drivers.AISession{ID: "s3", CreatedOn: old, UpdatedOn: now}))
require.NoError(t, catalog.InsertAIMessage(t.Context(), &drivers.AIMessage{ID: "m3", SessionID: "s3", CreatedOn: now, UpdatedOn: now}))

// Session 4: new session with no messages (should be kept).
require.NoError(t, catalog.InsertAISession(t.Context(), &drivers.AISession{ID: "s4", CreatedOn: now, UpdatedOn: now}))

// Close the handle, then re-open and migrate to trigger the TTL cleanup.
require.NoError(t, h.Close())
h, err = driver{}.Open("", cfg, storage.MustNew(storageDir, nil), activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
defer h.Close()
require.NoError(t, h.Migrate(t.Context()))

// Query the database directly to verify results.
db := h.(*connection).db

var sessionIDs []string
require.NoError(t, db.SelectContext(t.Context(), &sessionIDs, `SELECT id FROM ai_sessions ORDER BY id`))
require.Equal(t, []string{"s3", "s4"}, sessionIDs)

var messageIDs []string
require.NoError(t, db.SelectContext(t.Context(), &messageIDs, `SELECT id FROM ai_messages ORDER BY id`))
require.Equal(t, []string{"m3"}, messageIDs)
}
Loading