Skip to content

Commit

Permalink
feat: make regular primary & secondary sp gc also batch delete with k…
Browse files Browse the repository at this point in the history
…ey prefix
  • Loading branch information
annielz committed Nov 15, 2024
1 parent 7f4d7dc commit 9982dd1
Showing 1 changed file with 15 additions and 33 deletions.
48 changes: 15 additions & 33 deletions modular/executor/execute_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
currentGCBlockID uint64
currentGCObjectID uint64
responseEndBlockID uint64
storageParams *storagetypes.Params
gcObjectNumber int
tryAgainLater bool
taskIsCanceled bool
Expand Down Expand Up @@ -254,30 +253,18 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

for _, object := range waitingGCObjects {
if storageParams, err = e.baseApp.Consensus().QueryStorageParamsByTimestamp(
context.Background(), object.GetObjectInfo().GetCreateAt()); err != nil {
log.Errorw("failed to query storage params", "task_info", task.Info(), "error", err)
return
}

currentGCBlockID = uint64(object.GetDeleteAt())
objectInfo := object.GetObjectInfo()
objectVersion := objectInfo.Version
currentGCObjectID = objectInfo.Id.Uint64()
if currentGCBlockID < task.GetCurrentBlockNumber() {
log.Errorw("skip gc object", "object_info", objectInfo,
"task_current_gc_block_id", task.GetCurrentBlockNumber())
continue
}
segmentCount := e.baseApp.PieceOp().SegmentPieceCount(objectInfo.GetPayloadSize(),
storageParams.VersionedParams.GetMaxSegmentSize())
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().SegmentPieceKey(currentGCObjectID, segIdx, objectVersion)
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key", pieceKey, "error", deleteErr)
}
segmentPieceKeyPrefix := fmt.Sprintf("s%d_", currentGCObjectID)
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key_prefix", segmentPieceKeyPrefix, "error", deleteErr)
bucketInfo, err := e.baseApp.GfSpClient().GetBucketInfoByBucketName(ctx, objectInfo.BucketName)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket_name", objectInfo.BucketName, "error", err)
Expand All @@ -290,35 +277,30 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

var redundancyIndex int32 = -1
// since in GC the object will be completely deleted, simply find all pieces with the piece key prefix and remove them
ECPieceKeyPrefix := fmt.Sprintf("e%d_", currentGCObjectID)
if len(gvg.GetSecondarySpIds()) != 0 {
for rIdx, sspId := range gvg.GetSecondarySpIds() {
if spId == sspId {
redundancyIndex = int32(rIdx)
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().ECPieceKey(currentGCObjectID, segIdx, uint32(rIdx), objectVersion)
if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_REPLICA_TYPE {
pieceKey = e.baseApp.PieceOp().SegmentPieceKey(objectInfo.Id.Uint64(), segIdx, objectVersion)
}
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the secondary sp pieces",
"object_info", objectInfo, "piece_key", pieceKey, "error", deleteErr)
}
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)
}
}
} else {
// if failed to get secondary sps, iterate through all files with key prefix to delete possible files in storage and metadata in sp
pieceKeyPrefix := fmt.Sprintf("e%d_", currentGCObjectID)
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, pieceKeyPrefix)
log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix",
"object_info", objectInfo, "piece_key_prefix", pieceKeyPrefix, "error", deleteErr)
// if failed to get secondary sps, check the current sp
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the sp pieces by prefix in current sp when secondary sp not found",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)

// signal as delete any integrity meta related with the object
redundancyIndex = math.MaxInt32
}

// ignore this delete api error, TODO: refine gc workflow by enrich metadata index
deleteErr := e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
deleteErr = e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
log.CtxDebugw(ctx, "delete the object integrity meta", "object_info", objectInfo, "error", deleteErr)
task.SetCurrentBlockNumber(currentGCBlockID)
task.SetLastDeletedObjectId(currentGCObjectID)
Expand Down

0 comments on commit 9982dd1

Please sign in to comment.