Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
round, _, err0 := ledgercore.ParseCatchpointLabel(label)

if err0 != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to patse label : %v", err0))
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0))
}

// download balances file.
Expand Down
6 changes: 4 additions & 2 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
maxCatchpointFileChunkSize = ledger.BalancesPerCatchpointFileChunk * basics.MaxEncodedAccountDataSize
// defaultMinCatchpointFileDownloadBytesPerSecond defines the worst-case scenario download speed we expect to get while downloading a catchpoint file
defaultMinCatchpointFileDownloadBytesPerSecond = 20 * 1024
// catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each itration from the incoming http data stream
// catchpointFileStreamReadSize defines the number of bytes we would attempt to read at each iteration from the incoming http data stream
catchpointFileStreamReadSize = 4096
)

Expand Down Expand Up @@ -146,10 +146,12 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
"writing balances to disk took %d seconds, "+
"writing creatables to disk took %d seconds, "+
"writing hashes to disk took %d seconds, "+
"writing kv pairs to disk took %d seconds, "+
"total duration is %d seconds",
downloadProgress.BalancesWriteDuration/time.Second,
downloadProgress.CreatablesWriteDuration/time.Second,
downloadProgress.HashesWriteDuration/time.Second,
downloadProgress.KVWriteDuration/time.Second,
writeDuration/time.Second)
}

Expand Down Expand Up @@ -191,5 +193,5 @@ func (lf *ledgerFetcher) getPeerLedger(ctx context.Context, peer network.HTTPPee
}

func (lf *ledgerFetcher) processBalancesBlock(ctx context.Context, sectionName string, bytes []byte, downloadProgress *ledger.CatchpointCatchupAccessorProgress) error {
return lf.accessor.ProgressStagingBalances(ctx, sectionName, bytes, downloadProgress)
return lf.accessor.ProcessStagingBalances(ctx, sectionName, bytes, downloadProgress)
}
67 changes: 65 additions & 2 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bufio"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
Expand Down Expand Up @@ -128,6 +130,10 @@ var fileCmd = &cobra.Command{
if err != nil {
reportErrorf("Unable to print account database : %v", err)
}
err = printKeyValueStore("./ledger.tracker.sqlite", outFile)
if err != nil {
reportErrorf("Unable to print key value store : %v", err)
}
}
},
}
Expand Down Expand Up @@ -176,7 +182,7 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
return fileHeader, err
}
}
err = catchupAccessor.ProgressStagingBalances(ctx, header.Name, balancesBlockBytes, &downloadProgress)
err = catchupAccessor.ProcessStagingBalances(ctx, header.Name, balancesBlockBytes, &downloadProgress)
if err != nil {
return fileHeader, err
}
Expand Down Expand Up @@ -380,7 +386,64 @@ func printAccountsDatabase(databaseName string, fileHeader ledger.CatchpointFile
}

// increase the deadline warning to disable the warning message.
db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(5*time.Second))
_, _ = db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(5*time.Second))
return err
})
}

// printKeyValue writes a single kvstore key/value pair to writer.
// If the key parses as a box key (via logic.SplitBoxKey), it is rendered
// as box(appIdx, base64-encoded name); otherwise the raw key is printed
// base64-encoded. The value is always printed base64-encoded.
func printKeyValue(writer *bufio.Writer, key, value []byte) {
	var pretty string
	ai, rest, err := logic.SplitBoxKey(string(key))
	if err == nil {
		pretty = fmt.Sprintf("box(%d, %s)", ai, base64.StdEncoding.EncodeToString([]byte(rest)))
	} else {
		// Not a box key — fall back to printing the key bytes verbatim (base64).
		pretty = base64.StdEncoding.EncodeToString(key)
	}

	fmt.Fprintf(writer, "%s : %v\n", pretty, base64.StdEncoding.EncodeToString(value))
}

