Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix alloc lock #1501

Merged
merged 8 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
a.StartTime = sa.StartTime
a.StorageVersion = uint8(sa.StorageVersion)
a.OwnerSigningPublicKey = sa.OwnerSigningPublicKey
logging.Logger.Info("OwnerSigningPublicKey", zap.String("OwnerSigningPublicKey", a.OwnerSigningPublicKey), zap.String("allocation_id", a.ID))
logging.Logger.Info("OwnerSigningPublicKey", zap.String("OwnerSigningPublicKey", a.OwnerSigningPublicKey), zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))

m := map[string]interface{}{
"allocation_id": a.ID,
Expand Down Expand Up @@ -141,7 +141,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
return a, nil
}

logging.Logger.Info("Saving the allocation to DB", zap.String("allocation_id", a.ID))
logging.Logger.Info("Saving the allocation to DB", zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))

if !isExist {
err = Repo.Save(ctx, a)
Expand Down Expand Up @@ -174,6 +174,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
alloc.StartTime = a.StartTime
alloc.BlobberSize = a.BlobberSize
alloc.OwnerSigningPublicKey = a.OwnerSigningPublicKey
logging.Logger.Info("updatingAllocation", zap.String("allocation_id", a.ID), zap.String("allocation_tx", a.Tx))
}
err = Repo.UpdateAllocation(ctx, a, updateMap, updateOption)
}
Expand Down
5 changes: 4 additions & 1 deletion code/go/0chain.net/blobbercore/allocation/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (
Allocation: a,
}
return a, nil
} else if a != nil {
logging.Logger.Debug("allocation_cache_miss", zap.String("allocation_id", allocationID), zap.String("tx_hash", txHash), zap.String("cached_tx_hash", a.Tx))
}

