Commit

app: address all to-do's
- we now dedup existing exits to avoid working them twice
- the reference Obol API implementation doesn't return 400 if there are already threshold partial exits
gsora committed Oct 16, 2023
1 parent aa2107e commit 08ae85f
Showing 4 changed files with 189 additions and 57 deletions.
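In outline, the dedup this commit introduces works as follows: every exit the ejector has already written to disk is a SignedVoluntaryExit JSON file, so reading those files back yields the set of validator indices that are already handled, and the main loop skips them. Below is a condensed, self-contained sketch of that flow; `exitedIndices`, the `/tmp/ejector` path, and the `main` loop are illustrative stand-ins, not part of the commit — the real helper is `loadExistingValidatorExits` in the app/app.go diff below.

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"

	eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
)

// exitedIndices mirrors the loadExistingValidatorExits helper added below:
// every *.json file in the ejector directory is a SignedVoluntaryExit, so the
// validator indices found there can be skipped on the next pass.
func exitedIndices(ejectorPath string) (map[eth2p0.ValidatorIndex]struct{}, error) {
	paths, err := filepath.Glob(filepath.Join(ejectorPath, "*.json"))
	if err != nil {
		return nil, err
	}

	ret := map[eth2p0.ValidatorIndex]struct{}{}

	for _, p := range paths {
		data, err := os.ReadFile(p)
		if err != nil {
			return nil, err
		}

		var exit eth2p0.SignedVoluntaryExit
		if err := json.Unmarshal(data, &exit); err != nil {
			return nil, err
		}

		ret[exit.Message.ValidatorIndex] = struct{}{}
	}

	return ret, nil
}

func main() {
	existing, err := exitedIndices("/tmp/ejector") // illustrative path
	if err != nil {
		panic(err)
	}

	// Run's loop does the equivalent of this check and additionally deletes
	// the validator from its key map, so an already-exited validator is
	// never worked twice.
	for _, idx := range []eth2p0.ValidatorIndex{1, 2, 3} {
		if _, ok := existing[idx]; ok {
			continue // exit already produced, skip
		}

		fmt.Println("would process validator", idx)
	}
}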
70 changes: 53 additions & 17 deletions app/app.go
@@ -4,6 +4,7 @@ package app
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -30,20 +31,12 @@ import (
 
 // Config is the lido-dv-exit CLI configuration flag value holder.
 type Config struct {
-	Log log.Config
-
-	// TODO: check that's a real URL
-	BeaconNodeURL string
-
-	// TODO: check if the directory exists and that is writable.
-	EjectorExitPath string
-
-	// TODO: check that the directory exists, keystore.LoadManifest will check the format is appropriate.
+	Log              log.Config
+	BeaconNodeURL    string
+	EjectorExitPath  string
 	CharonRuntimeDir string
-
-	ExitEpoch uint64
-
-	ObolAPIURL string
+	ExitEpoch        uint64
+	ObolAPIURL       string
 }
 
 // Run runs the lido-dv-exit core logic.
@@ -75,6 +68,11 @@ func Run(ctx context.Context, config Config) error {
 		return errors.Wrap(err, "keystore load error")
 	}
 
+	existingValIndices, err := loadExistingValidatorExits(config.EjectorExitPath)
+	if err != nil {
+		return err
+	}
+
 	// TODO(gsora): cross-check the lido-ejector exits already present with valsKeys, so that we don't
 	// re-process what's already been processed.
 
@@ -83,7 +81,6 @@
 		return errors.Wrap(err, "can't connect to beacon node")
 	}
 
-	// TODO(gsora): check obol api url, see if correct
 	oAPI := obolapi.Client{ObolAPIUrl: config.ObolAPIURL}
 
 	tick := time.NewTicker(1 * time.Second)
@@ -100,8 +97,7 @@
 			return errors.Wrap(err, "validator keys to phase0")
 		}
 
-		// TODO(gsora): calling with finalized here, need to understand what's better
-		valIndices, err := bnClient.ValidatorsByPubKey(ctx, bnapi.StateIDFinalized.String(), phase0Vals)
+		valIndices, err := bnClient.ValidatorsByPubKey(ctx, bnapi.StateIDHead.String(), phase0Vals)
 		if err != nil {
 			log.Error(ctx, "Cannot fetch validator state", err)
 			continue
@@ -112,6 +108,15 @@
 
 			ctx := log.WithCtx(ctx, z.Str("validator", validatorPubkStr))
 
+			if _, ok := existingValIndices[valIndex]; ok {
+				// we already have an exit for this validator, remove it from the list and don't
+				// process it
+				log.Debug(ctx, "Validator already has an exit")
+				delete(valsKeys, keystore.ValidatorPubkey(validatorPubkStr))
+
+				continue
+			}
+
 			if !shouldProcessValidator(val) {
 				log.Debug(ctx, "Not processing validator", z.Str("state", val.Status.String()))
 				continue
@@ -273,7 +278,8 @@ func postPartialExit(ctx context.Context, oAPI obolapi.Client, mutationHash []by
 	return true
 }
 
-// TODO(gsora): check this logic with the team.
+// shouldProcessValidator returns true if a validator needs to be processed, meaning a full exit message must
+// be created.
 func shouldProcessValidator(v *eth2v1.Validator) bool {
 	return v.Status == eth2v1.ValidatorStateActiveOngoing
 }
@@ -336,3 +342,33 @@ func eth2Client(ctx context.Context, bnURL string) (eth2wrap.Client, error) {
 
 	return eth2wrap.AdaptEth2HTTP(bnClient, 1*time.Second), nil
 }
+
+// loadExistingValidatorExits reads the indices for validators whose exits have already been processed.
+func loadExistingValidatorExits(ejectorPath string) (map[eth2p0.ValidatorIndex]struct{}, error) {
+	exitPaths, err := filepath.Glob(filepath.Join(ejectorPath, "*.json"))
+	if err != nil {
+		return nil, errors.Wrap(err, "ejector exits glob")
+	}
+
+	ret := map[eth2p0.ValidatorIndex]struct{}{}
+
+	if len(exitPaths) == 0 {
+		return ret, nil
+	}
+
+	for _, ep := range exitPaths {
+		exitBytes, err := os.ReadFile(ep)
+		if err != nil {
+			return nil, errors.Wrap(err, "read exit file", z.Str("path", ep))
+		}
+
+		var exit eth2p0.SignedVoluntaryExit
+		if err := json.Unmarshal(exitBytes, &exit); err != nil {
+			return nil, errors.Wrap(err, "exit file unmarshal", z.Str("path", ep))
+		}
+
+		ret[exit.Message.ValidatorIndex] = struct{}{}
+	}
+
+	return ret, nil
+}
167 changes: 131 additions & 36 deletions app/app_test.go
@@ -16,6 +16,7 @@ import (
 	eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
 	eth2http "github.com/attestantio/go-eth2-client/http"
 	eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
+	k1 "github.com/decred/dcrd/dcrec/secp256k1/v4"
 	"github.com/obolnetwork/charon/app/errors"
 	"github.com/obolnetwork/charon/app/eth2wrap"
 	"github.com/obolnetwork/charon/app/k1util"
@@ -38,7 +39,7 @@
 
 const exitEpoch = eth2p0.Epoch(194048)
 
-func Test_RunFlow(t *testing.T) {
+func Test_NormalFlow(t *testing.T) {
 	valAmt := 4
 	operatorAmt := 4
 
@@ -51,51 +52,89 @@ func Test_RunFlow(t *testing.T) {
 		cluster.WithVersion("v1.7.0"),
 	)
 
-	operatorShares := make([][]tbls.PrivateKey, operatorAmt)
-
-	for opIdx := 0; opIdx < operatorAmt; opIdx++ {
-		for _, share := range keyShares {
-			operatorShares[opIdx] = append(operatorShares[opIdx], share[opIdx])
-		}
-	}
-
-	dag, err := manifest.NewDAGFromLockForT(t, lock)
-	require.NoError(t, err)
-
-	mBytes, err := proto.Marshal(dag)
-	require.NoError(t, err)
-
-	baseDir := t.TempDir()
-	ejectorDir := filepath.Join(t.TempDir(), "ejector")
-	require.NoError(t, os.Mkdir(ejectorDir, 0o755))
-
-	// write private keys and manifest files
-	for opIdx := 0; opIdx < operatorAmt; opIdx++ {
-		opID := fmt.Sprintf("op%d", opIdx)
-		oDir := filepath.Join(baseDir, opID)
-		eDir := filepath.Join(ejectorDir, opID)
-		keysDir := filepath.Join(oDir, "validator_keys")
-		manifestFile := filepath.Join(oDir, "cluster-manifest.pb")
-
-		require.NoError(t, os.MkdirAll(oDir, 0o755))
-		require.NoError(t, k1util.Save(enrs[opIdx], filepath.Join(oDir, "charon-enr-private-key")))
-
-		require.NoError(t, os.MkdirAll(keysDir, 0o755))
-		require.NoError(t, os.MkdirAll(eDir, 0o755))
-
-		require.NoError(t, ckeystore.StoreKeysInsecure(operatorShares[opIdx], keysDir, ckeystore.ConfirmInsecureKeys))
-		require.NoError(t, os.WriteFile(manifestFile, mBytes, 0o755))
-	}
-
-	// wire test server for obol api
+	srvs := testAPIServers(t, lock)
+	defer srvs.Close()
+
+	run(t,
+		t.TempDir(),
+		lock,
+		enrs,
+		keyShares,
+		true,
+		srvs,
+	)
+}
+
+func Test_RunTwice(t *testing.T) {
+	valAmt := 4
+	operatorAmt := 4
+
+	lock, enrs, keyShares := cluster.NewForT(
+		t,
+		valAmt,
+		operatorAmt,
+		operatorAmt,
+		0,
+		cluster.WithVersion("v1.7.0"),
+	)
+
+	srvs := testAPIServers(t, lock)
+	defer srvs.Close()
+
+	root := t.TempDir()
+
+	run(t,
+		root,
+		lock,
+		enrs,
+		keyShares,
+		true,
+		srvs,
+	)
+
+	// delete half exits from each ejector directory
+	ejectorDir := filepath.Join(root, "ejector")
+
+	for opID := 0; opID < operatorAmt; opID++ {
+		ejectorOPPath := filepath.Join(ejectorDir, fmt.Sprintf("op%d", opID))
+
+		exitPaths, err := filepath.Glob(filepath.Join(ejectorOPPath, "*.json"))
+		require.NoError(t, err)
+
+		for exitIdx := 0; exitIdx < len(exitPaths)/2; exitIdx++ {
+			require.NoError(t, os.Remove(exitPaths[exitIdx]))
+		}
+	}
+
+	run(t,
+		root,
+		lock,
+		enrs,
+		keyShares,
+		false,
+		srvs,
+	)
+}
+
+type testServers struct {
+	obolAPIServer *httptest.Server
+	bnapiServer   *httptest.Server
+}
+
+func (ts testServers) Close() error {
+	ts.obolAPIServer.Close()
+	ts.bnapiServer.Close()
+
+	return nil
+}
+
+func testAPIServers(t *testing.T, lock cluster.Lock) testServers {
+	t.Helper()
+
 	oapiHandler, oapiAddLock := obolapi.MockServer()
 	oapiAddLock(lock)
 
 	oapiServer := httptest.NewServer(oapiHandler)
-	defer oapiServer.Close()
 
 	// wire eth mock server
 
 	mockValidators := map[string]eth2v1.Validator{}
 
@@ -119,7 +158,63 @@
 
 	bnapiHandler := bnapi.MockBeaconNode(mockValidators)
 	bnapiServer := httptest.NewServer(bnapiHandler)
-	defer bnapiServer.Close()
+
+	return testServers{
+		obolAPIServer: oapiServer,
+		bnapiServer:   bnapiServer,
+	}
+}
+
+func run(
+	t *testing.T,
+	root string,
+	lock cluster.Lock,
+	enrs []*k1.PrivateKey,
+	keyShares [][]tbls.PrivateKey,
+	createDirFiles bool,
+	servers testServers,
+) {
+	t.Helper()
+
+	operatorAmt := len(lock.Operators)
+
+	operatorShares := make([][]tbls.PrivateKey, operatorAmt)
+
+	for opIdx := 0; opIdx < operatorAmt; opIdx++ {
+		for _, share := range keyShares {
+			operatorShares[opIdx] = append(operatorShares[opIdx], share[opIdx])
+		}
+	}
+
+	dag, err := manifest.NewDAGFromLockForT(t, lock)
+	require.NoError(t, err)
+
+	mBytes, err := proto.Marshal(dag)
+	require.NoError(t, err)
+
+	ejectorDir := filepath.Join(root, "ejector")
+
+	if createDirFiles {
+		require.NoError(t, os.Mkdir(ejectorDir, 0o755))
+
+		// write private keys and manifest files
+		for opIdx := 0; opIdx < operatorAmt; opIdx++ {
+			opID := fmt.Sprintf("op%d", opIdx)
+			oDir := filepath.Join(root, opID)
+			eDir := filepath.Join(ejectorDir, opID)
+			keysDir := filepath.Join(oDir, "validator_keys")
+			manifestFile := filepath.Join(oDir, "cluster-manifest.pb")
+
+			require.NoError(t, os.MkdirAll(oDir, 0o755))
+			require.NoError(t, k1util.Save(enrs[opIdx], filepath.Join(oDir, "charon-enr-private-key")))
+
+			require.NoError(t, os.MkdirAll(keysDir, 0o755))
+			require.NoError(t, os.MkdirAll(eDir, 0o755))
+
+			require.NoError(t, ckeystore.StoreKeysInsecure(operatorShares[opIdx], keysDir, ckeystore.ConfirmInsecureKeys))
+			require.NoError(t, os.WriteFile(manifestFile, mBytes, 0o755))
+		}
+	}
 
 	runConfForIdx := func(idx int) app.Config {
 		opID := fmt.Sprintf("op%d", idx)
@@ -130,10 +225,10 @@ func Test_RunFlow(t *testing.T) {
 				Format: "console",
 				Color:  "false",
 			},
-			BeaconNodeURL:    bnapiServer.URL,
+			BeaconNodeURL:    servers.bnapiServer.URL,
 			EjectorExitPath:  filepath.Join(ejectorDir, opID),
-			CharonRuntimeDir: filepath.Join(baseDir, opID),
-			ObolAPIURL:       oapiServer.URL,
+			CharonRuntimeDir: filepath.Join(root, opID),
+			ObolAPIURL:       servers.obolAPIServer.URL,
 			ExitEpoch:        194048,
 		}
 	}
@@ -155,7 +250,7 @@
 
 	require.NoError(t, eg.Wait())
 
-	mockEth2Cl := eth2Client(t, context.Background(), bnapiServer.URL)
+	mockEth2Cl := eth2Client(t, context.Background(), servers.bnapiServer.URL)
 
 	// check that all produced exit messages are signed by all partial keys for all operators
 	for opIdx := 0; opIdx < operatorAmt; opIdx++ {
4 changes: 3 additions & 1 deletion app/obolapi/obolapi.go
@@ -38,18 +38,20 @@ const (
 
 var ErrNoExit = errors.New("no exit for the given validator public key")
 
+// partialExitURL returns the partial exit Obol API URL for a given lock hash.
 func partialExitURL(lockHash string) string {
 	return strings.NewReplacer(
 		lockHashPath,
 		lockHash,
 	).Replace(partialExitTmpl)
 }
 
+// bearerString returns the bearer token authentication string given a token.
 func bearerString(data []byte) string {
 	return fmt.Sprintf("Bearer %s", base64.StdEncoding.EncodeToString(data))
 }
 
-// TODO(gsora): validate public key.
+// fullExitURL returns the full exit Obol API URL for a given validator public key.
 func fullExitURL(valPubkey, lockHash string, shareIndex int) string {
 	return strings.NewReplacer(
 		valPubkeyPath,
5 changes: 2 additions & 3 deletions app/obolapi/test_server.go
@@ -134,9 +134,8 @@ func (ts *testServer) HandlePartialExit(writer http.ResponseWriter, request *htt
 			return
 		}
 
-		if len(ts.partialExits[exit.PublicKey])+1 > len(lock.Operators) { // we're already at threshold
-			writeErr(writer, http.StatusBadRequest, "already at threshold for selected validator")
-			return
+		if len(ts.partialExits[exit.PublicKey])+1 > len(lock.Operators) { // we're already at threshold, ignore
+			continue
 		}
 
 		ts.partialExits[exit.PublicKey] = append(ts.partialExits[exit.PublicKey], exitBlob{
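The test_server.go change above tracks the second bullet of the commit message: rather than rejecting a partial exit that would push a validator past the threshold with HTTP 400, the mock now skips that exit and keeps handling the rest of the request, matching the reference Obol API behaviour the app is written against.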
