@@ -53,6 +53,65 @@ CREATE INDEX idx_workflow_runs_profile_id ON public.workflow_runs USING btree (p
53
53
-- Index for efficient upsert lookups (supports the UNIQUE constraint)
-- Plain (non-unique) btree over (workflow_run_id, run_attempt, class_id).
-- NOTE(review): PostgreSQL already backs a UNIQUE constraint with its own index; if
-- workflow_runs_unique_run covers these same columns in this order, this index may be
-- redundant -- confirm against the constraint definition before keeping both.
54
54
CREATE INDEX idx_workflow_runs_unique_lookup ON public .workflow_runs USING btree (workflow_run_id, run_attempt, class_id);
55
55
56
-- Backfill public.workflow_runs with the rows currently exposed by the
-- public.workflow_events_summary materialized view, so historical data is
-- preserved before the view is dropped.
--
-- The INSERT runs inside the DO block so the NOTICE can report the number of
-- rows this statement actually inserted (via GET DIAGNOSTICS ... ROW_COUNT)
-- rather than the total size of the table. Rows that collide with
-- workflow_runs_unique_run are skipped by ON CONFLICT DO NOTHING and are not
-- counted, which also makes the backfill safe to re-run.
DO $$
DECLARE
    backfilled_count bigint;
BEGIN
    INSERT INTO public.workflow_runs (
        workflow_run_id,
        class_id,
        repository_name,
        workflow_name,
        workflow_path,
        head_sha,
        head_branch,
        run_number,
        run_attempt,
        actor_login,
        triggering_actor_login,
        assignment_id,
        profile_id,
        requested_at,
        in_progress_at,
        completed_at,
        conclusion,
        queue_time_seconds,
        run_time_seconds,
        created_at,
        updated_at
    )
    SELECT
        workflow_run_id,
        class_id,
        repository_name,
        workflow_name,
        workflow_path,
        head_sha,
        head_branch,
        run_number,
        run_attempt,
        actor_login,
        triggering_actor_login,
        assignment_id,
        profile_id,
        requested_at,
        in_progress_at,
        completed_at,
        conclusion,
        queue_time_seconds,
        run_time_seconds,
        -- The view rows are stamped with the backfill time for both audit columns.
        NOW() AS created_at,
        NOW() AS updated_at
    FROM public.workflow_events_summary
    ON CONFLICT ON CONSTRAINT workflow_runs_unique_run DO NOTHING;

    -- ROW_COUNT reflects only the rows inserted by the statement above.
    GET DIAGNOSTICS backfilled_count = ROW_COUNT;
    RAISE NOTICE ' Backfilled % workflow runs from materialized view', backfilled_count;
END $$;
114
+
56
115
-- Enable RLS
57
116
ALTER TABLE public .workflow_runs ENABLE ROW LEVEL SECURITY;
58
117
@@ -105,9 +164,11 @@ BEGIN
105
164
106
165
-- Event-specific upsert: only update fields relevant to each event type
107
166
-- This avoids unnecessary COALESCE operations and reduces UPDATE overhead
167
+ -- CRITICAL: Handles out-of-order webhook delivery (retries, race conditions)
108
168
CASE NEW .event_type
109
169
WHEN ' requested' THEN
110
170
-- First event: INSERT with metadata, only requested_at timestamp
171
+ -- If row exists (in_progress/completed arrived first), backfill requested_at and metadata
111
172
INSERT INTO public .workflow_runs (
112
173
workflow_run_id, class_id, run_attempt, repository_name,
113
174
workflow_name, workflow_path, head_sha, head_branch, run_number,
@@ -121,10 +182,35 @@ BEGIN
121
182
NEW .updated_at , NULL , NULL , NULL ,
122
183
NULL , NULL , NOW(), NOW()
123
184
)
124
- ON CONFLICT ON CONSTRAINT workflow_runs_unique_run DO NOTHING;
185
+ ON CONFLICT ON CONSTRAINT workflow_runs_unique_run
186
+ DO UPDATE SET
187
+ -- Backfill requested_at if it's NULL (out-of-order arrival)
188
+ requested_at = COALESCE(workflow_runs .requested_at , EXCLUDED .requested_at ),
189
+ -- Backfill metadata fields if NULL (they may be missing if later events arrived first)
190
+ repository_name = COALESCE(workflow_runs .repository_name , EXCLUDED .repository_name ),
191
+ workflow_name = COALESCE(workflow_runs .workflow_name , EXCLUDED .workflow_name ),
192
+ workflow_path = COALESCE(workflow_runs .workflow_path , EXCLUDED .workflow_path ),
193
+ head_sha = COALESCE(workflow_runs .head_sha , EXCLUDED .head_sha ),
194
+ head_branch = COALESCE(workflow_runs .head_branch , EXCLUDED .head_branch ),
195
+ run_number = COALESCE(workflow_runs .run_number , EXCLUDED .run_number ),
196
+ actor_login = COALESCE(workflow_runs .actor_login , EXCLUDED .actor_login ),
197
+ triggering_actor_login = COALESCE(workflow_runs .triggering_actor_login , EXCLUDED .triggering_actor_login ),
198
+ assignment_id = COALESCE(workflow_runs .assignment_id , EXCLUDED .assignment_id ),
199
+ profile_id = COALESCE(workflow_runs .profile_id , EXCLUDED .profile_id ),
200
+ -- Recalculate queue_time if we now have both timestamps and it was NULL
201
+ queue_time_seconds = CASE
202
+ WHEN workflow_runs .queue_time_seconds IS NULL
203
+ AND workflow_runs .in_progress_at IS NOT NULL
204
+ AND EXCLUDED .requested_at IS NOT NULL
205
+ THEN EXTRACT(EPOCH FROM (workflow_runs .in_progress_at - EXCLUDED .requested_at ))
206
+ ELSE workflow_runs .queue_time_seconds
207
+ END,
208
+ updated_at = NOW();
125
209
126
210
WHEN ' in_progress' THEN
127
- -- Second event: only update in_progress_at and calculate queue time
211
+ -- Second event: update in_progress_at and calculate queue time
212
+ -- If row exists (requested/completed arrived first), backfill in_progress_at
213
+ -- If row doesn't exist yet (requested not received), create with available data
128
214
INSERT INTO public .workflow_runs (
129
215
workflow_run_id, class_id, run_attempt, repository_name,
130
216
workflow_name, workflow_path, head_sha, head_branch, run_number,
@@ -140,17 +226,33 @@ BEGIN
140
226
)
141
227
ON CONFLICT ON CONSTRAINT workflow_runs_unique_run
142
228
DO UPDATE SET
143
- in_progress_at = EXCLUDED .in_progress_at ,
144
- -- Calculate queue time if we now have both timestamps
229
+ -- Keep an existing in_progress_at; only fill it in when it is NULL (late arrival or retry)
230
+ in_progress_at = COALESCE(workflow_runs .in_progress_at , EXCLUDED .in_progress_at ),
231
+ -- Backfill metadata fields if NULL (requested event may not have arrived yet)
232
+ repository_name = COALESCE(workflow_runs .repository_name , EXCLUDED .repository_name ),
233
+ workflow_name = COALESCE(workflow_runs .workflow_name , EXCLUDED .workflow_name ),
234
+ workflow_path = COALESCE(workflow_runs .workflow_path , EXCLUDED .workflow_path ),
235
+ head_sha = COALESCE(workflow_runs .head_sha , EXCLUDED .head_sha ),
236
+ head_branch = COALESCE(workflow_runs .head_branch , EXCLUDED .head_branch ),
237
+ run_number = COALESCE(workflow_runs .run_number , EXCLUDED .run_number ),
238
+ actor_login = COALESCE(workflow_runs .actor_login , EXCLUDED .actor_login ),
239
+ triggering_actor_login = COALESCE(workflow_runs .triggering_actor_login , EXCLUDED .triggering_actor_login ),
240
+ assignment_id = COALESCE(workflow_runs .assignment_id , EXCLUDED .assignment_id ),
241
+ profile_id = COALESCE(workflow_runs .profile_id , EXCLUDED .profile_id ),
242
+ -- Calculate queue time if we now have both timestamps and it was NULL
145
243
queue_time_seconds = CASE
146
- WHEN workflow_runs .requested_at IS NOT NULL AND EXCLUDED .in_progress_at IS NOT NULL
244
+ WHEN workflow_runs .queue_time_seconds IS NULL
245
+ AND workflow_runs .requested_at IS NOT NULL
246
+ AND EXCLUDED .in_progress_at IS NOT NULL
147
247
THEN EXTRACT(EPOCH FROM (EXCLUDED .in_progress_at - workflow_runs .requested_at ))
148
248
ELSE workflow_runs .queue_time_seconds
149
249
END,
150
250
updated_at = NOW();
151
251
152
252
WHEN ' completed' THEN
153
- -- Third event: only update completed_at, conclusion, and calculate run time
253
+ -- Third event: update completed_at, conclusion, and calculate run time
254
+ -- If row exists (requested/in_progress arrived first), backfill completed_at
255
+ -- If row doesn't exist yet, create with available data
154
256
INSERT INTO public .workflow_runs (
155
257
workflow_run_id, class_id, run_attempt, repository_name,
156
258
workflow_name, workflow_path, head_sha, head_branch, run_number,
@@ -166,11 +268,25 @@ BEGIN
166
268
)
167
269
ON CONFLICT ON CONSTRAINT workflow_runs_unique_run
168
270
DO UPDATE SET
169
- completed_at = EXCLUDED .completed_at ,
170
- conclusion = EXCLUDED .conclusion ,
171
- -- Calculate run time if we now have both timestamps
271
+ -- Keep existing completed_at and conclusion; only fill them in when NULL (late arrival or retry)
272
+ completed_at = COALESCE(workflow_runs .completed_at , EXCLUDED .completed_at ),
273
+ conclusion = COALESCE(workflow_runs .conclusion , EXCLUDED .conclusion ),
274
+ -- Backfill metadata fields if NULL (earlier events may not have arrived yet)
275
+ repository_name = COALESCE(workflow_runs .repository_name , EXCLUDED .repository_name ),
276
+ workflow_name = COALESCE(workflow_runs .workflow_name , EXCLUDED .workflow_name ),
277
+ workflow_path = COALESCE(workflow_runs .workflow_path , EXCLUDED .workflow_path ),
278
+ head_sha = COALESCE(workflow_runs .head_sha , EXCLUDED .head_sha ),
279
+ head_branch = COALESCE(workflow_runs .head_branch , EXCLUDED .head_branch ),
280
+ run_number = COALESCE(workflow_runs .run_number , EXCLUDED .run_number ),
281
+ actor_login = COALESCE(workflow_runs .actor_login , EXCLUDED .actor_login ),
282
+ triggering_actor_login = COALESCE(workflow_runs .triggering_actor_login , EXCLUDED .triggering_actor_login ),
283
+ assignment_id = COALESCE(workflow_runs .assignment_id , EXCLUDED .assignment_id ),
284
+ profile_id = COALESCE(workflow_runs .profile_id , EXCLUDED .profile_id ),
285
+ -- Calculate run time if we now have both timestamps and it was NULL
172
286
run_time_seconds = CASE
173
- WHEN workflow_runs .in_progress_at IS NOT NULL AND EXCLUDED .completed_at IS NOT NULL
287
+ WHEN workflow_runs .run_time_seconds IS NULL
288
+ AND workflow_runs .in_progress_at IS NOT NULL
289
+ AND EXCLUDED .completed_at IS NOT NULL
174
290
THEN EXTRACT(EPOCH FROM (EXCLUDED .completed_at - workflow_runs .in_progress_at ))
175
291
ELSE workflow_runs .run_time_seconds
176
292
END,
182
298
$$;
183
299
184
300
COMMENT ON FUNCTION public.maintain_workflow_runs() IS
185
- ' Automatically maintains workflow_runs table by aggregating workflow_events. Replaces the expensive materialized view refresh with real-time updates.' ;
301
+ ' Automatically maintains workflow_runs table by aggregating workflow_events. Replaces the expensive materialized view refresh with real-time updates. Handles out-of-order webhook delivery by backfilling missing fields and recalculating metrics when late events arrive. ' ;
186
302
187
303
-- Create trigger on workflow_events
188
304
CREATE TRIGGER trigger_maintain_workflow_runs
0 commit comments