Skip to content

Commit

Permalink
fix(prover): use a more performant query to get next job for speciali…
Browse files Browse the repository at this point in the history
…zed prover
  • Loading branch information
RomanBrodetski committed Nov 21, 2023
1 parent 7c137b7 commit b174189
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 67 deletions.
118 changes: 59 additions & 59 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -5197,65 +5197,6 @@
},
"query": "INSERT INTO events_queue (l1_batch_number, serialized_events_queue) VALUES ($1, $2)"
},
"62aaa047e3da5bd966608fec421ddad1b8afa04aaf35e946219d703bbe6ac9c5": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "l1_batch_number",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "circuit_id",
"ordinal": 2,
"type_info": "Int2"
},
{
"name": "aggregation_round",
"ordinal": 3,
"type_info": "Int2"
},
{
"name": "sequence_number",
"ordinal": 4,
"type_info": "Int4"
},
{
"name": "depth",
"ordinal": 5,
"type_info": "Int4"
},
{
"name": "is_node_final_proof",
"ordinal": 6,
"type_info": "Bool"
}
],
"nullable": [
false,
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Int2Array",
"Int2Array",
"Int4Array",
"Text"
]
}
},
"query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $4\n WHERE id = (\n SELECT pj.id\n FROM prover_jobs_fri AS pj\n JOIN (\n SELECT * FROM unnest($1::smallint[], $2::smallint[])\n )\n AS tuple (circuit_id, round)\n ON tuple.circuit_id = pj.circuit_id AND tuple.round = pj.aggregation_round\n WHERE pj.status = 'queued'\n AND pj.protocol_version = ANY($3)\n ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n "
},
"6317155050a5dae24ea202cfd54d1e58cc7aeb0bfd4d95aa351f85cff04d3bff": {
"describe": {
"columns": [
Expand Down Expand Up @@ -8994,6 +8935,65 @@
},
"query": "SELECT prover_jobs.result as proof, scheduler_witness_jobs.aggregation_result_coords\n FROM prover_jobs\n INNER JOIN scheduler_witness_jobs\n ON prover_jobs.l1_batch_number = scheduler_witness_jobs.l1_batch_number\n WHERE prover_jobs.l1_batch_number >= $1 AND prover_jobs.l1_batch_number <= $2\n AND prover_jobs.aggregation_round = 3\n AND prover_jobs.status = 'successful'\n "
},
"a9fc7d587aff79ecb78c1a56b8299d5cb39e7fb0b10cb82b9abb1691f87422e6": {
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "l1_batch_number",
"ordinal": 1,
"type_info": "Int8"
},
{
"name": "circuit_id",
"ordinal": 2,
"type_info": "Int2"
},
{
"name": "aggregation_round",
"ordinal": 3,
"type_info": "Int2"
},
{
"name": "sequence_number",
"ordinal": 4,
"type_info": "Int4"
},
{
"name": "depth",
"ordinal": 5,
"type_info": "Int4"
},
{
"name": "is_node_final_proof",
"ordinal": 6,
"type_info": "Bool"
}
],
"nullable": [
false,
false,
false,
false,
false,
false,
false
],
"parameters": {
"Left": [
"Int2Array",
"Int2Array",
"Int4Array",
"Text"
]
}
},
"query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $4\n WHERE id = (\n SELECT pj.id\n FROM ( SELECT * FROM unnest($1::smallint[], $2::smallint[]) ) AS tuple (circuit_id, round)\n JOIN LATERAL\n (\n SELECT * FROM prover_jobs_fri AS pj\n WHERE pj.status = 'queued'\n AND pj.protocol_version = ANY($3)\n AND pj.circuit_id = tuple.circuit_id AND pj.aggregation_round = tuple.round\n ORDER BY pj.l1_batch_number ASC, pj.id ASC\n LIMIT 1\n ) AS pj ON true\n ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n "
},
"aa279ce3351b30788711be6c65cb99cb14304ac38f8fed6d332237ffafc7c86b": {
"describe": {
"columns": [],
Expand Down
18 changes: 10 additions & 8 deletions core/lib/dal/src/fri_prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,16 @@ impl FriProverDal<'_, '_> {
picked_by = $4
WHERE id = (
SELECT pj.id
FROM prover_jobs_fri AS pj
JOIN (
SELECT * FROM unnest($1::smallint[], $2::smallint[])
)
AS tuple (circuit_id, round)
ON tuple.circuit_id = pj.circuit_id AND tuple.round = pj.aggregation_round
WHERE pj.status = 'queued'
AND pj.protocol_version = ANY($3)
FROM ( SELECT * FROM unnest($1::smallint[], $2::smallint[]) ) AS tuple (circuit_id, round)
JOIN LATERAL
(
SELECT * FROM prover_jobs_fri AS pj
WHERE pj.status = 'queued'
AND pj.protocol_version = ANY($3)
AND pj.circuit_id = tuple.circuit_id AND pj.aggregation_round = tuple.round
ORDER BY pj.l1_batch_number ASC, pj.id ASC
LIMIT 1
) AS pj ON true
ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC
LIMIT 1
FOR UPDATE
Expand Down

0 comments on commit b174189

Please sign in to comment.