Skip to content

Commit

Permalink
Merge pull request #548 from 0xPolygonHermez/rick/testing_KVDatabaseR…
Browse files Browse the repository at this point in the history
…emote

Fixing issues in key-value database
  • Loading branch information
rickb80 authored Sep 14, 2023
2 parents bf042de + d299ccc commit 4fcffd3
Show file tree
Hide file tree
Showing 13 changed files with 429 additions and 205 deletions.
2 changes: 1 addition & 1 deletion src/config/zkresult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct
{ ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH, "ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH" },
{ ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE" },
{ ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE" },
{ ZKR_DB_VERSION_NOT_FOUND, "ZKR_DB_VERSION_NOT_FOUND" },
{ ZKR_DB_VERSION_NOT_FOUND_KVDB, "ZKR_DB_VERSION_NOT_FOUND_KBDB" },
{ ZKR_DB_VERSION_NOT_FOUND_GLOBAL, "ZKR_DB_VERSION_NOT_FOUND_GLOBAL"}


Expand Down
2 changes: 1 addition & 1 deletion src/config/zkresult.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ typedef enum : int
ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH = 78, // Main state memory align read ROM operation check failed
ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE = 79, // Main state Keccak hash check found read out of range
ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE = 80, // Main state Poseidon hash check found read out of range
ZKR_DB_VERSION_NOT_FOUND = 81, // Version not found in KeyValue database
ZKR_DB_VERSION_NOT_FOUND_KVDB = 81, // Version not found in KeyValue database
ZKR_DB_VERSION_NOT_FOUND_GLOBAL = 82, // Version not found in KeyValue database and not present in hashDB neither


Expand Down
223 changes: 109 additions & 114 deletions src/hashdb64/database_64.cpp

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions src/hashdb64/database_64.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,22 @@ class Database64
pthread_t senderPthread; // Database sender thread
pthread_t cacheSynchPthread; // Cache synchronization thread
int maxVersions; // Maximum number of versions to store in the database KV
int maxVersionsUpload; // Maximum number of versions to upload from the database KV to the cache when there is a cache miss

private:
// Remote database based on Postgres (PostgreSQL)
void initRemote(void);
zkresult readRemote(bool bProgram, const string &key, string &value);
zkresult writeRemote(bool bProgram, const string &key, const string &value);

zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value);
zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool noMultiWrite = false);
zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value, vector<VersionValue> &upstreamVersionValues);
zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool useMultiWrite = true);
zkresult readRemoteVersion(const Goldilocks::Element (&root)[4], uint64_t version);
zkresult writeRemoteVersion(const Goldilocks::Element (&root)[4], const uint64_t version);
zkresult readRemoteLatestVersion(uint64_t &version);
zkresult writeRemoteLatestVersion(const uint64_t version);

zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value);
zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value, vector<VersionValue> &upstreamVersionValues);

public:
#ifdef DATABASE_USE_CACHE
Expand Down
191 changes: 172 additions & 19 deletions src/hashdb64/database_kv_associative_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ DatabaseKVAssociativeCache::DatabaseKVAssociativeCache()
indexesSize = 0;
log2CacheSize = 0;
cacheSize = 0;
maxVersions = 100; //rick: as parameter
maxVersions = 0;
indexes = NULL;
keys = NULL;
values = NULL;
Expand Down Expand Up @@ -61,8 +61,7 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac
exitProcess();
}
cacheSize = 1 << log2CacheSize_;
maxVersions = 100; //rick: as parameter

maxVersions = cacheSize;

if(indexes != NULL) delete[] indexes;
indexes = new uint32_t[indexesSize];
Expand All @@ -81,6 +80,11 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac

if(versions != NULL) delete[] versions;
versions = new uint64_t[2 * cacheSize];
#pragma omp parallel for schedule(static) num_threads(4)
for (size_t i = 0; i < cacheSize; i++)
{
versions[i*2+1] = UINT64_MAX;
}

currentCacheIndex = 0;
attempts = 0;
Expand All @@ -92,15 +96,14 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac
indexesMask = indexesSize - 1;
};