alloc := &Allocation{}
Expand Down Expand Up @@ -260,7 +262,6 @@ func (r *Repository) Commit(tx *datastore.EnhancedDB) {
return
}
for _, txnCache := range cache {
alloc := r.getAllocFromGlobalCache(txnCache.Allocation.ID)
mapLock.Lock()
mut, ok := r.allocLock[txnCache.Allocation.ID]
if !ok {
Expand All @@ -269,11 +270,13 @@ func (r *Repository) Commit(tx *datastore.EnhancedDB) {
}
mapLock.Unlock()
mut.Lock()
alloc := r.getAllocFromGlobalCache(txnCache.Allocation.ID)
if alloc != nil {
for _, update := range txnCache.AllocationUpdates {
update(alloc)
}
if len(txnCache.AllocationUpdates) > 0 {
logging.Logger.Debug("committing_allocation", zap.String("allocation_id", alloc.ID), zap.String("tx", alloc.Tx))
r.setAllocToGlobalCache(alloc)
}
}
Expand Down
11 changes: 7 additions & 4 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
// unlocking it as it will be locked for longer time and handler.CommitWrite
// will fail.
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), cr.AllocationID)
logging.Logger.Debug("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.String("allocation_id", cr.AllocationID))
allocMu.RLock()

allocationObj, err := allocation.Repo.GetAllocationFromDB(ctx, cr.AllocationID)
Expand All @@ -105,15 +106,14 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
var (
postData map[string]any
)

if allocationObj.IsStorageV2() {
postData, err = cr.getPostDataV2(ctx, allocationObj)
} else {
postData, err = cr.getPostData(ctx, allocationObj)
}
allocMu.RUnlock()
if err != nil {
logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", cr.ChallengeID), zap.String("allocation_id", cr.AllocationID), zap.Error(err))
cr.CancelChallenge(ctx, err)
return err
}
Expand Down Expand Up @@ -294,7 +294,7 @@ func (cr *ChallengeEntity) SaveChallengeResult(ctx context.Context, t *coreTxn.T
func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *allocation.Allocation) (map[string]any, error) {
trie := allocationObj.GetTrie()
copyTrie := wmpt.New(trie.CopyRoot(filestore.COLLAPSE_DEPTH), datastore.GetBlockStore())

logging.Logger.Info("[challenge]getPostDataV2: ", zap.String("allocation_id", cr.AllocationID))
var (
blockNum = int64(0)
postData = make(map[string]interface{})
Expand All @@ -307,7 +307,7 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all
blockNum = r.Int63n(int64(copyTrie.Weight()))
blockNum++
cr.BlockNum = blockNum
logging.Logger.Info("[challenge]rand: ", zap.Uint64("trie.NumBlocks", trie.Weight()), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
logging.Logger.Info("[challenge]rand: ", zap.Uint64("trie.NumBlocks", trie.Weight()), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber), zap.String("allocation_id", cr.AllocationID))
key, objectProof, err := copyTrie.GetBlockProof(uint64(blockNum))
if err != nil {
return nil, err
Expand Down Expand Up @@ -353,6 +353,8 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all
zap.Int64("file size", ref.Size),
zap.String("file path", ref.Path),
zap.Int64("proof gen time", proofGenTime),
zap.String("allocation_id", cr.AllocationID),
zap.String("challenge_id", cr.ChallengeID),
)
postData["challenge_proof"] = challengeResponse
objectSize = ref.Size
Expand Down Expand Up @@ -385,6 +387,7 @@ func (cr *ChallengeEntity) getPostDataV2(ctx context.Context, allocationObj *all

return nil, err
}
logging.Logger.Debug("[challenge]getPostDataV2Return ", zap.Any("blockNum", blockNum), zap.String("allocation_id", cr.AllocationID))
return postData, nil
}

Expand Down
8 changes: 5 additions & 3 deletions code/go/0chain.net/blobbercore/handler/handler_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/0chain/gosdk/core/client"
"net/http"
"time"

"github.com/0chain/gosdk/core/client"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
Expand Down Expand Up @@ -142,12 +143,12 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu

mutex := lock.GetMutex(allocation.Allocation{}.TableName(), allocationID)
Logger.Info("Locking allocation", zap.String("allocation_id", allocationID))
mutex.Lock()
defer mutex.Unlock()
wmSet := writemarker.SetCommittingMarker(allocationID, true)
if !wmSet {
return nil, http.StatusBadRequest, common.NewError("pending_markers", "Committing marker set failed")
}
mutex.Lock()
defer mutex.Unlock()
ctx = GetMetaDataStore().CreateTransaction(ctx)
tx := GetMetaDataStore().GetTransaction(ctx)
resp, statusCode, err = handler(ctx, r)
Expand All @@ -161,6 +162,7 @@ func WithStatusConnectionForWM(handler common.StatusCodeResponderF) common.Statu
}
writemarker.SetCommittingMarker(allocationID, false)
}
Logger.Debug("Unlocking allocation", zap.String("allocation_id", allocationID))
}()

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("blacklisted_client", "Client is blacklisted: "+clientID)
}

allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, false)
allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, true)
if err != nil {
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
}
Expand Down Expand Up @@ -912,7 +912,7 @@ func (fsh *StorageHandler) CommitWriteV2(ctx context.Context, r *http.Request) (
return nil, common.NewError("blacklisted_client", "Client is blacklisted: "+clientID)
}

allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, false)
allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, true)
if err != nil {
return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error())
}
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/handler/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ func (fsh *StorageHandler) GetReferencePathV2(ctx context.Context, r *http.Reque
case result := <-resCh:
return result, nil
case err := <-errCh:
logging.Logger.Error("GetReferencePathV2", zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -708,10 +709,11 @@ func (fsh *StorageHandler) getReferencePathV2(ctx context.Context, r *http.Reque
return
}
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), allocationId)
logging.Logger.Debug("getReferencePathV2Lock", zap.String("allocation_id", allocationId))
allocMu.RLock()
defer allocMu.RUnlock()
now := time.Now()

