Skip to content

Commit

Permalink
Merge pull request #1501 from 0chain/fix/refpath-lock
Browse files Browse the repository at this point in the history
Fix alloc lock
  • Loading branch information
dabasov authored Nov 6, 2024
2 parents c8b0659 + 9a0bac9 commit 22ca751
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 22 deletions.
5 changes: 3 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
a.StartTime = sa.StartTime
a.StorageVersion = uint8(sa.StorageVersion)
a.OwnerSigningPublicKey = sa.OwnerSigningPublicKey
logging.Logger.Info("OwnerSigningPublicKey", zap.String("OwnerSigningPublicKey", a.OwnerSigningPublicKey), zap.String("allocation_id", a.ID))
logging.Logger.Info("OwnerSigningPublicKey", zap.String("OwnerSigningPublicKey", a.OwnerSigningPublicKey), zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))

m := map[string]interface{}{
"allocation_id": a.ID,
Expand Down Expand Up @@ -141,7 +141,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
return a, nil
}

logging.Logger.Info("Saving the allocation to DB", zap.String("allocation_id", a.ID))
logging.Logger.Info("Saving the allocation to DB", zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))

if !isExist {
err = Repo.Save(ctx, a)
Expand Down Expand Up @@ -174,6 +174,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
alloc.StartTime = a.StartTime
alloc.BlobberSize = a.BlobberSize
alloc.OwnerSigningPublicKey = a.OwnerSigningPublicKey
logging.Logger.Info("updatingAllocation", zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))
}
err = Repo.UpdateAllocation(ctx, a, updateMap, updateOption)
}
Expand Down
5 changes: 4 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
Allocation: a,
}
return a, nil
} else if a != nil {
logging.Logger.Debug("allocation_cache_miss", zap.String("allocation_id", allocationID), zap.String("tx_hash", txHash), zap.String("cached_tx_hash", a.Tx))
}

alloc := &Allocation{}
Expand Down Expand Up @@ -260,7 +262,6 @@ func (r *Repository) Commit(tx *datastore.EnhancedDB) {
return
}
for _, txnCache := range cache {
alloc := r.getAllocFromGlobalCache(txnCache.Allocation.ID)
mapLock.Lock()
mut, ok := r.allocLock[txnCache.Allocation.ID]
if !ok {
Expand All @@ -269,11 +270,13 @@ func (r *Repository) Commit(tx *datastore.EnhancedDB) {
}
mapLock.Unlock()
mut.Lock()
alloc := r.getAllocFromGlobalCache(txnCache.Allocation.ID)
if alloc != nil {
for _, update := range txnCache.AllocationUpdates {
update(alloc)
}
if len(txnCache.AllocationUpdates) > 0 {
logging.Logger.Debug("committing_allocation", zap.String("allocation_id", alloc.ID), zap.String("tx", alloc.Tx))
r.setAllocToGlobalCache(alloc)
}
}
Expand Down
11 changes: 7 additions & 4 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
// unlocking it as it will be locked for longer time and handler.CommitWrite
// will fail.
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), cr.AllocationID)
logging.Logger.Debug("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.String("allocation_id", cr.AllocationID))
allocMu.RLock()

allocationObj, err := allocation.Repo.GetAllocationFromDB(ctx, cr.AllocationID)
Expand All @@ -105,15 +106,14 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
var (
postData map[string]any
)

if allocationObj.IsStorageV2() {
postData, err = cr.getPostDataV2(ctx, allocationObj)
} else {
postData, err = cr.getPostData(ctx, allocationObj)
}
allocMu.RUnlock()
if err != nil {
logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.String("allocation_id", cr.AllocationID), zap.Error(err))
cr.CancelChallenge(ctx, err)
return err
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *coreTxn.T
func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *allocation.Allocation) (map[string]any, error) {
trie := allocationObj.GetTrie()
copyTrie := wmpt.New(trie.CopyRoot(filestore.COLLAPSE_DEPTH), datastore.GetBlockStore())

logging.Logger.Info("[challenge]getPostDataV2: ", zap.String("allocation_id", cr.AllocationID))
var (
blockNum = int64(0)
postData = make(map[string]interface{})
Expand All @@ -307,7 +307,7 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all
blockNum = r.Int63n(int64(copyTrie.Weight()))
blockNum++
cr.BlockNum = blockNum
logging.Logger.Info("[challenge]rand: ", zap.Uint64("trie.NumBlocks", trie.Weight()), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
logging.Logger.Info("[challenge]rand: ", zap.Uint64("trie.NumBlocks", trie.Weight()), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber), zap.String("allocation_id", cr.AllocationID))
key, objectProof, err := copyTrie.GetBlockProof(uint64(blockNum))
if err != nil {
return nil, err
Expand Down Expand Up @@ -353,6 +353,8 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all
zap.Int64("file size", ref.Size),
zap.String("file path", ref.Path),
zap.Int64("proof gen time", proofGenTime),
zap.String("allocation_id", cr.AllocationID),
zap.String("challenge_id", cr.ChallengeID),
)
postData["challenge_proof"] = challengeResponse
objectSize = ref.Size
Expand Down Expand Up @@ -385,6 +387,7 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all

return nil, err
}
logging.Logger.Debug("[challenge]getPostDataV2Return ", zap.Any("blockNum", blockNum), zap.String("allocation_id", cr.AllocationID))
return postData, nil
}

Expand Down
8 changes: 5 additions & 3 deletions code/go/0chain.net/blobbercore/handler/handler_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/0chain/gosdk/core/client"
"net/http"
"time"

"github.com/0chain/gosdk/core/client"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
Expand Down Expand Up @@ -142,12 +143,12 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu

