Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lwm #1338

Merged
merged 6 commits into from
Dec 5, 2023
Merged

Fix lwm #1338

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func (cc *AllocationChangeCollector) ComputeProperties() {
func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocationRoot string,
ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {
rootRef, err := cc.GetRootRef(ctx)
logging.Logger.Info("GetRootRef", zap.Any("rootRef", rootRef))
if err != nil {
return rootRef, err
}
Expand Down
31 changes: 30 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,36 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc

logging.Logger.Info("Saving the allocation to DB")

err = Repo.Save(ctx, a)
if !isExist {
err = Repo.Save(ctx, a)
} else {
updateMap := map[string]interface{}{
"tx": a.Tx,
"expiration": a.Expiration,
"owner_id": a.OwnerID,
"owner_public_key": a.OwnerPublicKey,
"repairer_id": a.RepairerID,
"total_size": a.TotalSize,
"finalized": a.Finalized,
"time_unit": a.TimeUnit,
"file_options": a.FileOptions,
"start_time": a.StartTime,
}

updateOption := func(alloc *Allocation) {
alloc.Tx = a.Tx
alloc.Expiration = a.Expiration
alloc.OwnerID = a.OwnerID
alloc.OwnerPublicKey = a.OwnerPublicKey
alloc.RepairerID = a.RepairerID
alloc.TotalSize = a.TotalSize
alloc.Finalized = a.Finalized
alloc.TimeUnit = a.TimeUnit
alloc.FileOptions = a.FileOptions
alloc.StartTime = a.StartTime
}
err = Repo.UpdateAllocation(ctx, a, updateMap, updateOption)
}

if err != nil {
return nil, err
Expand Down
40 changes: 29 additions & 11 deletions code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package allocation
import (
"context"
"fmt"
"sync"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
Expand All @@ -19,20 +20,23 @@ const (
)

var (
Repo *Repository
Repo *Repository
mapLock sync.Mutex
)

type AllocationUpdate func(a *Allocation)

func init() {
allocCache, _ := lru.New[string, *Allocation](lruSize)
allocCache, _ := lru.New[string, Allocation](lruSize)
Repo = &Repository{
allocCache: allocCache,
allocLock: make(map[string]*sync.Mutex),
}
}

type Repository struct {
allocCache *lru.Cache[string, *Allocation]
allocCache *lru.Cache[string, Allocation]
allocLock map[string]*sync.Mutex
}

type AllocationCache struct {
Expand Down Expand Up @@ -61,6 +65,9 @@ func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error

a := r.getAllocFromGlobalCache(id)
if a != nil {
cache[id] = AllocationCache{
Allocation: a,
}
return a, nil
}

Expand Down Expand Up @@ -121,6 +128,9 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (

a := r.getAllocFromGlobalCache(allocationID)
if a != nil && a.Tx == txHash {
cache[allocationID] = AllocationCache{
Allocation: a,
}
return a, nil
}

Expand Down Expand Up @@ -237,16 +247,22 @@ func (r *Repository) Commit(tx *datastore.EnhancedDB) {
return
}
for _, txnCache := range cache {
var alloc *Allocation
for _, update := range txnCache.AllocationUpdates {
alloc = r.getAllocFromGlobalCache(txnCache.Allocation.ID)
if alloc != nil {
update(alloc)
}
alloc := r.getAllocFromGlobalCache(txnCache.Allocation.ID)
mapLock.Lock()
mut, ok := r.allocLock[txnCache.Allocation.ID]
if !ok {
mut = &sync.Mutex{}
r.allocLock[txnCache.Allocation.ID] = mut
}
mapLock.Unlock()
mut.Lock()
if alloc != nil {
for _, update := range txnCache.AllocationUpdates {
update(alloc)
}
r.setAllocToGlobalCache(alloc)
}
mut.Unlock()
}
}

Expand Down Expand Up @@ -319,11 +335,13 @@ func (r *Repository) getAllocFromGlobalCache(id string) *Allocation {
if !ok {
return nil
}
return a
return &a
}

func (r *Repository) setAllocToGlobalCache(a *Allocation) {
r.allocCache.Add(a.ID, a)
if a != nil {
r.allocCache.Add(a.ID, *a)
}
}

func (r *Repository) DeleteAllocation(allocationID string) {
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,11 @@ func (fsh *StorageHandler) GetLatestWriteMarker(ctx context.Context, r *http.Req
} else {
latestWM, err = writemarker.GetWriteMarkerEntity(ctx, allocationObj.AllocationRoot)
if err != nil {
return nil, common.NewError("latest_write_marker_read_error", "Error reading the latest write marker for allocation."+err.Error())
Logger.Error("[latest_write_marker]", zap.String("allocation_root", allocationObj.AllocationRoot), zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("latest_write_marker_read_error", "Error reading the latest write marker for allocation. "+err.Error())
}
if latestWM == nil {
Logger.Info("[latest_write_marker]", zap.String("allocation_root", allocationObj.AllocationRoot), zap.String("allocation_id", allocationObj.ID))
return nil, common.NewError("latest_write_marker_read_error", "Latest write marker not found for allocation.")
}
if latestWM.WM.PreviousAllocationRoot != "" {
Expand Down
Loading