logging.Logger.Debug("getTrie", zap.String("allocation_id", allocationId))
trie := allocationObj.GetTrie()
if trie == nil {
errCh <- common.NewError("invalid_parameters", "Trie not found for allocation.")
Expand Down
11 changes: 4 additions & 7 deletions code/go/0chain.net/blobbercore/writemarker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,16 @@ func SetupWorkers(ctx context.Context) {
}

func redeemWriteMarker(md *markerData) error {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
defer cancel()
ctx := datastore.GetStore().CreateTransaction(timeoutCtx)
db := datastore.GetStore().GetTransaction(ctx)
allocationID := md.allocationID
shouldRollback := false
start := time.Now()
logging.Logger.Info("redeeming_write_marker", zap.String("allocationID", allocationID))
allocMu := lock.GetMutex(allocation.Allocation{}.TableName(), allocationID)
allocMu.RLock()
defer allocMu.RUnlock()
defer func() {
if shouldRollback {
if rollbackErr := db.Rollback().Error; rollbackErr != nil {
Expand All @@ -135,8 +136,8 @@ func redeemWriteMarker(md *markerData) error {
} else {
deleteMarkerData(allocationID)
}
allocMu.RUnlock()
}()

alloc, err := allocation.Repo.GetAllocationFromDB(ctx, allocationID)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", allocationID), zap.Any("error", err))
Expand All @@ -153,7 +154,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return nil
}

wm, err := GetWriteMarkerEntity(ctx, alloc.ID, alloc.AllocationRoot)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", alloc.AllocationRoot), zap.Any("error", err))
Expand All @@ -163,7 +163,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return err
}

err = wm.RedeemMarker(ctx, alloc.LastRedeemedSeq+1)
if err != nil {
elapsedTime := time.Since(start)
Expand All @@ -178,7 +177,6 @@ func redeemWriteMarker(md *markerData) error {
shouldRollback = true
return err
}

err = allocation.Repo.UpdateAllocationRedeem(ctx, allocationID, wm.WM.AllocationRoot, alloc, wm.Sequence)
if err != nil {
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
Expand All @@ -188,7 +186,6 @@ func redeemWriteMarker(md *markerData) error {
go tryAgain(md)
return err
}

err = db.Commit().Error
if err != nil {
logging.Logger.Error("Error committing the writemarker redeem",
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/validatorcore/storage/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ func (cr *ChallengeRequest) verifyObjectProof(latestWM *writemarker.WriteMarker,
hashData := fmt.Sprintf("%s:%s:%s:%s", cr.Meta.ActualFileHash, cr.Meta.ValidationRoot, cr.Meta.FixedMerkleRoot, blobberID)
validationRootHash := encryption.Hash(hashData)
var verify bool
if cr.Meta.SignatureVersion == 1 {
if len(cr.Meta.ValidationRootSignature) == 128 {
verify, err = encryption.VerifyEd25519(ownerSigningPublicKey, cr.Meta.ValidationRootSignature, validationRootHash)
} else {
verify, err = encryption.Verify(ownerPublicKey, cr.Meta.ValidationRootSignature, validationRootHash)
}
if err != nil {
logging.Logger.Error("Failed to verify the validation root signature", zap.Error(err), zap.String("validation_root", cr.Meta.ValidationRoot), zap.String("validation_root_signature", cr.Meta.ValidationRootSignature), zap.String("owner_public_key", ownerPublicKey))
logging.Logger.Error("Failed to verify the validation root signature", zap.Error(err), zap.String("validation_root", cr.Meta.ValidationRoot), zap.String("validation_root_signature", cr.Meta.ValidationRootSignature), zap.String("owner_public_key", ownerPublicKey), zap.String("allocation_id", latestWM.AllocationID))
return common.NewError("invalid_object_proof", "Failed to verify the validation root signature. "+err.Error())
}
if !verify {
Expand Down
Loading