Skip to content

Commit 2e6cd8d

Browse files
authored
replication: perform bucket resync in parallel (minio#16707)
1 parent 31fd025 commit 2e6cd8d

File tree

1 file changed

+98
-73
lines changed

1 file changed

+98
-73
lines changed

cmd/bucket-replication.go

Lines changed: 98 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"github.com/minio/minio/internal/hash"
4949
xhttp "github.com/minio/minio/internal/http"
5050
"github.com/minio/minio/internal/logger"
51+
"github.com/minio/minio/internal/workers"
5152
"github.com/zeebo/xxh3"
5253
)
5354

@@ -2252,7 +2253,10 @@ func (s *replicationResyncer) PersistToDisk(ctx context.Context, objectAPI Objec
22522253
}
22532254
}
22542255

2255-
const resyncWorkerCnt = 50 // limit of number of bucket resyncs is progress at any given time
2256+
const (
2257+
resyncWorkerCnt = 10 // limit of number of bucket resyncs is progress at any given time
2258+
resyncParallelRoutines = 10 // number of parallel resync ops per bucket
2259+
)
22562260

22572261
func newresyncer() *replicationResyncer {
22582262
rs := replicationResyncer{
@@ -2267,6 +2271,36 @@ func newresyncer() *replicationResyncer {
22672271
return &rs
22682272
}
22692273

2274+
// mark status of replication resync on remote target for the bucket
2275+
func (s *replicationResyncer) markStatus(status ResyncStatusType, opts resyncOpts) {
2276+
s.Lock()
2277+
defer s.Unlock()
2278+
2279+
m := s.statusMap[opts.bucket]
2280+
st := m.TargetsMap[opts.arn]
2281+
st.LastUpdate = UTCNow()
2282+
st.ResyncStatus = status
2283+
m.TargetsMap[opts.arn] = st
2284+
m.LastUpdate = UTCNow()
2285+
s.statusMap[opts.bucket] = m
2286+
}
2287+
2288+
// update replication resync stats for bucket's remote target
2289+
func (s *replicationResyncer) incStats(ts TargetReplicationResyncStatus, opts resyncOpts) {
2290+
s.Lock()
2291+
defer s.Unlock()
2292+
m := s.statusMap[opts.bucket]
2293+
st := m.TargetsMap[opts.arn]
2294+
st.Object = ts.Object
2295+
st.ReplicatedCount += ts.ReplicatedCount
2296+
st.FailedCount += ts.FailedCount
2297+
st.ReplicatedSize += ts.ReplicatedSize
2298+
st.FailedSize += ts.FailedSize
2299+
m.TargetsMap[opts.arn] = st
2300+
m.LastUpdate = UTCNow()
2301+
s.statusMap[opts.bucket] = m
2302+
}
2303+
22702304
// resyncBucket resyncs all qualifying objects as per replication rules for the target
22712305
// ARN
22722306
func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI ObjectLayer, heal bool, opts resyncOpts) {
@@ -2278,15 +2312,7 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
22782312

22792313
resyncStatus := ResyncFailed
22802314
defer func() {
2281-
s.Lock()
2282-
m := s.statusMap[opts.bucket]
2283-
st := m.TargetsMap[opts.arn]
2284-
st.LastUpdate = UTCNow()
2285-
st.ResyncStatus = resyncStatus
2286-
m.TargetsMap[opts.arn] = st
2287-
m.LastUpdate = UTCNow()
2288-
s.statusMap[opts.bucket] = m
2289-
s.Unlock()
2315+
s.markStatus(resyncStatus, opts)
22902316
globalSiteResyncMetrics.incBucket(opts, resyncStatus)
22912317
s.workerCh <- struct{}{}
22922318
}()
@@ -2322,15 +2348,9 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
23222348
}
23232349
// mark resync status as resync started
23242350
if !heal {
2325-
s.Lock()
2326-
m := s.statusMap[opts.bucket]
2327-
st := m.TargetsMap[opts.arn]
2328-
st.ResyncStatus = ResyncStarted
2329-
m.TargetsMap[opts.arn] = st
2330-
m.LastUpdate = UTCNow()
2331-
s.statusMap[opts.bucket] = m
2332-
s.Unlock()
2351+
s.markStatus(ResyncStarted, opts)
23332352
}
2353+
23342354
// Walk through all object versions - Walk() is always in ascending order needed to ensure
23352355
// delete marker replicated to target after object version is first created.
23362356
if err := objectAPI.Walk(ctx, opts.bucket, "", objInfoCh, ObjectOptions{}); err != nil {
@@ -2346,80 +2366,85 @@ func (s *replicationResyncer) resyncBucket(ctx context.Context, objectAPI Object
23462366
if st.ResyncStatus == ResyncStarted || st.ResyncStatus == ResyncFailed {
23472367
lastCheckpoint = st.Object
23482368
}
2369+
workers, err := workers.New(resyncParallelRoutines)
23492370
for obj := range objInfoCh {
23502371
select {
23512372
case <-s.resyncCancelCh:
23522373
resyncStatus = ResyncCanceled
23532374
return
2375+
case <-ctx.Done():
2376+
return
23542377
default:
23552378
}
23562379
if heal && lastCheckpoint != "" && lastCheckpoint != obj.Name {
23572380
continue
23582381
}
23592382
lastCheckpoint = ""
2383+
obj := obj
2384+
workers.Take()
2385+
go func() {
2386+
defer workers.Give()
2387+
roi := getHealReplicateObjectInfo(obj, rcfg)
2388+
if !roi.ExistingObjResync.mustResync() {
2389+
return
2390+
}
2391+
traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
2392+
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
2393+
versionID := ""
2394+
dmVersionID := ""
2395+
if roi.VersionPurgeStatus.Empty() {
2396+
dmVersionID = roi.VersionID
2397+
} else {
2398+
versionID = roi.VersionID
2399+
}
23602400

2361-
roi := getHealReplicateObjectInfo(obj, rcfg)
2362-
if !roi.ExistingObjResync.mustResync() {
2363-
continue
2364-
}
2365-
traceFn := s.trace(tgt.ResetID, fmt.Sprintf("%s/%s (%s)", opts.bucket, roi.Name, roi.VersionID))
2366-
if roi.DeleteMarker || !roi.VersionPurgeStatus.Empty() {
2367-
versionID := ""
2368-
dmVersionID := ""
2369-
if roi.VersionPurgeStatus.Empty() {
2370-
dmVersionID = roi.VersionID
2401+
doi := DeletedObjectReplicationInfo{
2402+
DeletedObject: DeletedObject{
2403+
ObjectName: roi.Name,
2404+
DeleteMarkerVersionID: dmVersionID,
2405+
VersionID: versionID,
2406+
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
2407+
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
2408+
DeleteMarker: roi.DeleteMarker,
2409+
},
2410+
Bucket: roi.Bucket,
2411+
OpType: replication.ExistingObjectReplicationType,
2412+
EventType: ReplicateExistingDelete,
2413+
}
2414+
replicateDelete(ctx, doi, objectAPI)
23712415
} else {
2372-
versionID = roi.VersionID
2416+
roi.OpType = replication.ExistingObjectReplicationType
2417+
roi.EventType = ReplicateExisting
2418+
replicateObject(ctx, roi, objectAPI)
23732419
}
2374-
2375-
doi := DeletedObjectReplicationInfo{
2376-
DeletedObject: DeletedObject{
2377-
ObjectName: roi.Name,
2378-
DeleteMarkerVersionID: dmVersionID,
2379-
VersionID: versionID,
2380-
ReplicationState: roi.getReplicationState(roi.Dsc.String(), versionID, true),
2381-
DeleteMarkerMTime: DeleteMarkerMTime{roi.ModTime},
2382-
DeleteMarker: roi.DeleteMarker,
2420+
_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
2421+
VersionID: roi.VersionID,
2422+
Internal: minio.AdvancedGetOptions{
2423+
ReplicationProxyRequest: "false",
23832424
},
2384-
Bucket: roi.Bucket,
2385-
OpType: replication.ExistingObjectReplicationType,
2386-
EventType: ReplicateExistingDelete,
2425+
})
2426+
st := TargetReplicationResyncStatus{
2427+
Object: roi.Name,
2428+
Bucket: roi.Bucket,
23872429
}
2388-
replicateDelete(ctx, doi, objectAPI)
2389-
} else {
2390-
roi.OpType = replication.ExistingObjectReplicationType
2391-
roi.EventType = ReplicateExisting
2392-
replicateObject(ctx, roi, objectAPI)
2393-
}
2394-
_, err = tgt.StatObject(ctx, tgt.Bucket, roi.Name, minio.StatObjectOptions{
2395-
VersionID: roi.VersionID,
2396-
Internal: minio.AdvancedGetOptions{
2397-
ReplicationProxyRequest: "false",
2398-
},
2399-
})
2400-
s.Lock()
2401-
m = s.statusMap[opts.bucket]
2402-
st = m.TargetsMap[opts.arn]
2403-
st.Object = roi.Name
2404-
success := true
2405-
if err != nil {
2406-
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
2407-
st.ReplicatedCount++
2430+
success := true
2431+
if err != nil {
2432+
if roi.DeleteMarker && isErrMethodNotAllowed(ErrorRespToObjectError(err, opts.bucket, roi.Name)) {
2433+
st.ReplicatedCount++
2434+
} else {
2435+
st.FailedCount++
2436+
success = false
2437+
}
24082438
} else {
2409-
st.FailedCount++
2410-
success = false
2439+
st.ReplicatedCount++
2440+
st.ReplicatedSize += roi.Size
24112441
}
2412-
} else {
2413-
st.ReplicatedCount++
2414-
st.ReplicatedSize += roi.Size
2415-
}
2416-
m.TargetsMap[opts.arn] = st
2417-
m.LastUpdate = UTCNow()
2418-
s.statusMap[opts.bucket] = m
2419-
s.Unlock()
2420-
traceFn(err)
2421-
globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
2442+
s.incStats(st, opts)
2443+
traceFn(err)
2444+
globalSiteResyncMetrics.updateMetric(roi, success, opts.resyncID)
2445+
}()
24222446
}
2447+
workers.Wait()
24232448
resyncStatus = ResyncCompleted
24242449
}
24252450

0 commit comments

Comments
 (0)