Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add l2 caching for alloc #1258

Merged
merged 24 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func setupMockForFileManagerInit(mock sqlmock.Sqlmock) {
WillReturnRows(
sqlmock.NewRows([]string{"file_size"}).AddRow(6553600),
)

mock.ExpectCommit()
}

func init() {
Expand Down
183 changes: 163 additions & 20 deletions code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
lru "github.com/hashicorp/golang-lru/v2"
"go.uber.org/zap"
"gorm.io/gorm"
"gorm.io/gorm/clause"
Expand All @@ -14,17 +15,29 @@ import (
const (
SQLWhereGetById = "allocations.id = ?"
SQLWhereGetByTx = "allocations.tx = ?"
lruSize = 100
)

var (
Repo *Repository
)

type AllocationUpdate func(a *Allocation)

func init() {
Repo = &Repository{}
allocCache, _ := lru.New[string, *Allocation](lruSize)
Repo = &Repository{
allocCache: allocCache,
}
}

type Repository struct {
allocCache *lru.Cache[string, *Allocation]
}

type AllocationCache struct {
Allocation *Allocation
AllocationUpdates []AllocationUpdate
}

type Res struct {
Expand All @@ -43,6 +56,11 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error
}

if a, ok := cache[id]; ok {
return a.Allocation, nil
}

a := r.getAllocFromGlobalCache(id)
if a != nil {
return a, nil
}

Expand All @@ -52,8 +70,10 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error
return alloc, err
}

cache[id] = alloc

cache[id] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, nil
}

Expand All @@ -69,16 +89,17 @@ func (r *Repository) GetByIdAndLock(ctx context.Context, id string) (*Allocation
}

alloc := &Allocation{}

err = tx.Model(&Allocation{}).
Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).
Where("id=?", id).
Take(alloc).Error
if err != nil {
return alloc, err
}
cache[id] = alloc

cache[id] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, err
}

Expand All @@ -93,32 +114,49 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
return nil, err
}
if a, ok := cache[allocationID]; ok {
if a.Tx == txHash {
return a, nil
if a.Allocation.Tx == txHash {
return a.Allocation, nil
}
}

a := r.getAllocFromGlobalCache(allocationID)
if a != nil && a.Tx == txHash {
return a, nil
}

alloc := &Allocation{}
err = tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, txHash).Take(alloc).Error
if err != nil {
return alloc, err
}
cache[allocationID] = alloc

cache[allocationID] = AllocationCache{
Allocation: alloc,
}
r.setAllocToGlobalCache(alloc)
return alloc, err
}

func (r *Repository) GetAllocations(ctx context.Context, offset int64) ([]*Allocation, error) {
var tx = datastore.GetStore().GetTransaction(ctx)

const query = `finalized = false AND cleaned_up = false`
allocs := make([]*Allocation, 0)
return allocs, tx.Model(&Allocation{}).
allocs := make([]*Allocation, 0, 10)
err := tx.Model(&Allocation{}).
Where(query).
Limit(UPDATE_LIMIT).
Offset(int(offset)).
Order("id ASC").
Find(&allocs).Error
if err != nil {
return allocs, err
}
for ind, alloc := range allocs {
if ind == lruSize {
break
}
r.setAllocToGlobalCache(alloc)
}
return allocs, nil
}

func (r *Repository) GetAllocationIds(ctx context.Context) []Res {
Expand Down Expand Up @@ -149,40 +187,145 @@ func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, A
if err != nil {
return err
}
delete(cache, allocationID)

allocationUpdates := make(map[string]interface{})
allocationUpdates["latest_redeemed_write_marker"] = AllocationRoot
allocationUpdates["is_redeem_required"] = false
err = tx.Model(allocationObj).Updates(allocationUpdates).Error
return err
if err != nil {
return err
}
allocationObj.LatestRedeemedWM = AllocationRoot
dabasov marked this conversation as resolved.
Show resolved Hide resolved
allocationObj.IsRedeemRequired = false
txnCache := cache[allocationID]
txnCache.Allocation = allocationObj
updateAlloc := func(a *Allocation) {
a.LatestRedeemedWM = AllocationRoot
a.IsRedeemRequired = false
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[allocationID] = txnCache
return nil
}

func (r *Repository) Save(ctx context.Context, a *Allocation) error {
func (r *Repository) UpdateAllocation(ctx context.Context, allocationObj *Allocation, updateMap map[string]interface{}, updateOption AllocationUpdate) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, err := getCache(tx)
if err != nil {
return err
}
err = tx.Model(allocationObj).Updates(updateMap).Error
if err != nil {
return err
}
txnCache := cache[allocationObj.ID]
txnCache.Allocation = allocationObj
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateOption)
cache[allocationObj.ID] = txnCache
return nil
}

func (r *Repository) Commit(tx *datastore.EnhancedDB) {
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, _ := getCache(tx)
if cache == nil {
return
}
for _, txnCache := range cache {
var alloc *Allocation
for _, update := range txnCache.AllocationUpdates {
alloc = r.getAllocFromGlobalCache(txnCache.Allocation.ID)
if alloc != nil {
update(alloc)
}
}
if alloc != nil {
r.setAllocToGlobalCache(alloc)
}
}
}

func (r *Repository) Save(ctx context.Context, alloc *Allocation) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}

