Check large objects before replication #2558

Open · wants to merge 2 commits into base: development/8.7
1 change: 1 addition & 0 deletions extensions/replication/ReplicationConfigValidator.js
@@ -73,6 +73,7 @@ const joiSchema = joi.object({
minMPUSizeMB: joi.number().greater(0).default(20),
probeServer: probeServerJoi.default(),
circuitBreaker: joi.object().optional(),
sourceCheckIfSizeGreaterThanMB: joi.number().positive().default(100),
Contributor:

May be a good occasion to review this option:

  • can we find a better name (maxObjectSize?)
  • this option is weird, as it breaks CRR semantics: what is the purpose of this option? Is it really used? Is there a matching option in cloudserver?

@KillianG (Contributor, Author) commented on Oct 17, 2024:

AFAIK, this option simply adds a double check before re-pushing an object to replication: when an object is so large that it takes a long time to replicate, the check avoids sending the same object for replication multiple times.

We can probably find a better name for it, though.
Why do you think it breaks CRR semantics? I don't know whether it's really used, but there is a default value, so I guess it is, no?

}).required(),
replicationStatusProcessor: {
groupId: joi.string().required(),
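For context on the review thread above, here is a minimal sketch of where this option sits in a queue processor's replication configuration. The key name, the default, and the `queueProcessor` nesting come from the schema and the code below; the surrounding structure and concrete values are illustrative assumptions:

```js
// Illustrative excerpt of a Backbeat replication config (values assumed,
// key name and default taken from the joi schema above).
const repConfig = {
    queueProcessor: {
        minMPUSizeMB: 20,
        // New option: objects whose size (in decimal MB) is at or above this
        // threshold get their source replication status re-checked before
        // any data is copied, to avoid re-replicating a version that has
        // already COMPLETED for the target site.
        sourceCheckIfSizeGreaterThanMB: 100,
    },
};
```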
61 changes: 61 additions & 0 deletions extensions/replication/tasks/ReplicateObject.js
@@ -16,6 +16,10 @@
const RoleCredentials = require('../../../lib/credentials/RoleCredentials');
const { metricsExtension, metricsTypeQueued, metricsTypeCompleted, replicationStages } = require('../constants');

const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');

const errorAlreadyCompleted = {};

function _extractAccountIdFromRole(role) {
return role.split(':')[4];
}
@@ -331,6 +335,49 @@
});
}

_refreshSourceEntry(sourceEntry, log, cb) {
const params = {
Bucket: sourceEntry.getBucket(),
Key: sourceEntry.getObjectKey(),
VersionId: sourceEntry.getEncodedVersionId(),
};
return this.backbeatSource.getMetadata(params, (err, blob) => {
if (err) {
err.origin = 'source';

[Check warning (GitHub Actions / tests) on line 346: Assignment to property of function parameter 'err']
log.error('error getting metadata blob from S3', {
method: 'ReplicateObject._refreshSourceEntry',
error: err,
});
return cb(err);
}
const parsedEntry = ObjectQueueEntry.createFromBlob(blob.Body);
if (parsedEntry.error) {
log.error('error parsing metadata blob', {

[Check warning (Codecov / codecov/patch/Backbeat) on line 355: Added line #L355 was not covered by tests]
error: parsedEntry.error,
method: 'ReplicateObject._refreshSourceEntry',
});
return cb(errors.InternalError.

[Check warning (Codecov / codecov/patch/Backbeat) on line 359: Added line #L359 was not covered by tests]
customizeDescription('error parsing metadata blob'));
}
const refreshedEntry = new ObjectQueueEntry(sourceEntry.getBucket(),
sourceEntry.getObjectVersionedKey(), parsedEntry.result);
return cb(null, refreshedEntry);
});
}

_checkSourceReplication(sourceEntry, log, cb) {
this._refreshSourceEntry(sourceEntry, log, (err, refreshedEntry) => {
if (err) {
return cb(err);
}
const status = refreshedEntry.getReplicationSiteStatus(this.site);
if (status === 'COMPLETED') {
return cb(errorAlreadyCompleted);

[Check warning (Codecov / codecov/patch/Backbeat) on line 375: Added line #L375 was not covered by tests]
}
return cb();
});
}

_getAndPutData(sourceEntry, destEntry, log, cb) {
log.debug('replicating data', { entry: sourceEntry.getLogInfo() });
if (sourceEntry.getLocation().some(part => {
@@ -701,6 +748,14 @@
this._setTargetAccountMd(destEntry, targetRole, log, next);
},
// Get data from source bucket and put it on the target bucket
next => {
if (!mdOnly &&
sourceEntry.getContentLength() / 1000000 >=
this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB) {
return this._checkSourceReplication(sourceEntry, log, next);
}
return next();
},
next => {
if (!mdOnly) {
const extMetrics = getExtMetrics(this.site,
@@ -768,6 +823,12 @@
});
return done();
}
if (err === errorAlreadyCompleted) {
log.warn('replication skipped: ' +

[Check warning (Codecov / codecov/patch/Backbeat) on line 827: Added line #L827 was not covered by tests]
'source object version already COMPLETED',
{ entry: sourceEntry.getLogInfo() });
return done();

[Check warning (Codecov / codecov/patch/Backbeat) on line 830: Added line #L830 was not covered by tests]
}
if (err.ObjNotFound || err.code === 'ObjNotFound') {
if (err.origin === 'source') {
log.info('replication skipped: ' +
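Taken together, the hunks above implement the following control flow. This is a condensed editorial sketch of the new pre-check, assembled from the diff, not the literal patch:

```js
// Condensed sketch (names from the diff above). Before the data copy step,
// large objects trigger a metadata refresh on the source; if the target site
// already reports COMPLETED, the task bails out via a sentinel error.
next => {
    const thresholdMB =
        this.repConfig.queueProcessor.sourceCheckIfSizeGreaterThanMB;
    // getContentLength() is in bytes; dividing by 1000000 compares decimal MB.
    if (!mdOnly && sourceEntry.getContentLength() / 1000000 >= thresholdMB) {
        // _checkSourceReplication re-reads the source metadata and calls back
        // with errorAlreadyCompleted when
        // refreshedEntry.getReplicationSiteStatus(this.site) === 'COMPLETED'.
        return this._checkSourceReplication(sourceEntry, log, next);
    }
    return next();
},
// Later, the error handler recognizes the sentinel by identity
// (errorAlreadyCompleted is a unique {} object) and logs a skip instead of
// failing the entry.
```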
13 changes: 13 additions & 0 deletions tests/functional/replication/queueProcessor.js
@@ -797,6 +797,7 @@ describe('queue processor functional tests with mocking', () => {
},
groupId: 'backbeat-func-test-group-id',
mpuPartsConcurrency: 10,
sourceCheckIfSizeGreaterThanMB: 0,
},
},
{ host: '127.0.0.1',
@@ -929,6 +930,18 @@
], done);
});

it('should fail a replication if unable to get metadata', done => {
s3mock.installBackbeatErrorResponder('source.s3.getMetadata',
errors.ObjNotFound,
{ once: true });
async.parallel([
done => queueProcessorSF.processReplicationEntry(
s3mock.getParam('kafkaEntry'), () => {
done();
}),
], done);
});

Contributor:

Should have tests verifying that "large" objects are not replicated...
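In that spirit, a rough sketch of such a test, reusing the mocks visible in this file. The pieces marked hypothetical (the COMPLETED-metadata responder and the data-copy assertion) are assumptions about the mock's API, not helpers confirmed by this diff; `installBackbeatErrorResponder` above is the only responder API this PR actually shows:

```js
// Hypothetical test sketch. With sourceCheckIfSizeGreaterThanMB set to 0
// (as in this suite's config above), every entry takes the pre-check path.
it('should skip replication when the source version is already COMPLETED',
done => {
    // Assumed hook: make source.s3.getMetadata return a metadata blob whose
    // replication status for the target site is COMPLETED. The real mock may
    // expose this differently.
    s3mock.installBackbeatCompletedMetadataResponder(); // hypothetical helper
    queueProcessorSF.processReplicationEntry(
        s3mock.getParam('kafkaEntry'), err => {
            assert.ifError(err);
            // Assumed assertion: the mock recorded no data-copy request.
            assert.strictEqual(s3mock.dataCopied, false); // hypothetical flag
            done();
        });
});
```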

it('should complete a "copy location" action', done => {
sendCopyLocationAction(
s3mock, queueProcessorSF, response => {