Skip to content

Commit 791fcaf

Browse files
committed
feat(coordinator): assign static prover first and avoid reassigning failing task to same prover
1 parent 45b23ed commit 791fcaf

File tree

11 files changed

+371
-108
lines changed

11 files changed

+371
-108
lines changed

coordinator/conf/config.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"prover_manager": {
33
"provers_per_session": 1,
44
"session_attempts": 5,
5+
"external_prover_threshold": 32,
56
"bundle_collection_time_sec": 180,
67
"batch_collection_time_sec": 180,
78
"chunk_collection_time_sec": 180,

coordinator/internal/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ type ProverManager struct {
1616
// Number of attempts that a session can be retried if previous attempts failed.
1717
// Currently we only consider proving timeout as failure here.
1818
SessionAttempts uint8 `json:"session_attempts"`
19+
// Threshold for activating the external prover based on unassigned task count.
20+
ExternalProverThreshold int64 `json:"external_prover_threshold"`
1921
// Zk verifier config.
2022
Verifier *VerifierConfig `json:"verifier"`
2123
// BatchCollectionTimeSec batch Proof collection time (in seconds).

coordinator/internal/logic/provertask/batch_prover_task.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/gin-gonic/gin"
@@ -63,29 +64,59 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
6364

6465
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
6566
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
67+
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
68+
unassignedBatchCount, err := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
69+
if err != nil {
70+
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
71+
return nil, ErrCoordinatorInternalFailure
72+
}
73+
// Only assign a task to an external prover once the unassigned task count reaches the threshold.
74+
if unassignedBatchCount < bp.cfg.ProverManager.ExternalProverThreshold {
75+
return nil, nil
76+
}
77+
}
78+
6679
var batchTask *orm.Batch
6780
for i := 0; i < 5; i++ {
6881
var getTaskError error
6982
var tmpBatchTask *orm.Batch
70-
tmpBatchTask, getTaskError = bp.batchOrm.GetAssignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
83+
var assignedOffset, unassignedOffset = 0, 0
84+
tmpAssignedBatchTasks, getTaskError := bp.batchOrm.GetAssignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
7185
if getTaskError != nil {
72-
log.Error("failed to get assigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
86+
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
7387
return nil, ErrCoordinatorInternalFailure
7488
}
75-
7689
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
77-
// batch to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
78-
if tmpBatchTask == nil {
79-
tmpBatchTask, getTaskError = bp.batchOrm.GetUnassignedBatch(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
80-
if getTaskError != nil {
81-
log.Error("failed to get unassigned batch proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
82-
return nil, ErrCoordinatorInternalFailure
83-
}
90+
// batch to prover. But using `proving_status in (1, 2)` would not use the postgres index, so the SQL needs to be split.
91+
tmpUnassignedBatchTask, getTaskError := bp.batchOrm.GetUnassignedBatches(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
92+
if getTaskError != nil {
93+
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
94+
return nil, ErrCoordinatorInternalFailure
8495
}
96+
for {
97+
tmpBatchTask = nil
98+
if assignedOffset < len(tmpAssignedBatchTasks) {
99+
tmpBatchTask = tmpAssignedBatchTasks[assignedOffset]
100+
assignedOffset++
101+
} else if unassignedOffset < len(tmpUnassignedBatchTask) {
102+
tmpBatchTask = tmpUnassignedBatchTask[unassignedOffset]
103+
unassignedOffset++
104+
}
105+
106+
if tmpBatchTask == nil {
107+
log.Debug("get empty batch", "height", getTaskParameter.ProverHeight)
108+
return nil, nil
109+
}
85110

86-
if tmpBatchTask == nil {
87-
log.Debug("get empty batch", "height", getTaskParameter.ProverHeight)
88-
return nil, nil
111+
// Don't dispatch the same failing job to the same prover
112+
proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
113+
if err != nil {
114+
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
115+
return nil, ErrCoordinatorInternalFailure
116+
}
117+
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
118+
break
119+
}
89120
}
90121

91122
rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)

coordinator/internal/logic/provertask/bundle_prover_task.go

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/gin-gonic/gin"
@@ -63,29 +64,59 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
6364

6465
maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
6566
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
67+
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
68+
unassignedBundleCount, err := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
69+
if err != nil {
70+
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
71+
return nil, ErrCoordinatorInternalFailure
72+
}
73+
// Only assign a task to an external prover once the unassigned task count reaches the threshold.
74+
if unassignedBundleCount < bp.cfg.ProverManager.ExternalProverThreshold {
75+
return nil, nil
76+
}
77+
}
78+
6679
var bundleTask *orm.Bundle
6780
for i := 0; i < 5; i++ {
6881
var getTaskError error
6982
var tmpBundleTask *orm.Bundle
70-
tmpBundleTask, getTaskError = bp.bundleOrm.GetAssignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
83+
var assignedOffset, unassignedOffset = 0, 0
84+
tmpAssignedBundleTasks, getTaskError := bp.bundleOrm.GetAssignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
7185
if getTaskError != nil {
72-
log.Error("failed to get assigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
86+
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
7387
return nil, ErrCoordinatorInternalFailure
7488
}
75-
7689
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
77-
// bundle to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
78-
if tmpBundleTask == nil {
79-
tmpBundleTask, getTaskError = bp.bundleOrm.GetUnassignedBundle(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
80-
if getTaskError != nil {
81-
log.Error("failed to get unassigned bundle proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
82-
return nil, ErrCoordinatorInternalFailure
83-
}
90+
// bundle to prover. But using `proving_status in (1, 2)` would not use the postgres index, so the SQL needs to be split.
91+
tmpUnassignedBundleTask, getTaskError := bp.bundleOrm.GetUnassignedBundles(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, 50)
92+
if getTaskError != nil {
93+
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
94+
return nil, ErrCoordinatorInternalFailure
8495
}
96+
for {
97+
tmpBundleTask = nil
98+
if assignedOffset < len(tmpAssignedBundleTasks) {
99+
tmpBundleTask = tmpAssignedBundleTasks[assignedOffset]
100+
assignedOffset++
101+
} else if unassignedOffset < len(tmpUnassignedBundleTask) {
102+
tmpBundleTask = tmpUnassignedBundleTask[unassignedOffset]
103+
unassignedOffset++
104+
}
105+
106+
if tmpBundleTask == nil {
107+
log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight)
108+
return nil, nil
109+
}
85110

86-
if tmpBundleTask == nil {
87-
log.Debug("get empty bundle", "height", getTaskParameter.ProverHeight)
88-
return nil, nil
111+
// Don't dispatch the same failing job to the same prover
112+
proverTask, err := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
113+
if err != nil {
114+
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
115+
return nil, ErrCoordinatorInternalFailure
116+
}
117+
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
118+
break
119+
}
89120
}
90121

91122
rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)

coordinator/internal/logic/provertask/chunk_prover_task.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/gin-gonic/gin"
@@ -61,29 +62,59 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
6162

6263
maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
6364
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
65+
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
66+
unassignedChunkCount, err := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
67+
if err != nil {
68+
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
69+
return nil, ErrCoordinatorInternalFailure
70+
}
71+
// Only assign a task to an external prover once the unassigned task count reaches the threshold.
72+
if unassignedChunkCount < cp.cfg.ProverManager.ExternalProverThreshold {
73+
return nil, nil
74+
}
75+
}
76+
6477
var chunkTask *orm.Chunk
6578
for i := 0; i < 5; i++ {
6679
var getTaskError error
6780
var tmpChunkTask *orm.Chunk
68-
tmpChunkTask, getTaskError = cp.chunkOrm.GetAssignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
81+
var assignedOffset, unassignedOffset = 0, 0
82+
tmpAssignedChunkTasks, getTaskError := cp.chunkOrm.GetAssignedChunks(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
6983
if getTaskError != nil {
7084
log.Error("failed to get assigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
7185
return nil, ErrCoordinatorInternalFailure
7286
}
73-
7487
// Why here need get again? In order to support a task can assign to multiple prover, need also assign `ProvingTaskAssigned`
7588
// chunk to prover. But use `proving_status in (1, 2)` will not use the postgres index. So need split the sql.
76-
if tmpChunkTask == nil {
77-
tmpChunkTask, getTaskError = cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
78-
if getTaskError != nil {
79-
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
80-
return nil, ErrCoordinatorInternalFailure
81-
}
89+
tmpUnassignedChunkTask, getTaskError := cp.chunkOrm.GetUnassignedChunk(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight, 50)
90+
if getTaskError != nil {
91+
log.Error("failed to get unassigned chunk proving tasks", "height", getTaskParameter.ProverHeight, "err", getTaskError)
92+
return nil, ErrCoordinatorInternalFailure
8293
}
94+
for {
95+
tmpChunkTask = nil
96+
if assignedOffset < len(tmpAssignedChunkTasks) {
97+
tmpChunkTask = tmpAssignedChunkTasks[assignedOffset]
98+
assignedOffset++
99+
} else if unassignedOffset < len(tmpUnassignedChunkTask) {
100+
tmpChunkTask = tmpUnassignedChunkTask[unassignedOffset]
101+
unassignedOffset++
102+
}
103+
104+
if tmpChunkTask == nil {
105+
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
106+
return nil, nil
107+
}
83108

84-
if tmpChunkTask == nil {
85-
log.Debug("get empty chunk", "height", getTaskParameter.ProverHeight)
86-
return nil, nil
109+
// Don't dispatch the same failing job to the same prover
110+
proverTask, err := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
111+
if err != nil {
112+
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", err)
113+
return nil, ErrCoordinatorInternalFailure
114+
}
115+
if proverTask == nil || types.ProverProveStatus(proverTask.ProvingStatus) != types.ProverProofInvalid {
116+
break
117+
}
87118
}
88119

89120
rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)

coordinator/internal/logic/provertask/prover_task.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ var (
2727
getTaskCounterVec *prometheus.CounterVec = nil
2828
)
2929

30+
var (
31+
// ExternalProverNamePrefix is the prover-name prefix that marks a prover as an external prover.
32+
ExternalProverNamePrefix = "external"
33+
)
34+
3035
// ProverTask the interface of a collector who send data to prover
3136
type ProverTask interface {
3237
Assign(ctx *gin.Context, getTaskParameter *coordinatorType.GetTaskParameter) (*coordinatorType.GetTaskSchema, error)

coordinator/internal/orm/batch.go

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -78,38 +78,47 @@ func (*Batch) TableName() string {
7878
return "batch"
7979
}
8080

81-
// GetUnassignedBatch retrieves unassigned batch based on the specified limit.
81+
// GetUnassignedBatches retrieves unassigned batches based on the specified limit.
8282
// The returned batches are sorted in ascending order by their index.
83-
func (o *Batch) GetUnassignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
84-
var batch Batch
83+
func (o *Batch) GetUnassignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
84+
var batch []*Batch
8585
db := o.db.WithContext(ctx)
86-
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
87-
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
86+
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
87+
int(types.ProvingTaskUnassigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
8888
err := db.Raw(sql).Scan(&batch).Error
8989
if err != nil {
90-
return nil, fmt.Errorf("Batch.GetUnassignedBatch error: %w", err)
90+
return nil, fmt.Errorf("Batch.GetUnassignedBatches error: %w", err)
9191
}
92-
if batch.Hash == "" {
93-
return nil, nil
92+
return batch, nil
93+
}
94+
95+
// GetUnassignedBatchCount returns the number of unassigned, non-deleted batches whose total and active attempts are below the given maximums.
96+
func (o *Batch) GetUnassignedBatchCount(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (int64, error) {
97+
var count int64
98+
db := o.db.WithContext(ctx)
99+
db = db.Model(&Batch{})
100+
db = db.Where("proving_status = ?", int(types.ProvingTaskUnassigned))
101+
db = db.Where("total_attempts < ?", maxTotalAttempts)
102+
db = db.Where("active_attempts < ?", maxActiveAttempts)
103+
db = db.Where("batch.deleted_at IS NULL")
104+
if err := db.Count(&count).Error; err != nil {
105+
return 0, fmt.Errorf("Batch.GetUnassignedBatchCount error: %w", err)
94106
}
95-
return &batch, nil
107+
return count, nil
96108
}
97109

98-
// GetAssignedBatch retrieves assigned batch based on the specified limit.
110+
// GetAssignedBatches retrieves assigned batches based on the specified limit.
99111
// The returned batches are sorted in ascending order by their index.
100-
func (o *Batch) GetAssignedBatch(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8) (*Batch, error) {
101-
var batch Batch
112+
func (o *Batch) GetAssignedBatches(ctx context.Context, maxActiveAttempts, maxTotalAttempts uint8, limit uint64) ([]*Batch, error) {
113+
var batch []*Batch
102114
db := o.db.WithContext(ctx)
103-
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT 1;",
104-
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady))
115+
sql := fmt.Sprintf("SELECT * FROM batch WHERE proving_status = %d AND total_attempts < %d AND active_attempts < %d AND chunk_proofs_status = %d AND batch.deleted_at IS NULL ORDER BY batch.index LIMIT %d;",
116+
int(types.ProvingTaskAssigned), maxTotalAttempts, maxActiveAttempts, int(types.ChunkProofsStatusReady), limit)
105117
err := db.Raw(sql).Scan(&batch).Error
106118
if err != nil {
107-
return nil, fmt.Errorf("Batch.GetAssignedBatch error: %w", err)
108-
}
109-
if batch.Hash == "" {
110-
return nil, nil
119+
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
111120
}
112-
return &batch, nil
121+
return batch, nil
113122
}
114123

115124
// GetUnassignedAndChunksUnreadyBatches get the batches which is unassigned and chunks is not ready
@@ -132,19 +141,6 @@ func (o *Batch) GetUnassignedAndChunksUnreadyBatches(ctx context.Context, offset
132141
return batches, nil
133142
}
134143

135-
// GetAssignedBatches retrieves all batches whose proving_status is either types.ProvingTaskAssigned.
136-
func (o *Batch) GetAssignedBatches(ctx context.Context) ([]*Batch, error) {
137-
db := o.db.WithContext(ctx)
138-
db = db.Model(&Batch{})
139-
db = db.Where("proving_status = ?", int(types.ProvingTaskAssigned))
140-
141-
var assignedBatches []*Batch
142-
if err := db.Find(&assignedBatches).Error; err != nil {
143-
return nil, fmt.Errorf("Batch.GetAssignedBatches error: %w", err)
144-
}
145-
return assignedBatches, nil
146-
}
147-
148144
// GetProvingStatusByHash retrieves the proving status of a batch given its hash.
149145
func (o *Batch) GetProvingStatusByHash(ctx context.Context, hash string) (types.ProvingStatus, error) {
150146
db := o.db.WithContext(ctx)

0 commit comments

Comments
 (0)