Skip to content

Commit

Permalink
1. Reject any versioning request to buckets with cache property
Browse files Browse the repository at this point in the history
2. Server Side copy for a cache miss for a source bucket different than the target bucket
3. Multipart upload copy from a same/different sources
4. List objects from cache/hub
5. List Object versions defaults to List objects for cache enabled buckets
6. List multipart uploads from hub
7. Handle add/remove/get object tags from cache/hub buckets
8. Store last modified time from hub for cached objects. Return this value as the last modified time with cached objects.
9. Update Unit test

Signed-off-by: Sanjeev Gupta <guptsanj@us.ibm.com>
  • Loading branch information
guptsanj committed Aug 25, 2020
1 parent 7f9a801 commit e9b614e
Show file tree
Hide file tree
Showing 12 changed files with 510 additions and 85 deletions.
2 changes: 2 additions & 0 deletions src/api/object_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ module.exports = {
content_type: { type: 'string' },
xattr: { $ref: '#/definitions/xattr' },
cache_last_valid_time: { idate: true },
last_modified_time: { idate: true },
}
},
auth: { system: ['admin', 'user'] }
Expand Down Expand Up @@ -1366,6 +1367,7 @@ module.exports = {
content_type: { type: 'string' },
create_time: { idate: true },
cache_last_valid_time: { idate: true },
last_modified_time: { idate: true},
upload_started: { idate: true },
upload_size: { type: 'integer' },
etag: { type: 'string' },
Expand Down
10 changes: 8 additions & 2 deletions src/endpoint/s3/ops/s3_get_bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ async function get_bucket(req) {
(cont_tok_to_key_marker(cont_tok) || start_after) : req.query.marker,
};

if (req.query.get_from_cache) {
params.get_from_cache = req.query.get_from_cache;
}
const reply = await req.object_sdk.list_objects(params);

let res_params = {
'Name': req.params.bucket,
'Prefix': req.query.prefix,
Expand All @@ -41,7 +45,8 @@ async function get_bucket(req) {
if (req.query.delimiter !== '') {
res_params.Delimiter = req.query.delimiter;
}

// Always have last_modified_time take precedence. This time is set only for cached objects.
// Non cached objects will always default to obj.create_time
return {
ListBucketResult: [res_params, list_type === '2' ? {
'ContinuationToken': cont_tok,
Expand All @@ -55,7 +60,8 @@ async function get_bucket(req) {
_.map(reply.objects, obj => ({
Contents: {
Key: obj.key,
LastModified: s3_utils.format_s3_xml_date(obj.create_time),
// if the object specifies last_modified_time we use it, otherwise take create_time.
LastModified: s3_utils.format_s3_xml_date(obj.last_modified_time || obj.create_time),
ETag: `"${obj.etag}"`,
Size: obj.size,
Owner: (!list_type || req.query['fetch-owner']) && s3_utils.DEFAULT_S3_USER,
Expand Down
10 changes: 10 additions & 0 deletions src/endpoint/s3/s3_rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ async function handle_request(req, res) {
usage_report.s3_usage_info.total_calls += 1;
usage_report.s3_usage_info[op_name] = (usage_report.s3_usage_info[op_name] || 0) + 1;



if (req.query && req.query.versionId) {
const caching = await req.object_sdk.read_bucket_sdk_caching_info(req.params.bucket);
if (caching) {
dbg.error('S3 Version request not (NotImplemented) for buckets with caching', op_name, req.method, req.originalUrl);
throw new S3Error(S3Error.NotImplemented);
}
}

const op = S3_OPS[op_name];
if (!op || !op.handler) {
dbg.error('S3 TODO (NotImplemented)', op_name, req.method, req.originalUrl);
Expand Down
22 changes: 21 additions & 1 deletion src/endpoint/s3/s3_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ function format_copy_source(copy_source) {

function set_response_object_md(res, object_md) {
res.setHeader('ETag', '"' + object_md.etag + '"');
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.create_time)));

if (object_md.last_modified_time) {
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.last_modified_time)));
} else {
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.create_time)));
}
res.setHeader('Content-Type', object_md.content_type);
res.setHeader('Content-Length', object_md.content_length === undefined ? object_md.size : object_md.content_length);
res.setHeader('Accept-Ranges', 'bytes');
Expand Down Expand Up @@ -549,6 +554,18 @@ function parse_website_to_body(website) {
return reply;
}

/**
 * Extract the HTTP Date header from an AWS SDK response object.
 * @param {Object} res - object returned by an aws-sdk call (carries $response)
 * @returns {string} the raw `date` response header value
 * @throws {Error} when the header is missing
 */
function get_http_response_date(res) {
    const { httpResponse } = get_http_response_from_resp(res);
    const date = httpResponse.headers.date;
    if (!date) throw new Error("date not found in response header");
    return date;
}

/**
 * Return the underlying $response of an aws-sdk result object.
 * @param {Object} res - object returned by an aws-sdk call
 * @returns {Object} the low level response (has httpResponse.headers)
 * @throws {Error} when res does not carry a $response
 */
function get_http_response_from_resp(res) {
    const http_resp = res.$response;
    if (!http_resp) throw new Error("no $response in s3 returned object");
    return http_resp;
}

exports.STORAGE_CLASS_STANDARD = STORAGE_CLASS_STANDARD;
exports.DEFAULT_S3_USER = DEFAULT_S3_USER;
exports.OP_NAME_TO_ACTION = OP_NAME_TO_ACTION;
Expand All @@ -573,3 +590,6 @@ exports.parse_lock_header = parse_lock_header;
exports.parse_body_object_lock_conf_xml = parse_body_object_lock_conf_xml;
exports.parse_to_camel_case = parse_to_camel_case;
exports._is_valid_retention = _is_valid_retention;
exports.get_http_response_from_resp = get_http_response_from_resp;
exports.get_http_response_date = get_http_response_date;

85 changes: 71 additions & 14 deletions src/sdk/namespace_cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const RangeStream = require('../util/range_stream');
const P = require('../util/promise');
const buffer_utils = require('../util/buffer_utils');
const Semaphore = require('../util/semaphore');
const S3Error = require('../endpoint/s3/s3_errors').S3Error;

const _global_cache_uploader = new Semaphore(cache_config.UPLOAD_SEMAPHORE_CAP, {
timeout: cache_config.UPLOAD_SEMAPHORE_TIMEOUT,
Expand All @@ -27,7 +28,17 @@ class NamespaceCache {
}

get_write_resource() {
return this.namespace_hub;
return this;
}

get_bucket() {
return this.namespace_hub.get_bucket();
}

is_same_namespace(other) {
return other instanceof NamespaceCache &&
this.namespace_hub === other.namespace_hub &&
this.namespace_nb === other.namespace_nb;
}

async _delete_object_from_cache(params, object_sdk) {
Expand Down Expand Up @@ -157,18 +168,30 @@ class NamespaceCache {
/////////////////

async list_objects(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_objects(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_objects(params, object_sdk);
}
return this.namespace_hub.list_objects(params, object_sdk);
}

async list_uploads(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_uploads(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_uploads(params, object_sdk);
}
return this.namespace_hub.list_uploads(params, object_sdk);
}

async list_object_versions(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_object_versions(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_objects(params, object_sdk);
}
return this.list_object_versions(params, object_sdk);
}

/////////////////
Expand Down Expand Up @@ -468,6 +491,9 @@ class NamespaceCache {
const operation = 'ObjectCreated';
const load_for_trigger = object_sdk.should_run_triggers({ active_triggers: this.active_triggers, operation });

function stream_final_from_promise(promise) {
return callback => promise.then(() => callback(), err => callback(err));
}
const bucket_free_space_bytes = await this._get_bucket_free_space_bytes(params, object_sdk);
let upload_response;
let etag;
Expand All @@ -487,8 +513,14 @@ class NamespaceCache {
const hub_params = { ...params, source_stream: hub_stream };
const hub_promise = this.namespace_hub.upload_object(hub_params, object_sdk);

// defer the final callback of the cache stream until the hub ack
const cache_finalizer = callback => hub_promise.then(() => callback(), err => callback(err));
const cache_final_promise = hub_promise.then(async upload_res => {
const update_params = _.pick(_.defaults(
{ bucket: this.namespace_nb.target_bucket }, params), 'bucket', 'key', 'last_modified_time');
update_params.last_modified_time = (new Date(upload_res.last_modified_time)).getTime();
await object_sdk.rpc_client.object.update_object_md(update_params);
});
const cache_finalizer = stream_final_from_promise(cache_final_promise);

const cache_stream = new stream.PassThrough({ final: cache_finalizer });
const cache_params = { ...params, source_stream: cache_stream };
const cache_promise = _global_cache_uploader.surround_count(
Expand Down Expand Up @@ -570,9 +602,9 @@ class NamespaceCache {

async complete_object_upload(params, object_sdk) {

setImmediate(() => this._delete_object_from_cache(params, object_sdk));

return this.namespace_hub.complete_object_upload(params, object_sdk);
const res = await this.namespace_hub.complete_object_upload(params, object_sdk);
await this._delete_object_from_cache(params, object_sdk);
return res;
}

async abort_object_upload(params, object_sdk) {
Expand Down Expand Up @@ -609,6 +641,11 @@ class NamespaceCache {

async delete_multiple_objects(params, object_sdk) {
const operation = 'ObjectRemoved';
const objects = params.objects.filter(obj => obj.version_id);
if (objects.length > 0) {
dbg.error('S3 Version request not (NotImplemented) for s3_post_bucket_delete', params);
throw new S3Error(S3Error.NotImplemented);
}
const load_for_trigger = object_sdk.should_run_triggers({ active_triggers: this.active_triggers, operation });
const head_res = load_for_trigger && await P.map(params.objects, async obj => {
const request = {
Expand Down Expand Up @@ -655,15 +692,35 @@ class NamespaceCache {
////////////////////

async get_object_tagging(params, object_sdk) {

const object_md = await this.read_object_md(params, object_sdk);
if (object_md.should_read_from_cache) {
return this.namespace_nb.get_object_tagging(params, object_sdk);
}

return this.namespace_hub.get_object_tagging(params, object_sdk);
}

async delete_object_tagging(params, object_sdk) {
return this.namespace_hub.delete_object_tagging(params, object_sdk);

const res = this.namespace_hub.delete_object_tagging(params, object_sdk);
try {
await this.namespace_nb.delete_object_tagging(params, object_sdk);
} catch (err) {
dbg.log0('failed to delete tags in cache', { params: _.omit(params, 'source_stream')});
}
return res;
}

async put_object_tagging(params, object_sdk) {
return this.namespace_hub.put_object_tagging(params, object_sdk);

const res = await this.namespace_hub.put_object_tagging(params, object_sdk);
try {
await this.namespace_nb.put_object_tagging(params, object_sdk);
} catch (err) {
dbg.log0('failed to store tags in cache', { params: _.omit(params, 'source_stream')});
}
return res;
}

//////////////////////////
Expand Down
3 changes: 2 additions & 1 deletion src/sdk/namespace_s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ class NamespaceS3 {
}
dbg.log0('NamespaceS3.upload_object:', this.bucket, inspect(params), 'res', inspect(res));
const etag = s3_utils.parse_etag(res.ETag);
return { etag, version_id: res.VersionId };
const last_modified_time = s3_utils.get_http_response_date(res);
return { etag, version_id: res.VersionId, last_modified_time };
}

////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions src/sdk/nb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ interface ObjectMD {
upload_started?: ID;
create_time?: Date;
cache_last_valid_time?: Date;
last_modified_time?: Date;
etag: string;
md5_b64: string;
sha256_b64: string;
Expand All @@ -388,6 +389,7 @@ interface ObjectInfo {
upload_started?: number;
create_time?: number;
cache_last_valid_time?: number;
last_modified_time?: number;
etag: string;
md5_b64: string;
sha256_b64: string;
Expand Down
18 changes: 18 additions & 0 deletions src/sdk/object_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ class ObjectSDK {
return bucket.namespace;
}

async read_bucket_sdk_caching_info(name) {
try {
const { bucket } = await bucket_namespace_cache.get_with_cache({ sdk: this, name });
return bucket.namespace ? bucket.namespace.caching : undefined;
} catch (error) {
dbg.log0('read_bucket_sdk_caching_info error', error);
}
}

async read_bucket_sdk_policy_info(name) {
const { bucket } = await bucket_namespace_cache.get_with_cache({ sdk: this, name });
const policy_info = {
Expand Down Expand Up @@ -439,10 +448,19 @@ class ObjectSDK {
} else {
// source cannot be copied directly (different platforms, accounts, etc.)
// set the source_stream to read from the copy source
// Source params need these for read operations
source_params.object_md = source_md;
source_params.obj_id = source_md.obj_id;
source_params.version_id = source_md.version_id;
// param size is needed when doing an upload. Can be overridden during ranged writes
params.size = source_md.size;

if (ranges) {
if (ranges.length !== 1) throw new Error('fix_copy_source_params: multiple ranges not supported');
source_params.start = ranges[0].start;
source_params.end = ranges[0].end;
// Update the param size with the ranges to be written
params.size = source_params.end - source_params.start;
}
params.source_stream = await source_ns.read_object_stream(source_params, this);
if (params.size > (100 * size_utils.MEGABYTE)) {
Expand Down
6 changes: 5 additions & 1 deletion src/server/object_services/object_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -822,13 +822,16 @@ function _check_encryption_permissions(src_enc, req_enc) {
/**
 * RPC handler: update mutable object metadata fields in the MD store.
 * Accepts content_type, xattr, cache_last_valid_time and last_modified_time.
 * NOTE: removed a stale leftover duplicate `const set_updates = _.pick(...)`
 * line (old pick list without 'last_modified_time') that redeclared the const.
 */
async function update_object_md(req) {
    dbg.log0('object_server.update object md', req.rpc_params);
    throw_if_maintenance(req);
    const set_updates = _.pick(req.rpc_params,
        'content_type', 'xattr', 'cache_last_valid_time', 'last_modified_time');
    if (set_updates.xattr) {
        // the md store cannot use dots in keys - store xattr keys with '@' instead
        set_updates.xattr = _.mapKeys(set_updates.xattr, (v, k) => k.replace(/\./g, '@'));
    }
    // rpc carries idate (epoch millis) - convert to Date for the md store
    if (set_updates.cache_last_valid_time) {
        set_updates.cache_last_valid_time = new Date(set_updates.cache_last_valid_time);
    }
    if (set_updates.last_modified_time) {
        set_updates.last_modified_time = new Date(set_updates.last_modified_time);
    }
    const obj = await find_object_md(req);
    await MDStore.instance().update_object_by_id(obj._id, set_updates);
}
Expand Down Expand Up @@ -1342,6 +1345,7 @@ function get_object_info(md, options = {}) {
sha256_b64: md.sha256_b64 || undefined,
content_type: md.content_type || 'application/octet-stream',
create_time: md.create_time ? md.create_time.getTime() : md._id.getTimestamp().getTime(),
last_modified_time: md.last_modified_time ? (md.last_modified_time.getTime()) : undefined,
cache_last_valid_time: md.cache_last_valid_time ? md.cache_last_valid_time.getTime() : undefined,
upload_started: md.upload_started ? md.upload_started.getTimestamp().getTime() : undefined,
upload_size: _.isNumber(md.upload_size) ? md.upload_size : undefined,
Expand Down
8 changes: 7 additions & 1 deletion src/server/object_services/schemas/object_md_schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ module.exports = {
upload_size: { type: 'integer' },
upload_started: { objectid: true },
create_time: { date: true },
// cache_last_valid_time is an optional property set for objects in cache buckets.
// This property indicates the time when the cached object was in sync with the
// object on the hub bucket.
cache_last_valid_time: { date: true },

// last_modified_time is an optional property that is set on cache buckets
// to separate the times of creation of the cache object (create_time)
// vs the "real" LastModifiedTime from the hub.
last_modified_time: { date: true },
// etag is the object md5 hex for objects uploaded in single action.
// for multipart upload etag is a special aggregated md5 of the parts md5's.
etag: { type: 'string', },
Expand Down
Loading

0 comments on commit e9b614e

Please sign in to comment.