July2024/improve gsfa perf (#124)
* Use custom temp dir for gsfa

* Flush based on performance

* Fix tmpDir

* Max is 1000

* Remove /health and /metrics req logging; closes #127

* Move metrics to metrics package

* Prometheus for index and car lookups; closes #126

* Cleanup metrics; closes #128

* gsfa: include pubkeys from address lookup tables

* Miner info: use exponential retry

* Fix tests
gagliardetto authored Oct 4, 2024
1 parent 69c6bf9 commit dd83ea1
Showing 7 changed files with 285 additions and 37 deletions.
83 changes: 74 additions & 9 deletions cmd-x-index-gsfa.go
@@ -21,6 +21,8 @@ import (
"github.com/rpcpool/yellowstone-faithful/indexmeta"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
"github.com/rpcpool/yellowstone-faithful/third_party/solana_proto/confirmed_block"
"github.com/urfave/cli/v2"
"k8s.io/klog/v2"
)
@@ -70,7 +72,7 @@ func newCmd_Index_gsfa() *cli.Command {
},
&cli.StringFlag{
Name: "tmp-dir",
Usage: "temporary directory to use for storing intermediate files",
Usage: "temporary directory to use for storing intermediate files; WILL BE DELETED",
Value: os.TempDir(),
},
},
@@ -137,6 +139,10 @@ func newCmd_Index_gsfa() *cli.Command {
return fmt.Errorf("failed to add network to sig_exists index metadata: %w", err)
}
tmpDir := c.String("tmp-dir")
tmpDir = filepath.Join(tmpDir, fmt.Sprintf("yellowstone-faithful-gsfa-%d", time.Now().UnixNano()))
if err := os.MkdirAll(tmpDir, 0o755); err != nil {
return fmt.Errorf("failed to create tmp dir: %w", err)
}
indexW, err := gsfa.NewGsfaWriter(
gsfaIndexDir,
meta,
@@ -218,12 +224,17 @@ func newCmd_Index_gsfa() *cli.Command {
for ii := range transactions {
txWithInfo := transactions[ii]
numProcessedTransactions.Add(1)
accountKeys := txWithInfo.Transaction.Message.AccountKeys
if txWithInfo.Metadata != nil {
accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedReadonlyAddresses)...)
accountKeys = append(accountKeys, byteSlicesToKeySlice(txWithInfo.Metadata.LoadedWritableAddresses)...)
}
err = indexW.Push(
txWithInfo.Offset,
txWithInfo.Length,
txWithInfo.Slot,
txWithInfo.Blocktime,
txWithInfo.Transaction.Message.AccountKeys,
accountKeys,
)
if err != nil {
klog.Exitf("Error while pushing to gsfa index: %s", err)
@@ -270,27 +281,80 @@ func objectsToTransactions(
objects []accum.ObjectWithMetadata,
) ([]*TransactionWithSlot, error) {
transactions := make([]*TransactionWithSlot, 0, len(objects))
dataBlocks := make([]accum.ObjectWithMetadata, 0)
for _, object := range objects {
// check if the object is a transaction:
kind := iplddecoders.Kind(object.ObjectData[1])
if kind == iplddecoders.KindDataFrame {
dataBlocks = append(dataBlocks, object)
continue
}
if kind != iplddecoders.KindTransaction {
continue
}
decoded, err := iplddecoders.DecodeTransaction(object.ObjectData)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex %s: %w", object.Cid, err)
}
tws := &TransactionWithSlot{
Offset: object.Offset,
Length: object.SectionLength,
Slot: uint64(decoded.Slot),
Blocktime: uint64(block.Meta.Blocktime),
}
if total, ok := decoded.Metadata.GetTotal(); !ok || total == 1 {
completeBuffer := decoded.Metadata.Bytes()
if ha, ok := decoded.Metadata.GetHash(); ok {
err := ipldbindcode.VerifyHash(completeBuffer, ha)
if err != nil {
return nil, fmt.Errorf("failed to verify metadata hash: %w", err)
}
}
if len(completeBuffer) > 0 {
uncompressedMeta, err := decompressZstd(completeBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
if err == nil {
tws.Metadata = status
}
}
} else {
metaBuffer, err := loadDataFromDataFrames(&decoded.Metadata, func(ctx context.Context, wantedCid cid.Cid) (*ipldbindcode.DataFrame, error) {
for _, dataBlock := range dataBlocks {
if dataBlock.Cid == wantedCid {
df, err := iplddecoders.DecodeDataFrame(dataBlock.ObjectData)
if err != nil {
return nil, err
}
return df, nil
}
}
return nil, fmt.Errorf("dataframe not found")
})
if err != nil {
return nil, fmt.Errorf("failed to load metadata: %w", err)
}
// reset dataBlocks:
dataBlocks = dataBlocks[:0]
if len(metaBuffer) > 0 {
uncompressedMeta, err := decompressZstd(metaBuffer)
if err != nil {
return nil, fmt.Errorf("failed to decompress metadata: %w", err)
}
status, err := solanatxmetaparsers.ParseTransactionStatusMeta(uncompressedMeta)
if err == nil {
tws.Metadata = status
}
}
}
tx, err := decoded.GetSolanaTransaction()
if err != nil {
return nil, fmt.Errorf("error while getting solana transaction from object %s: %w", object.Cid, err)
}
transactions = append(transactions, &TransactionWithSlot{
Offset: object.Offset,
Length: object.SectionLength,
Slot: uint64(decoded.Slot),
Blocktime: uint64(block.Meta.Blocktime),
Transaction: *tx,
})
tws.Transaction = *tx
transactions = append(transactions, tws)
}
return transactions, nil
}
@@ -311,4 +375,5 @@ type TransactionWithSlot struct {
Slot uint64
Blocktime uint64
Transaction solana.Transaction
Metadata *confirmed_block.TransactionStatusMeta
}
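The accountKeys change above folds the addresses loaded from address lookup tables (LoadedReadonlyAddresses and LoadedWritableAddresses in the transaction metadata) into the set of pubkeys indexed by gsfa. The byteSlicesToKeySlice helper is referenced but not shown in these hunks; a plausible sketch, assuming the raw 32-byte addresses from the protobuf metadata and the github.com/gagliardetto/solana-go package already imported by this file, would be:

// Plausible sketch of the byteSlicesToKeySlice helper used above (its actual
// definition is not part of the hunks shown here). It converts raw 32-byte
// addresses from the protobuf metadata into solana-go public keys.
func byteSlicesToKeySlice(keys [][]byte) []solana.PublicKey {
	out := make([]solana.PublicKey, 0, len(keys))
	for _, key := range keys {
		out = append(out, solana.PublicKeyFromBytes(key))
	}
	return out
}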
49 changes: 30 additions & 19 deletions gsfa/gsfa-write.go
@@ -23,6 +23,7 @@ import (
type GsfaWriter struct {
mu sync.Mutex
indexRootDir string
popRank *rollingRankOfTopPerformers // top pubkeys by flush count
offsets *hashmap.Map[solana.PublicKey, [2]uint64]
ll *linkedlog.LinkedLog
man *manifest.Manifest
@@ -61,6 +62,7 @@ func NewGsfaWriter(
ctx, cancel := context.WithCancel(context.Background())
index := &GsfaWriter{
fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
ctx: ctx,
@@ -120,6 +122,9 @@ func (a *GsfaWriter) fullBufferWriter() {
has := tmpBuf.Has(buffer.Key)
if len(tmpBuf) == howManyBuffersToFlushConcurrently || has {
for _, buf := range tmpBuf {
if len(buf.Values) == 0 {
continue
}
// Write the buffer to the linked log.
klog.V(5).Infof("Flushing %d transactions for key %s", len(buf.Values), buf.Key)
if err := a.flushKVs(buf); err != nil {
@@ -131,7 +136,7 @@ func (a *GsfaWriter) fullBufferWriter() {
tmpBuf = append(tmpBuf, buffer)
}
case <-time.After(1 * time.Second):
klog.Infof("Read %d buffers from channel", numReadFromChan)
klog.V(5).Infof("Read %d buffers from channel", numReadFromChan)
}
}
}
@@ -153,39 +158,45 @@ func (a *GsfaWriter) Push(
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
if slot%1000 == 0 {
if a.accum.Len() > 130_000 {
// flush all
klog.Infof("Flushing all %d keys", a.accum.Len())
if slot%500 == 0 && a.accum.Len() > 100_000 {
// flush all
klog.V(4).Infof("Flushing all %d keys", a.accum.Len())

var keys solana.PublicKeySlice = a.accum.Keys()
keys.Sort()
var keys solana.PublicKeySlice = a.accum.Keys()
keys.Sort()

for iii := range keys {
key := keys[iii]
values, _ := a.accum.Get(key)
a.popRank.purge()

if len(values) < 100 && len(values) > 0 {
if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: key,
Values: values,
}); err != nil {
return err
}
a.accum.Delete(key)
for iii := range keys {
key := keys[iii]
values, _ := a.accum.Get(key)
// The objective is to accumulate as big a batch as possible for each key (max is 1000).
// So we delay flushing the most popular keys (popular = has been flushed many times),
// and periodically flush the less popular keys that haven't seen much activity.

// If this key has fewer than 100 values and is not in the top list of keys by flush count,
// it's very likely that this key isn't going to get many more values soon.
if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: key,
Values: values,
}); err != nil {
return err
}
a.accum.Delete(key)
}
}
}
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0)
current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
current = append(current, oas)
if len(current) >= itemsPerBatch {
a.popRank.Incr(publicKey, 1)
a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
Key: publicKey,
Values: clone(current),
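For reference, the flush policy introduced in this file works as follows: every 500 slots, once more than 100,000 keys are buffered, only keys with few pending entries that are not among the top performers tracked by popRank are flushed; popular keys keep accumulating until they reach the 1000-item batch size and are handed to fullBufferWriterChan. A minimal sketch of that decision rule, simplified and not the repository's actual code:

// shouldFlushEarly mirrors the condition used in Push above: small buffers for
// unpopular keys are written out during the periodic sweep, while popular keys
// are left to grow toward the 1000-item batch maximum.
func shouldFlushEarly(pendingValues int, isPopularKey bool) bool {
	const earlyFlushThreshold = 100 // threshold taken from the diff above
	return pendingValues > 0 && pendingValues < earlyFlushThreshold && !isPopularKey
}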
72 changes: 72 additions & 0 deletions gsfa/pop-rank.go
@@ -0,0 +1,72 @@
package gsfa

import (
"slices"
"sort"

"github.com/gagliardetto/solana-go"
"github.com/tidwall/hashmap"
)

type rollingRankOfTopPerformers struct {
rankListSize int
maxValue int
minValue int
set hashmap.Map[solana.PublicKey, int]
}

func newRollingRankOfTopPerformers(rankListSize int) *rollingRankOfTopPerformers {
return &rollingRankOfTopPerformers{
rankListSize: rankListSize,
}
}

func (r *rollingRankOfTopPerformers) Incr(key solana.PublicKey, delta int) int {
value, ok := r.set.Get(key)
if !ok {
value = 0
}
value = value + delta
r.set.Set(key, value)
if value > r.maxValue {
r.maxValue = value
}
if value < r.minValue {
r.minValue = value
}
return value
}

func (r *rollingRankOfTopPerformers) Get(key solana.PublicKey) (int, bool) {
value, ok := r.set.Get(key)
return value, ok
}

// purge removes the keys with the lowest values until at most rankListSize distinct values remain.
// Keys with equal values are kept.
func (r *rollingRankOfTopPerformers) purge() {
values := r.set.Values()
sort.Ints(values)
values = slices.Compact(values)
if len(values) <= r.rankListSize {
return
}

// remove the lowest values
for _, value := range values[:len(values)-r.rankListSize] {
for _, key := range r.set.Keys() {
if v, _ := r.set.Get(key); v == value {
r.set.Delete(key)
}
}
}

// update the min and max values
r.minValue = values[len(values)-r.rankListSize]
r.maxValue = values[len(values)-1]
}

func (r *rollingRankOfTopPerformers) has(key solana.PublicKey) bool {
_, ok := r.set.Get(key)
return ok
}
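A minimal usage sketch of this rank tracker, mirroring how Push and fullBufferWriter use it above (the key chosen here is purely illustrative):

// Illustrative only; not part of the commit.
rank := newRollingRankOfTopPerformers(10_000)
somePubkey := solana.SysVarClockPubkey // any key; chosen only for illustration
rank.Incr(somePubkey, 1)               // record one flush for this key
rank.purge()                           // keep only the top 10_000 distinct flush counts
if rank.has(somePubkey) {
	// the key still ranks as popular, so its buffer is allowed to keep growing
}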
55 changes: 55 additions & 0 deletions gsfa/pop-rank_test.go
@@ -0,0 +1,55 @@
package gsfa

import (
"testing"

"github.com/gagliardetto/solana-go"
"github.com/stretchr/testify/require"
)

func TestPopRank(t *testing.T) {
// Test the rollingRankOfTopPerformers type:
{
// Create a new rollingRankOfTopPerformers:
r := newRollingRankOfTopPerformers(5)
if r == nil {
t.Fatal("expected non-nil rollingRankOfTopPerformers")
}
// Test the Incr method:
{
key := solana.SysVarRentPubkey
delta := 1
value := r.Incr(key, delta)
require.Equal(t, 1, value)
}
// Test the purge method:
{
r.purge()
// the value should still be 1
value, ok := r.Get(solana.SysVarRentPubkey)
require.True(t, ok)
require.Equal(t, 1, value)
}
{
// now add a few more values:
r.Incr(solana.SysVarClockPubkey, 6)
r.Incr(solana.SysVarEpochSchedulePubkey, 5)
r.Incr(solana.SysVarFeesPubkey, 4)
r.Incr(solana.SysVarInstructionsPubkey, 3)
r.Incr(solana.SysVarRewardsPubkey, 2)

// there should be 6 values now
require.Equal(t, 6, r.set.Len())

// purge should remove the lowest values
r.purge()

// there should be 5 values now (equivalent values are kept)
require.Equal(t, 5, r.set.Len())

// the lowest value should be 2
require.Equal(t, 2, r.minValue)
require.Equal(t, 6, r.maxValue)
}
}
}
17 changes: 15 additions & 2 deletions http-range.go
@@ -69,13 +69,26 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) {
prefix := icon + "[READ-UNKNOWN]"
if isIndex {
prefix = icon + azureBG("[READ-INDEX]")
} else if isCar {

// get the index name, which is the part before the .index suffix, after the last .
indexName := strings.TrimSuffix(r.name, ".index")
// split the index name by . and get the last part
byDot := strings.Split(indexName, ".")
if len(byDot) > 0 {
indexName = byDot[len(byDot)-1]
}
// TODO: distinguish between remote and local index reads
metrics.IndexLookupHistogram.WithLabelValues(indexName).Observe(float64(took.Seconds()))
}
// if the name has the .car suffix, then it's a car file
if isCar {
if r.isSplitCar {
prefix = icon + azureBG("[READ-SPLIT-CAR]")
} else {
prefix = icon + purpleBG("[READ-CAR]")
}
carName := filepath.Base(r.name)
// TODO: distinguish between remote and local car reads
metrics.CarLookupHistogram.WithLabelValues(carName).Observe(float64(took.Seconds()))
}

klog.V(5).Infof(prefix+" %s:%d+%d (%s)\n", (r.name), off, len(p), took)
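IndexLookupHistogram and CarLookupHistogram come from the new metrics package, whose definitions are not part of the hunks shown here. A hedged sketch of what such histogram vectors might look like with prometheus/client_golang follows; the metric names and label names are assumptions, not the repository's actual values:

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Hypothetical declarations; only the usage sites appear in the diff above.
var (
	IndexLookupHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "index_lookup_duration_seconds",
		Help: "Duration of index reads, labeled by index name.",
	}, []string{"index"})

	CarLookupHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
		Name: "car_lookup_duration_seconds",
		Help: "Duration of CAR file reads, labeled by CAR file name.",
	}, []string{"car"})
)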
(2 additional changed files in this commit are not shown here.)