func printKeyValueStore(databaseName string, outFile *os.File) error {
fmt.Printf("\n")
printDumpingCatchpointProgressLine(0, 50, 0)
lastProgressUpdate := time.Now()
progress := uint64(0)
defer printDumpingCatchpointProgressLine(0, 0, 0)

fileWriter := bufio.NewWriterSize(outFile, 1024*1024)
defer fileWriter.Flush()

dbAccessor, err := db.MakeAccessor(databaseName, true, false)
if err != nil || dbAccessor.Handle == nil {
return err
}

return dbAccessor.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var rowsCount int64
err := tx.QueryRow("SELECT count(*) from catchpointkvstore").Scan(&rowsCount)
if err != nil {
return err
}

rows, err := tx.Query("SELECT key, value FROM catchpointkvstore")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needs to be ordered. The result should be the same across all nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only so that the catchpoint dump utility prints in the same order? I want to check my understanding that the catchpoint files don't actually need to be identical (down to the level of ordering) for any other reason. All that matters for their use is that they produce the same merkle trie, which is order independent. (I think.)

I'll certainly make the change, but I want to be sure I know what's going on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! This is actually in the catchpoint dump utility! So certainly we're talking about the same reason.

So now, just to be extra safe, can you confirm that the similar SELECT, in catchpointwriter.go does NOT need to be ordered?

	// Create the *Rows iterator JIT
	if cw.kvRows == nil {
		rows, err := tx.QueryContext(ctx, "SELECT key, value FROM kvstore")
		if err != nil {
			return err
		}
		cw.kvRows = rows
	}

if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
progress++
var key []byte
var value []byte
err := rows.Scan(&key, &value)
if err != nil {
return err
}
printKeyValue(fileWriter, key, value)
if time.Since(lastProgressUpdate) > 50*time.Millisecond {
lastProgressUpdate = time.Now()
printDumpingCatchpointProgressLine(int(float64(progress)*50.0/float64(rowsCount)), 50, int64(progress))
}
}
return nil
})
}
5 changes: 5 additions & 0 deletions cmd/catchpointdump/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ func loadAndDump(addr string, tarFile string, genesisInitState ledgercore.InitSt
if err != nil {
return err
}
err = printKeyValueStore("./ledger.tracker.sqlite", outFile)
if err != nil {
return err
}

}
return nil
}
4 changes: 2 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (m *MockCatchpointCatchupAccessor) ResetStagingBalances(ctx context.Context
return nil
}