void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update){
void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value){

lock_guard<recursive_mutex> guard(mlock);
bool emptySlot = false;
bool present = false;
bool presentSameVersion = false;
uint32_t cacheIndex;
uint32_t tableIndexUse=0;
uint32_t cacheIndexPrev;
uint64_t cacheIndexPrev=UINT64_MAX;

//
// Check if present in one of the four slots
Expand All @@ -114,6 +117,7 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;
uint32_t cacheIndexValue = cacheIndex;

if (!emptyCacheSlot(cacheIndexRaw)){
if( keys[cacheIndexKey + 0].fe == key[0].fe &&
Expand All @@ -122,9 +126,16 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
keys[cacheIndexKey + 3].fe == key[3].fe){
present = true;
if(versions[cacheIndexVersions] == version){
presentSameVersion = true;
if(update == false) return;
return; //no update
} else if (version < versions[cacheIndexVersions]){
zklog.error("DatabaseKVAssociativeCache::addKeyValueVersion() adding lower version than the current one");
exitProcess();
} else {
if(value==values[cacheIndexValue]){
return;
}
}

tableIndexUse = tableIndex;
cacheIndexPrev = cacheIndex;
break;
Expand All @@ -138,13 +149,12 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
//
// Evaluate cacheIndexKey and
//
if(!presentSameVersion){
if(emptySlot == true || present){
indexes[tableIndexUse] = currentCacheIndex;
}
cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
if(emptySlot == true || present){
indexes[tableIndexUse] = currentCacheIndex;
}
cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);

uint64_t cacheIndexKey, cacheIndexValue, cacheIndexVersions;
cacheIndexKey = cacheIndex * 4;
cacheIndexValue = cacheIndex;
Expand All @@ -159,10 +169,10 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
keys[cacheIndexKey + 3].fe = key[3].fe;
values[cacheIndexValue] = value;
versions[cacheIndexVersions] = version;
if(present & !presentSameVersion){
if(present){
versions[cacheIndexVersions+1] = cacheIndexPrev;
}else{
versions[cacheIndexVersions+1] = 0;
versions[cacheIndexVersions+1] = UINT64_MAX;
}
//
// Forced index insertion
Expand All @@ -175,6 +185,148 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
}
}

void DatabaseKVAssociativeCache::downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]){

lock_guard<recursive_mutex> guard(mlock);
bool presentSameVersion = false;
uint32_t cacheIndexZero = 0;
//
// Check if present in one of the four slots
//
Goldilocks::Element key_hashed[4];
hashKey(key_hashed, key);
bool breakOuterLoop = false;
for (int i = 0; i < 4; ++i)
{
uint32_t tableIndex = (uint32_t)(key_hashed[i].fe & indexesMask);
uint32_t cacheIndexRaw = indexes[tableIndex];
uint32_t cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;

if (!emptyCacheSlot(cacheIndexRaw)){
for(int j=0; j<maxVersions; j++){
if (keys[cacheIndexKey + 0].fe == key[0].fe &&
keys[cacheIndexKey + 1].fe == key[1].fe &&
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

if( versions[cacheIndexVersions] == version &&
versions[cacheIndexVersions+1] == UINT64_MAX){ //if linked zero can not be added){

presentSameVersion = true;
cacheIndexZero = currentCacheIndex & cacheMask;
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
versions[cacheIndexVersions+1] = cacheIndexZero;
breakOuterLoop = true;
break;
}
if(versions[cacheIndexVersions+1]==UINT64_MAX) return; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return;
}else{
break;
}
}
}
}
if(breakOuterLoop) break;
}

//
// Evaluate cacheIndexKey and
//
if(presentSameVersion){

uint64_t cacheIndexKey = cacheIndexZero * 4;
uint64_t cacheIndexValue = cacheIndexZero;
uint64_t cacheIndexVersions = cacheIndexZero * 2;

//
// Add value
//
keys[cacheIndexKey + 0].fe = key[0].fe;
keys[cacheIndexKey + 1].fe = key[1].fe;
keys[cacheIndexKey + 2].fe = key[2].fe;
keys[cacheIndexKey + 3].fe = key[3].fe;
values[cacheIndexValue] = 0;
versions[cacheIndexVersions] = 0;
versions[cacheIndexVersions+1] = UINT64_MAX;
}
return;

}

void DatabaseKVAssociativeCache::uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector<VersionValue> &versionsValues){

lock_guard<recursive_mutex> guard(mlock);
//Get last version for the key
vector<uint64_t> versions;
getLastCachedVersions(key, versions, versionsValues.size());
if(versions.size()==0){
for (std::vector<VersionValue>::reverse_iterator it = versionsValues.rbegin(); it != versionsValues.rend(); ++it) {
addKeyValueVersion(it->version, key, it->value);
}
}else{
vector<VersionValue>::const_iterator it = versionsValues.begin();
for(vector<uint64_t>::const_iterator it2 = versions.begin(); it2 != versions.end(); ++it2, ++it){
if(*it2 != it->version){
zklog.error("DatabaseKVAssociativeCache::uploadKeyValueVersions() versions mismatch between cache and db");
exitProcess();
}
}
//If vector.zize() < versionsValues.size() we could add some backward versions in the cache but not supported yet
}
}

