Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 130 additions & 9 deletions data/account/participationRegistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,22 @@ type ParticipationRecord struct {
Voting *crypto.OneTimeSignatureSecrets
}

// StateProofKey is a placeholder for the real state proof key type.
// PKI TODO: Replace this with a real object.
type StateProofKey []byte

// ParticipationRecordForRound adds in the per-round state proof key.
type ParticipationRecordForRound struct {
ParticipationRecord

StateProof StateProofKey
}

// IsZero returns true if the object contains zero values.
func (r ParticipationRecordForRound) IsZero() bool {
return r.StateProof == nil && r.ParticipationRecord.IsZero()
}

var zeroParticipationRecord = ParticipationRecord{}

// IsZero returns true if the object contains zero values.
Expand Down Expand Up @@ -152,11 +168,18 @@ var ErrMultipleKeysForID = errors.New("multiple valid keys found for the same pa
// ErrNoKeyForID there may be cases where a key is deleted and used at the same time, so this error should be handled.
var ErrNoKeyForID = errors.New("no valid key found for the participationID")

// ErrSecretNotFound is used when attempting to lookup secrets for a particular round.
var ErrSecretNotFound = errors.New("the participation ID did not have secrets for the requested round")

// ParticipationRegistry contain all functions for interacting with the Participation Registry.
type ParticipationRegistry interface {
// Insert adds a record to storage and computes the ParticipationID
Insert(record Participation) (ParticipationID, error)

// AppendKeys appends state proof keys to an existing Participation record. Keys can only be appended
// once, an error will occur when the data is flushed when inserting a duplicate key.
AppendKeys(id ParticipationID, keys map[uint64]StateProofKey) error

// Delete removes a record from storage.
Delete(id ParticipationID) error

Expand All @@ -169,6 +192,9 @@ type ParticipationRegistry interface {
// GetAll of the participation records.
GetAll() []ParticipationRecord

// GetForRound fetches a record with all secrets for a particular round.
GetForRound(id ParticipationID, round basics.Round) (ParticipationRecordForRound, error)

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
Register(id ParticipationID, on basics.Round) error
Expand Down Expand Up @@ -256,8 +282,9 @@ const (
key BLOB NOT NULL, --* msgpack encoding of ParticipationAccount.BlockProof.SignatureAlgorithm
PRIMARY KEY (pk, round)
)`
insertKeysetQuery = `INSERT INTO Keysets (participationID, account, firstValidRound, lastValidRound, keyDilution, vrf) VALUES (?, ?, ?, ?, ?, ?)`
insertRollingQuery = `INSERT INTO Rolling (pk, voting) VALUES (?, ?)`
insertKeysetQuery = `INSERT INTO Keysets (participationID, account, firstValidRound, lastValidRound, keyDilution, vrf, stateProof) VALUES (?, ?, ?, ?, ?, ?, ?)`
insertRollingQuery = `INSERT INTO Rolling (pk, voting) VALUES (?, ?)`
appendStateProofKeysQuery = `INSERT INTO StateProofKeys (pk, round, key) VALUES(?, ?, ?)`

// SELECT pk FROM Keysets WHERE participationID = ?
selectPK = `SELECT pk FROM Keysets WHERE participationID = ? LIMIT 1`
Expand All @@ -270,6 +297,10 @@ const (
FROM Keysets k
INNER JOIN Rolling r
ON k.pk = r.pk`
selectStateProofKeys = `SELECT s.key
FROM StateProofKeys s
WHERE round=?
AND pk IN (SELECT pk FROM Keysets WHERE participationID=?)`
deleteKeysets = `DELETE FROM Keysets WHERE pk=?`
deleteRolling = `DELETE FROM Rolling WHERE pk=?`
updateRollingFieldsSQL = `UPDATE Rolling
Expand Down Expand Up @@ -332,6 +363,7 @@ type updatingParticipationRecord struct {
type partDBWriteRecord struct {
insertID ParticipationID
insert Participation
keys map[uint64]StateProofKey

registerUpdated map[ParticipationID]updatingParticipationRecord

Expand Down Expand Up @@ -380,7 +412,11 @@ func (db *participationDB) writeThread() {
if len(wr.registerUpdated) != 0 {
err = db.registerInner(wr.registerUpdated)
} else if !wr.insertID.IsZero() {
err = db.insertInner(wr.insert, wr.insertID)
if wr.insert != (Participation{}) {
err = db.insertInner(wr.insert, wr.insertID)
} else if len(wr.keys) != 0 {
err = db.appendKeysInner(wr.insertID, wr.keys)
}
} else if !wr.delete.IsZero() {
err = db.deleteInner(wr.delete)
} else if wr.flushResultChannel != nil {
Expand Down Expand Up @@ -413,9 +449,9 @@ func verifyExecWithOneRowEffected(err error, result sql.Result, operationName st
}

func (db *participationDB) insertInner(record Participation, id ParticipationID) (err error) {

var rawVRF []byte
var rawVoting []byte
var rawStateProof []byte

if record.VRF != nil {
rawVRF = protocol.Encode(record.VRF)
Expand All @@ -424,6 +460,7 @@ func (db *participationDB) insertInner(record Participation, id ParticipationID)
voting := record.Voting.Snapshot()
rawVoting = protocol.Encode(&voting)
}
// PKI TODO: Extract state proof from record.

err = db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
result, err := tx.Exec(
Expand All @@ -433,8 +470,9 @@ func (db *participationDB) insertInner(record Participation, id ParticipationID)
record.FirstValid,
record.LastValid,
record.KeyDilution,
rawVRF)
if err := verifyExecWithOneRowEffected(err, result, "insert keyset"); err != nil {
rawVRF,
rawStateProof)
if err = verifyExecWithOneRowEffected(err, result, "insert keyset"); err != nil {
return err
}
pk, err := result.LastInsertId()
Expand All @@ -444,7 +482,7 @@ func (db *participationDB) insertInner(record Participation, id ParticipationID)

// Create Rolling entry
result, err = tx.Exec(insertRollingQuery, pk, rawVoting)
if err := verifyExecWithOneRowEffected(err, result, "insert rolling"); err != nil {
if err = verifyExecWithOneRowEffected(err, result, "insert rolling"); err != nil {
return err
}

Expand All @@ -453,6 +491,37 @@ func (db *participationDB) insertInner(record Participation, id ParticipationID)
return err
}

func (db *participationDB) appendKeysInner(id ParticipationID, keys map[uint64]StateProofKey) error {
err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
// Fetch primary key
var pk int
row := tx.QueryRow(selectPK, id[:])
err := row.Scan(&pk)
if err == sql.ErrNoRows {
// nothing to do.
return nil
}
if err != nil {
return fmt.Errorf("unable to scan pk: %w", err)
}

stmt, err := tx.Prepare(appendStateProofKeysQuery)
if err != nil {
return fmt.Errorf("unable to prepare state proof insert: %w", err)
}

for k, v := range keys {
result, err := stmt.Exec(pk, k, v)
if err = verifyExecWithOneRowEffected(err, result, "append keys"); err != nil {
return err
}
}

return nil
})
return err
}

func (db *participationDB) registerInner(updated map[ParticipationID]updatingParticipationRecord) error {
var cacheDeletes []ParticipationID
err := db.store.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
Expand Down Expand Up @@ -502,12 +571,12 @@ func (db *participationDB) deleteInner(id ParticipationID) error {

// Delete rows
result, err := tx.Exec(deleteKeysets, pk)
if err := verifyExecWithOneRowEffected(err, result, "delete keyset"); err != nil {
if err = verifyExecWithOneRowEffected(err, result, "delete keyset"); err != nil {
return err
}

result, err = tx.Exec(deleteRolling, pk)
if err := verifyExecWithOneRowEffected(err, result, "delete rolling"); err != nil {
if err = verifyExecWithOneRowEffected(err, result, "delete rolling"); err != nil {
return err
}

Expand Down Expand Up @@ -578,6 +647,8 @@ func (db *participationDB) Insert(record Participation) (id ParticipationID, err

id = record.ID()
if _, ok := db.cache[id]; ok {
// PKI TODO: Add a special case to set the StateProof public key if it is in the input
// but not in the cache.
return id, ErrAlreadyInserted
}

Expand Down Expand Up @@ -619,6 +690,27 @@ func (db *participationDB) Insert(record Participation) (id ParticipationID, err
return
}

func (db *participationDB) AppendKeys(id ParticipationID, keys map[uint64]StateProofKey) error {
db.mutex.Lock()
defer db.mutex.Unlock()

if _, ok := db.cache[id]; !ok {
return ErrParticipationIDNotFound
}

keyCopy := make(map[uint64]StateProofKey, len(keys))
for k, v := range keys {
keyCopy[k] = v // PKI TODO: Deep copy?
}

// Update the DB asynchronously.
db.writeQueue <- partDBWriteRecord{
insertID: id,
keys: keyCopy,
}
return nil
}

func (db *participationDB) Delete(id ParticipationID) error {
db.mutex.Lock()
defer db.mutex.Unlock()
Expand All @@ -629,6 +721,7 @@ func (db *participationDB) Delete(id ParticipationID) error {
}
delete(db.dirty, id)
delete(db.cache, id)

// do the db part async
db.writeQueue <- partDBWriteRecord{
delete: id,
Expand Down Expand Up @@ -770,6 +863,34 @@ func (db *participationDB) GetAll() []ParticipationRecord {
return results
}

// GetForRound fetches a record with all secrets for a particular round.
func (db *participationDB) GetForRound(id ParticipationID, round basics.Round) (ParticipationRecordForRound, error) {
var result ParticipationRecordForRound
result.ParticipationRecord = db.Get(id)
if result.ParticipationRecord.IsZero() {
return ParticipationRecordForRound{}, ErrParticipationIDNotFound
}

err := db.store.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
row := tx.QueryRow(selectStateProofKeys, round, id[:])
err := row.Scan(&result.StateProof)
if err == sql.ErrNoRows {
return ErrSecretNotFound
}
if err != nil {
return fmt.Errorf("error while querying secrets: %w", err)
}

return nil
})

if err != nil {
return ParticipationRecordForRound{}, fmt.Errorf("unable to lookup secrets: %w", err)
}

return result, nil
}

// updateRollingFields sets all of the rolling fields according to the record object.
func updateRollingFields(ctx context.Context, tx *sql.Tx, record ParticipationRecord) error {
result, err := tx.ExecContext(ctx, updateRollingFieldsSQL,
Expand Down
80 changes: 80 additions & 0 deletions data/account/participationRegistryBench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package account

import (
"fmt"
"testing"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/util/db"
)

func benchmarkKeyRegistration(numKeys int, b *testing.B) {
// setup
rootDB, err := db.OpenPair(b.Name(), true)
if err != nil {
b.Fail()
}
registry, err := makeParticipationRegistry(rootDB, logging.TestingLog(b))
if err != nil {
b.Fail()
}

// Insert records so that we can t
b.Run(fmt.Sprintf("KeyInsert_%d", numKeys), func(b *testing.B) {
for n := 0; n < b.N; n++ {
for key := 0; key < numKeys; key++ {
p := makeTestParticipation(key, basics.Round(0), basics.Round(1000000), 3)
registry.Insert(p)
}
}
})

// The first call to Register updates the DB.
b.Run(fmt.Sprintf("KeyRegistered_%d", numKeys), func(b *testing.B) {
for n := 0; n < b.N; n++ {
for key := 0; key < numKeys; key++ {
p := makeTestParticipation(key, basics.Round(0), basics.Round(1000000), 3)

// Unfortunately we need to repeatedly clear out the registration fields to ensure the
// db update runs each time this is called.
record := registry.cache[p.ID()]
record.EffectiveFirst = 0
record.EffectiveLast = 0
registry.cache[p.ID()] = record
registry.Register(p.ID(), 50)
}
}
})

// The keys should now be updated, so Register is a no-op.
b.Run(fmt.Sprintf("NoOp_%d", numKeys), func(b *testing.B) {
for n := 0; n < b.N; n++ {
for key := 0; key < numKeys; key++ {
p := makeTestParticipation(key, basics.Round(0), basics.Round(1000000), 3)
registry.Register(p.ID(), 50)
}
}
})
}

func BenchmarkKeyRegistration1(b *testing.B) { benchmarkKeyRegistration(1, b) }
func BenchmarkKeyRegistration5(b *testing.B) { benchmarkKeyRegistration(5, b) }
func BenchmarkKeyRegistration10(b *testing.B) { benchmarkKeyRegistration(10, b) }
func BenchmarkKeyRegistration50(b *testing.B) { benchmarkKeyRegistration(50, b) }
Loading