mutex := lock.GetMutex(allocation.Allocation{}.TableName(), allocationID)
Logger.Info("Locking allocation", zap.String("allocation_id", allocationID))
mutex.Lock()
defer mutex.Unlock()
wmSet := writemarker.SetCommittingMarker(allocationID, true)
if !wmSet {
return nil, http.StatusBadRequest, common.NewError("pending_markers", "Committing marker set failed")
}
mutex.Lock()
defer mutex.Unlock()
ctx = GetMetaDataStore().CreateTransaction(ctx)
tx := GetMetaDataStore().GetTransaction(ctx)
resp, statusCode, err = handler(ctx, r)
Expand All @@ -161,6 +162,7 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu
}
writemarker.SetCommittingMarker(allocationID, false)
}
Logger.Debug("Unlocking allocation", zap.String("allocation_id", allocationID))
}()

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("blacklisted_client", "Client is blacklisted: "+clientID)
}

allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, false)
allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, true)
if err != nil {
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
}
Expand Down Expand Up @@ -912,7 +912,7 @@ func (fsh *StorageHandler) CommitWriteV2(ctx context.Context, r *http.Request) (
return nil, common.NewError("blacklisted_client", "Client is blacklisted: "+clientID)
}

allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, false)
allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, true)
if err != nil {
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ func (fsh *StorageHandler) GetReferencePathV2(ctx context.Context, r *http.Reque
case result := <-resCh:
return result, nil
case err := <-errCh:
logging.Logger.Error("GetReferencePathV2", zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -708,10 +709,11 @@ func (fsh *StorageHandler) getReferencePathV2(ctx context.Context, r *http.Reque
return
}
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), allocationId)
logging.Logger.Debug("getReferencePathV2Lock", zap.String("allocation_id", allocationId))
allocMu.RLock()
defer allocMu.RUnlock()
now := time.Now()

logging.Logger.Debug("getTrie", zap.String("allocation_id", allocationId))
trie := allocationObj.GetTrie()
if trie == nil {
errCh <- common.NewError("invalid_parameters", "Trie not found for allocation.")
Expand Down
11 changes: 4 additions & 7 deletions code/go/0chain.net/blobbercore/writemarker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,16 @@ func SetupWorkers(ctx context.Context) {
}

func redeemWriteMarker(md *markerData) error {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
ctx := datastore.GetStore().CreateTransaction(timeoutCtx)
db := datastore.GetStore().GetTransaction(ctx)
allocationID := md.allocationID
shouldRollback := false
start := time.Now()
logging.Logger.Info("redeeming_write_marker", zap.String("allocationID", allocationID))
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), allocationID)
allocMu.RLock()
defer allocMu.RUnlock()
defer func() {
if shouldRollback {
if rollbackErr := db.Rollback().Error; rollbackErr != nil {
Expand All @@ -135,8 +136,8 @@ func redeemWriteMarker(md *markerData) error {
} else {
deleteMarkerData(allocationID)
}
allocMu.RUnlock()
}()

alloc, err := allocation.Repo.GetAllocationFromDB(ctx, allocationID)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", allocationID), zap.Any("error", err))
Expand All @@ -153,7 +154,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return nil
}

wm, err := GetWriteMarkerEntity(ctx, alloc.ID, alloc.AllocationRoot)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", alloc.AllocationRoot), zap.Any("error", err))
Expand All @@ -163,7 +163,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return err
}

err = wm.RedeemMarker(ctx, alloc.LastRedeemedSeq+1)
if err != nil {
elapsedTime := time.Since(start)
Expand All @@ -178,7 +177,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return err
}

err = allocation.Repo.UpdateAllocationRedeem(ctx, allocationID, wm.WM.AllocationRoot, alloc, wm.Sequence)
if err != nil {
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
Expand All @@ -188,7 +186,6 @@ func redeemWriteMarker(md *markerData) error {
go tryAgain(md)
return err
}

err = db.Commit().Error
if err != nil {
logging.Logger.Error("Error committing the writemarker redeem",
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/validatorcore/storage/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ func (cr *ChallengeRequest) verifyObjectProof(latestWM *writemarker.WriteMarker,
hashData := fmt.Sprintf("%s:%s:%s:%s", cr.Meta.ActualFileHash, cr.Meta.ValidationRoot, cr.Meta.FixedMerkleRoot, blobberID)
validationRootHash := encryption.Hash(hashData)
var verify bool
if cr.Meta.SignatureVersion == 1 {
if len(cr.Meta.ValidationRootSignature) == 128 {
verify, err = encryption.VerifyEd25519(ownerSigningPublicKey, cr.Meta.ValidationRootSignature, validationRootHash)
} else {
verify, err = encryption.Verify(ownerPublicKey, cr.Meta.ValidationRootSignature, validationRootHash)
}
if err != nil {
logging.Logger.Error("Failed to verify the validation root signature", zap.Error(err), zap.String("validation_root", cr.Meta.ValidationRoot), zap.String("validation_root_signature", cr.Meta.ValidationRootSignature), zap.String("owner_public_key", ownerPublicKey))
logging.Logger.Error("Failed to verify the validation root signature", zap.Error(err), zap.String("validation_root", cr.Meta.ValidationRoot), zap.String("validation_root_signature", cr.Meta.ValidationRootSignature), zap.String("owner_public_key", ownerPublicKey), zap.String("allocation_id", latestWM.AllocationID))
return common.NewError("invalid_object_proof", "Failed to verify the validation root signature. "+err.Error())
}
if !verify {
Expand Down

0 comments on commit 22ca751

Please sign in to comment.