void DatabaseKVAssociativeCache::getLastCachedVersions(const Goldilocks::Element (&key)[4], vector<uint64_t> &versions, const int maxVersionsOut){

versions.clear();
lock_guard<recursive_mutex> guard(mlock);
//
// Find the value
//
Goldilocks::Element key_hashed[4];
hashKey(key_hashed, key);
for (int i = 0; i < 4; i++)
{
uint32_t cacheIndexRaw = indexes[key_hashed[i].fe & indexesMask];
if (emptyCacheSlot(cacheIndexRaw)) continue;

uint32_t cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;

for(int j=0; j<maxVersionsOut; j++){
if (keys[cacheIndexKey + 0].fe == key[0].fe &&
keys[cacheIndexKey + 1].fe == key[1].fe &&
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

versions.push_back(versions[cacheIndexVersions]);
if(versions[cacheIndexVersions+1]==UINT64_MAX) return; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return;
}else{
break;
}
}
}
}
return;

}

void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)[10], int &iters)
{
uint32_t inputRawCacheIndex = usedRawCacheIndexes[iters];
Expand All @@ -191,7 +343,7 @@ void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)
// find a slot into my indexes
//
Goldilocks::Element key_hashed[4];
hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]);
hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]);
Goldilocks::Element *inputKey = &key_hashed[0];
uint32_t minRawCacheIndex = UINT32_MAX;
int pos = -1;
Expand Down Expand Up @@ -265,16 +417,17 @@ bool DatabaseKVAssociativeCache::findKey( const uint64_t version, const Goldiloc
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

if( versions[cacheIndexVersions] <= version){ //rick: I assume they are ordered
if( version >= versions[cacheIndexVersions] ){
uint32_t cacheIndexValue = cacheIndex;
++hits;
value = values[cacheIndexValue];
return true;
}
if(versions[cacheIndexVersions+1]==UINT64_MAX) return false; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return false;
Expand Down
17 changes: 6 additions & 11 deletions src/hashdb64/database_kv_associative_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "zklog.hpp"
#include "zkmax.hpp"
#include "poseidon_goldilocks.hpp"
#include "version_value.hpp"


using namespace std;
Expand Down Expand Up @@ -42,8 +43,12 @@ class DatabaseKVAssociativeCache
~DatabaseKVAssociativeCache();
void postConstruct(int log2IndexesSize_, int log2CacheSize_, string name_);

void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update);
void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value);
void downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]);
void uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector<VersionValue> &versionsValues);
bool findKey( const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class &value);
void getLastCachedVersions(const Goldilocks::Element (&key)[4], vector<uint64_t> &versions, const int maxVersions);


inline bool enabled() const { return (log2IndexesSize > 0); };
inline uint32_t getCacheSize() const { return cacheSize; };
Expand All @@ -66,16 +71,6 @@ class DatabaseKVAssociativeCache
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir
Goldilocks::Element key_hash_imput[12];
for(int i=0; i<4; i++){
key_hash_imput[i] = keyIn[i];
key_hash_imput[i+4] = keyIn[i];
key_hash_imput[i+8] = keyIn[i];
}
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};

};
#endif
2 changes: 1 addition & 1 deletion src/hashdb64/database_versions_associtive_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void DatabaseVersionsAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIn
// find a slot into my indexes
//
Goldilocks::Element key_hashed[4];
hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]);
hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]);
Goldilocks::Element *inputKey = &key_hashed[0];
uint32_t minRawCacheIndex = UINT32_MAX;
int pos = -1;
Expand Down
10 changes: 0 additions & 10 deletions src/hashdb64/database_versions_associtive_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ class DatabaseVersionsAssociativeCache
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir
Goldilocks::Element key_hash_imput[12];
for(int i=0; i<4; i++){
key_hash_imput[i] = keyIn[i];
key_hash_imput[i+4] = keyIn[i];
key_hash_imput[i+8] = keyIn[i];
}
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
};
#endif

Loading

0 comments on commit 4fcffd3

Please sign in to comment.