cache, err := getCache(tx)
if err != nil {
return err
}

txnCache := cache[alloc.ID]
txnCache.Allocation = alloc
err = tx.Save(alloc).Error
if err != nil {
return err
}
updateAlloc := func(a *Allocation) {
*a = *alloc
}
txnCache.AllocationUpdates = append(txnCache.AllocationUpdates, updateAlloc)
cache[alloc.ID] = txnCache
return nil
}

func (r *Repository) Create(ctx context.Context, alloc *Allocation) error {
var tx = datastore.GetStore().GetTransaction(ctx)
if tx == nil {
logging.Logger.Panic("no transaction in the context")
}
cache, err := getCache(tx)
if err != nil {
return err
}

cache[a.ID] = a
return tx.Save(a).Error
txnCache := cache[alloc.ID]
txnCache.Allocation = alloc
err = tx.Create(alloc).Error
if err != nil {
return err
}
cache[alloc.ID] = txnCache
return nil
}

func getCache(tx *datastore.EnhancedDB) (map[string]*Allocation, error) {
func getCache(tx *datastore.EnhancedDB) (map[string]AllocationCache, error) {
c, ok := tx.SessionCache[TableNameAllocation]
if ok {
cache, ok := c.(map[string]*Allocation)
cache, ok := c.(map[string]AllocationCache)
if !ok {
return nil, fmt.Errorf("type assertion failed")
}
return cache, nil
}
cache := make(map[string]*Allocation)
cache := make(map[string]AllocationCache)
tx.SessionCache[TableNameAllocation] = cache
if tx.CommitAllocCache == nil {
tx.CommitAllocCache = func(tx *datastore.EnhancedDB) {
Repo.Commit(tx)
}
}
return cache, nil
}

func (r *Repository) getAllocFromGlobalCache(id string) *Allocation {
a, ok := r.allocCache.Get(id)
if !ok {
return nil
}
return a
}

func (r *Repository) setAllocToGlobalCache(a *Allocation) {
r.allocCache.Add(a.ID, a)
}

func (r *Repository) DeleteAllocation(allocationID string) {
r.allocCache.Remove(allocationID)
}
41 changes: 33 additions & 8 deletions code/go/0chain.net/blobbercore/allocation/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package allocation
import (
"context"
"encoding/json"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"math"
"time"

"github.com/0chain/blobber/code/go/0chain.net/core/node"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
Expand Down Expand Up @@ -49,10 +51,12 @@ func UpdateWorker(ctx context.Context, interval time.Duration) {
for {
select {
case <-tick:
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
updateCtx := datastore.GetStore().CreateTransaction(context.TODO())
_ = datastore.GetStore().WithTransaction(updateCtx, func(ctx context.Context) error {
updateWork(ctx)
return nil
})
updateCtx.Done()
case <-quit:
return
}
Expand Down Expand Up @@ -239,17 +243,42 @@ func requestExpiredAllocations() (allocs []string, err error) {
func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.StorageAllocation) (ua *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)
var changed bool = a.Tx != sa.Tx
if !changed {
return a, nil
}

// transaction
a.Tx = sa.Tx
a.OwnerID = sa.OwnerID
a.OwnerPublicKey = sa.OwnerPublicKey

// update fields
// // update fields
a.Expiration = sa.Expiration
a.TotalSize = sa.Size
a.Finalized = sa.Finalized
a.FileOptions = sa.FileOptions
a.BlobberSize = int64(math.Ceil(float64(sa.Size) / float64(sa.DataShards)))

updateMap := make(map[string]interface{})
updateMap["tx"] = a.Tx
updateMap["owner_id"] = a.OwnerID
updateMap["owner_public_key"] = a.OwnerPublicKey
updateMap["expiration"] = a.Expiration
updateMap["total_size"] = a.TotalSize
updateMap["finalized"] = a.Finalized
updateMap["file_options"] = a.FileOptions
updateMap["blobber_size"] = a.BlobberSize

updateOption := func(alloc *Allocation) {
alloc.Tx = a.Tx
alloc.OwnerID = a.OwnerID
alloc.OwnerPublicKey = a.OwnerPublicKey
alloc.Expiration = a.Expiration
alloc.TotalSize = a.TotalSize
alloc.Finalized = a.Finalized
alloc.FileOptions = a.FileOptions
alloc.BlobberSize = a.BlobberSize
}

// update terms
a.Terms = make([]*Terms, 0, len(sa.BlobberDetails))
Expand All @@ -263,14 +292,10 @@ func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.St
}

// save allocations
if err := Repo.Save(ctx, a); err != nil {
if err := Repo.UpdateAllocation(ctx, a, updateMap, updateOption); err != nil {
return nil, err
}

if !changed {
return a, nil
}

// save allocation terms
for _, t := range a.Terms {
if err := tx.Save(t).Error; err != nil {
Expand Down
2 changes: 1 addition & 1 deletion code/go/0chain.net/blobbercore/allocation/zcn.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func SyncAllocation(allocationId string) (*Allocation, error) {

err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
var e error
if e := Repo.Save(ctx, alloc); e != nil {
if e := Repo.Create(ctx, alloc); e != nil {
return e
}
tx := datastore.GetStore().GetTransaction(ctx)
Expand Down
Loading
Loading