// ProgressStagingBalances deserialize the given bytes as a temporary staging balances
func (m *MockCatchpointCatchupAccessor) ProgressStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *ledger.CatchpointCatchupAccessorProgress) (err error) {
// ProcessStagingBalances deserialize the given bytes as a temporary staging balances
func (m *MockCatchpointCatchupAccessor) ProcessStagingBalances(ctx context.Context, sectionName string, bytes []byte, progress *ledger.CatchpointCatchupAccessorProgress) (err error) {
return nil
}

Expand Down
6 changes: 1 addition & 5 deletions data/transactions/logic/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (sv stackValue) address() (addr basics.Address, err error) {

func (sv stackValue) uint() (uint64, error) {
if sv.Bytes != nil {
return 0, errors.New("not a uint64")
return 0, fmt.Errorf("%#v is not a uint64", sv.Bytes)
}
return sv.Uint, nil
}
Expand Down Expand Up @@ -648,10 +648,6 @@ func (st StackType) Typed() bool {
return false
}

func (sts StackTypes) plus(other StackTypes) StackTypes {
return append(sts, other...)
}

// PanicError wraps a recover() catching a panic()
type PanicError struct {
PanicValue interface{}
Expand Down
6 changes: 6 additions & 0 deletions data/transactions/logic/evalAppTxn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func TestFieldTypes(t *testing.T) {
TestApp(t, NoTrack("itxn_begin; byte 0x01; itxn_field XferAsset;"), ep, "not a uint64")
TestApp(t, NoTrack("itxn_begin; byte 0x01; itxn_field AssetAmount;"), ep, "not a uint64")

// get coverage on uintMaxed()
TestApp(t, NoTrack("itxn_begin; byte \"pay\"; itxn_field ExtraProgramPages;"), ep, "not a uint64")
// get coverage on bool()
TestApp(t, NoTrack("itxn_begin; byte \"pay\"; itxn_field Nonparticipation;"), ep, "not a uint64")
}

func appAddr(id int) basics.Address {
Expand Down Expand Up @@ -770,6 +774,8 @@ func TestFieldSetting(t *testing.T) {
"not an address")

TestApp(t, "itxn_begin; int 6; bzero; itxn_field ConfigAssetUnitName; int 1", ep)
TestApp(t, NoTrack("itxn_begin; int 6; itxn_field ConfigAssetUnitName; int 1"), ep,
"not a byte array")
TestApp(t, "itxn_begin; int 7; bzero; itxn_field ConfigAssetUnitName; int 1", ep,
"value is too long")

Expand Down
5 changes: 5 additions & 0 deletions data/transactions/logic/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4681,9 +4681,11 @@ func TestBytesMath(t *testing.T) {
testAccepts(t, "byte 0x01; byte 0x01; b/; byte 0x01; ==", 4)
testPanics(t, "byte 0x0200; byte b64(); b/; int 1; return", 4)
testPanics(t, "byte 0x01; byte 0x00; b/; int 1; return", 4)
testPanics(t, "int 65; bzero; byte 0x01; b/; int 1; return", 4)

testAccepts(t, "byte 0x10; byte 0x07; b%; byte 0x02; ==; return", 4)
testPanics(t, "byte 0x01; byte 0x00; b%; int 1; return", 4)
testPanics(t, "int 65; bzero; byte 0x10; b%", 4)

// Even 128 byte outputs are ok
testAccepts(t, fmt.Sprintf("byte 0x%s; byte 0x%s; b*; len; int 128; ==", effs, effs), 4)
Expand All @@ -4708,6 +4710,7 @@ func TestBytesCompare(t *testing.T) {

testAccepts(t, "byte 0x10; byte 0x10; b<; !", 4)
testAccepts(t, "byte 0x10; byte 0x10; b<=", 4)
testPanics(t, "byte 0x10; int 65; bzero; b<=", 4)
testAccepts(t, "byte 0x10; int 64; bzero; b>", 4)
testPanics(t, "byte 0x10; int 65; bzero; b>", 4)

Expand All @@ -4716,6 +4719,7 @@ func TestBytesCompare(t *testing.T) {

testAccepts(t, "byte 0x11; byte 0x10; b>=", 4)
testAccepts(t, "byte 0x11; byte 0x0011; b>=", 4)
testPanics(t, "byte 0x10; int 65; bzero; b>=", 4)

testAccepts(t, "byte 0x11; byte 0x11; b==", 4)
testAccepts(t, "byte 0x0011; byte 0x11; b==", 4)
Expand All @@ -4726,6 +4730,7 @@ func TestBytesCompare(t *testing.T) {
testAccepts(t, "byte 0x11; byte 0x00; b!=", 4)
testAccepts(t, "byte 0x0011; byte 0x1100; b!=", 4)
testPanics(t, notrack("byte 0x11; int 17; b!="), 4)
testPanics(t, "byte 0x10; int 65; bzero; b!=", 4)
}

func TestBytesBits(t *testing.T) {
Expand Down
44 changes: 40 additions & 4 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,14 +1102,14 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, bals []nor
if resData.IsOwning() {
// determine if it's an asset
if resData.IsAsset() {
_, err := insertCreatorsStmt.ExecContext(ctx, basics.CreatableIndex(aidx), balance.address[:], basics.AssetCreatable)
_, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.address[:], basics.AssetCreatable)
if err != nil {
return err
}
}
// determine if it's an application
if resData.IsApp() {
_, err := insertCreatorsStmt.ExecContext(ctx, basics.CreatableIndex(aidx), balance.address[:], basics.AppCreatable)
_, err := insertCreatorsStmt.ExecContext(ctx, aidx, balance.address[:], basics.AppCreatable)
if err != nil {
return err
}
Expand All @@ -1120,13 +1120,44 @@ func writeCatchpointStagingCreatable(ctx context.Context, tx *sql.Tx, bals []nor
return nil
}

// writeCatchpointStagingKVs inserts all the KVs in the provided array into the
// catchpoint kvstore staging table catchpointkvstore, and inserts their
// hashes into the pending hashes table catchpointpendinghashes.
// It returns the first error encountered; on error some rows may already
// have been inserted into the (transaction-scoped) staging tables.
func writeCatchpointStagingKVs(ctx context.Context, tx *sql.Tx, kvrs []encodedKVRecordV6) error {
	// Prepare both INSERT statements once and reuse them for every record.
	insertKV, err := tx.PrepareContext(ctx, "INSERT INTO catchpointkvstore(key, value) VALUES(?, ?)")
	if err != nil {
		return err
	}
	defer insertKV.Close()

	insertHash, err := tx.PrepareContext(ctx, "INSERT INTO catchpointpendinghashes(data) VALUES(?)")
	if err != nil {
		return err
	}
	defer insertHash.Close()

	for _, kvr := range kvrs {
		// Stage the raw key/value pair.
		_, err := insertKV.ExecContext(ctx, kvr.Key, kvr.Value)
		if err != nil {
			return err
		}

		// Hash the pair and queue the digest alongside the other pending hashes.
		hash := kvHashBuilderV6(string(kvr.Key), string(kvr.Value))
		_, err = insertHash.ExecContext(ctx, hash)
		if err != nil {
			return err
		}
	}
	return nil
}

func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup bool) (err error) {
s := []string{
"DROP TABLE IF EXISTS catchpointbalances",
"DROP TABLE IF EXISTS catchpointassetcreators",
"DROP TABLE IF EXISTS catchpointaccounthashes",
"DROP TABLE IF EXISTS catchpointpendinghashes",
"DROP TABLE IF EXISTS catchpointresources",
"DROP TABLE IF EXISTS catchpointkvstore",
"DELETE FROM accounttotals where id='catchpointStaging'",
}

Expand All @@ -1147,6 +1178,8 @@ func resetCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, newCatchup
"CREATE TABLE IF NOT EXISTS catchpointpendinghashes (data blob)",
"CREATE TABLE IF NOT EXISTS catchpointaccounthashes (id integer primary key, data blob)",
"CREATE TABLE IF NOT EXISTS catchpointresources (addrid INTEGER NOT NULL, aidx INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY (addrid, aidx) ) WITHOUT ROWID",
"CREATE TABLE IF NOT EXISTS catchpointkvstore (key blob primary key, value blob)",

createNormalizedOnlineBalanceIndex(idxnameBalances, "catchpointbalances"), // should this be removed ?
createUniqueAddressBalanceIndex(idxnameAddress, "catchpointbalances"),
)
Expand All @@ -1170,11 +1203,13 @@ func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRou
"DROP TABLE IF EXISTS assetcreators",
"DROP TABLE IF EXISTS accounthashes",
"DROP TABLE IF EXISTS resources",
"DROP TABLE IF EXISTS kvstore",

"ALTER TABLE catchpointbalances RENAME TO accountbase",
"ALTER TABLE catchpointassetcreators RENAME TO assetcreators",
"ALTER TABLE catchpointaccounthashes RENAME TO accounthashes",
"ALTER TABLE catchpointresources RENAME TO resources",
"ALTER TABLE catchpointkvstore RENAME TO kvstore",
}

for _, stmt := range stmts {
Expand Down Expand Up @@ -4240,8 +4275,9 @@ func (iterator *encodedAccountsBatchIter) Next(ctx context.Context, tx *sql.Tx,
iterator.Close()
return
}
// we just finished reading the table.
iterator.Close()
// Do not Close() the iterator here. It is the caller's responsibility to
// do so, signalled by the return of an empty chunk. If we Close() here, the
// next call to Next() will start all over!
return
}

Expand Down
8 changes: 4 additions & 4 deletions ledger/accountdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,8 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
last64KSize = chunkSize
last64KAccountCreationTime = time.Duration(0)
}
var balances catchpointFileBalancesChunkV6
balances.Balances = make([]encodedBalanceRecordV6, chunkSize)
var chunk catchpointFileChunkV6
chunk.Balances = make([]encodedBalanceRecordV6, chunkSize)
for i := uint64(0); i < chunkSize; i++ {
var randomAccount encodedBalanceRecordV6
accountData := baseAccountData{RewardsBase: accountsLoaded + i}
Expand All @@ -1035,13 +1035,13 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
if ascendingOrder {
binary.LittleEndian.PutUint64(randomAccount.Address[:], accountsLoaded+i)
}
balances.Balances[i] = randomAccount
chunk.Balances[i] = randomAccount
}
balanceLoopDuration := time.Since(balancesLoopStart)
last64KAccountCreationTime += balanceLoopDuration
accountsGenerationDuration += balanceLoopDuration

normalizedAccountBalances, err := prepareNormalizedBalancesV6(balances.Balances, proto)
normalizedAccountBalances, err := prepareNormalizedBalancesV6(chunk.Balances, proto)
require.NoError(b, err)
b.StartTimer()
err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
Expand Down
Loading