From b1741899c8ec0770dd21f87e336fdc4dd339a3e4 Mon Sep 17 00:00:00 2001 From: romanbrodetskiy Date: Tue, 21 Nov 2023 01:32:08 +0300 Subject: [PATCH] fix(prover): use a more performant query to get next job for specialized prover --- core/lib/dal/sqlx-data.json | 118 ++++++++++++++--------------- core/lib/dal/src/fri_prover_dal.rs | 18 +++-- 2 files changed, 69 insertions(+), 67 deletions(-) diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index e031fe7d671..18f0d8f198e 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -5197,65 +5197,6 @@ }, "query": "INSERT INTO events_queue (l1_batch_number, serialized_events_queue) VALUES ($1, $2)" }, - "62aaa047e3da5bd966608fec421ddad1b8afa04aaf35e946219d703bbe6ac9c5": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "l1_batch_number", - "ordinal": 1, - "type_info": "Int8" - }, - { - "name": "circuit_id", - "ordinal": 2, - "type_info": "Int2" - }, - { - "name": "aggregation_round", - "ordinal": 3, - "type_info": "Int2" - }, - { - "name": "sequence_number", - "ordinal": 4, - "type_info": "Int4" - }, - { - "name": "depth", - "ordinal": 5, - "type_info": "Int4" - }, - { - "name": "is_node_final_proof", - "ordinal": 6, - "type_info": "Bool" - } - ], - "nullable": [ - false, - false, - false, - false, - false, - false, - false - ], - "parameters": { - "Left": [ - "Int2Array", - "Int2Array", - "Int4Array", - "Text" - ] - } - }, - "query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $4\n WHERE id = (\n SELECT pj.id\n FROM prover_jobs_fri AS pj\n JOIN (\n SELECT * FROM unnest($1::smallint[], $2::smallint[])\n )\n AS tuple (circuit_id, round)\n ON tuple.circuit_id = pj.circuit_id AND tuple.round = pj.aggregation_round\n WHERE pj.status = 'queued'\n AND pj.protocol_version = ANY($3)\n ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n " - }, "6317155050a5dae24ea202cfd54d1e58cc7aeb0bfd4d95aa351f85cff04d3bff": { "describe": { "columns": [ @@ -8994,6 +8935,65 @@ }, "query": "SELECT prover_jobs.result as proof, scheduler_witness_jobs.aggregation_result_coords\n FROM prover_jobs\n INNER JOIN scheduler_witness_jobs\n ON prover_jobs.l1_batch_number = scheduler_witness_jobs.l1_batch_number\n WHERE prover_jobs.l1_batch_number >= $1 AND prover_jobs.l1_batch_number <= $2\n AND prover_jobs.aggregation_round = 3\n AND prover_jobs.status = 'successful'\n " }, + "a9fc7d587aff79ecb78c1a56b8299d5cb39e7fb0b10cb82b9abb1691f87422e6": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "l1_batch_number", + "ordinal": 1, + "type_info": "Int8" + }, + { + "name": "circuit_id", + "ordinal": 2, + "type_info": "Int2" + }, + { + "name": "aggregation_round", + "ordinal": 3, + "type_info": "Int2" + }, + { + "name": "sequence_number", + "ordinal": 4, + "type_info": "Int4" + }, + { + "name": "depth", + "ordinal": 5, + "type_info": "Int4" + }, + { + "name": "is_node_final_proof", + "ordinal": 6, + "type_info": "Bool" + } + ], + "nullable": [ + false, + false, + false, + false, + false, + false, + false + ], + "parameters": { + "Left": [ + "Int2Array", + "Int2Array", + "Int4Array", + "Text" + ] + } + }, + "query": "\n UPDATE prover_jobs_fri\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now(),\n picked_by = $4\n WHERE id = (\n SELECT pj.id\n FROM ( SELECT * FROM unnest($1::smallint[], $2::smallint[]) ) AS tuple (circuit_id, round)\n JOIN LATERAL\n (\n SELECT * FROM prover_jobs_fri AS pj\n WHERE pj.status = 'queued'\n AND pj.protocol_version = ANY($3)\n AND pj.circuit_id = tuple.circuit_id AND pj.aggregation_round = tuple.round\n ORDER BY pj.l1_batch_number ASC, pj.id ASC\n LIMIT 1\n ) AS pj ON true\n ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING prover_jobs_fri.id, prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id,\n prover_jobs_fri.aggregation_round, prover_jobs_fri.sequence_number, prover_jobs_fri.depth,\n prover_jobs_fri.is_node_final_proof\n " + }, "aa279ce3351b30788711be6c65cb99cb14304ac38f8fed6d332237ffafc7c86b": { "describe": { "columns": [], diff --git a/core/lib/dal/src/fri_prover_dal.rs b/core/lib/dal/src/fri_prover_dal.rs index d3cb1364455..0787e818f6d 100644 --- a/core/lib/dal/src/fri_prover_dal.rs +++ b/core/lib/dal/src/fri_prover_dal.rs @@ -113,14 +113,16 @@ impl FriProverDal<'_, '_> { picked_by = $4 WHERE id = ( SELECT pj.id - FROM prover_jobs_fri AS pj - JOIN ( - SELECT * FROM unnest($1::smallint[], $2::smallint[]) - ) - AS tuple (circuit_id, round) - ON tuple.circuit_id = pj.circuit_id AND tuple.round = pj.aggregation_round - WHERE pj.status = 'queued' - AND pj.protocol_version = ANY($3) + FROM ( SELECT * FROM unnest($1::smallint[], $2::smallint[]) ) AS tuple (circuit_id, round) + JOIN LATERAL + ( + SELECT * FROM prover_jobs_fri AS pj + WHERE pj.status = 'queued' + AND pj.protocol_version = ANY($3) + AND pj.circuit_id = tuple.circuit_id AND pj.aggregation_round = tuple.round + ORDER BY pj.l1_batch_number ASC, pj.id ASC + LIMIT 1 + ) AS pj ON true ORDER BY pj.l1_batch_number ASC, pj.aggregation_round DESC, pj.id ASC LIMIT 1 FOR UPDATE