Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NS-CACHE: Support S3 operations for cache enabled buckets #6130

Merged
merged 1 commit into from
Aug 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/api/object_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ module.exports = {
content_type: { type: 'string' },
xattr: { $ref: '#/definitions/xattr' },
cache_last_valid_time: { idate: true },
last_modified_time: { idate: true },
}
},
auth: { system: ['admin', 'user'] }
Expand Down Expand Up @@ -1366,6 +1367,7 @@ module.exports = {
content_type: { type: 'string' },
create_time: { idate: true },
cache_last_valid_time: { idate: true },
last_modified_time: { idate: true},
upload_started: { idate: true },
upload_size: { type: 'integer' },
etag: { type: 'string' },
Expand Down
10 changes: 8 additions & 2 deletions src/endpoint/s3/ops/s3_get_bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ async function get_bucket(req) {
(cont_tok_to_key_marker(cont_tok) || start_after) : req.query.marker,
};

if (req.query.get_from_cache) {
params.get_from_cache = req.query.get_from_cache;
}
const reply = await req.object_sdk.list_objects(params);

let res_params = {
'Name': req.params.bucket,
'Prefix': req.query.prefix,
Expand All @@ -41,7 +45,8 @@ async function get_bucket(req) {
if (req.query.delimiter !== '') {
res_params.Delimiter = req.query.delimiter;
}

// Always have last_modified_time take precedence. This time is set only for cached objects.
// Non cached objects will always default to obj.create_time
return {
ListBucketResult: [res_params, list_type === '2' ? {
'ContinuationToken': cont_tok,
Expand All @@ -55,7 +60,8 @@ async function get_bucket(req) {
_.map(reply.objects, obj => ({
Contents: {
Key: obj.key,
LastModified: s3_utils.format_s3_xml_date(obj.create_time),
// if the object specifies last_modified_time we use it, otherwise take create_time.
LastModified: s3_utils.format_s3_xml_date(obj.last_modified_time || obj.create_time),
ETag: `"${obj.etag}"`,
Size: obj.size,
Owner: (!list_type || req.query['fetch-owner']) && s3_utils.DEFAULT_S3_USER,
Expand Down
10 changes: 10 additions & 0 deletions src/endpoint/s3/s3_rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ async function handle_request(req, res) {
usage_report.s3_usage_info.total_calls += 1;
usage_report.s3_usage_info[op_name] = (usage_report.s3_usage_info[op_name] || 0) + 1;



if (req.query && req.query.versionId) {
const caching = await req.object_sdk.read_bucket_sdk_caching_info(req.params.bucket);
if (caching) {
dbg.error('S3 Version request not (NotImplemented) for buckets with caching', op_name, req.method, req.originalUrl);
throw new S3Error(S3Error.NotImplemented);
}
}

const op = S3_OPS[op_name];
if (!op || !op.handler) {
dbg.error('S3 TODO (NotImplemented)', op_name, req.method, req.originalUrl);
Expand Down
22 changes: 21 additions & 1 deletion src/endpoint/s3/s3_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,12 @@ function format_copy_source(copy_source) {

function set_response_object_md(res, object_md) {
res.setHeader('ETag', '"' + object_md.etag + '"');
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.create_time)));

if (object_md.last_modified_time) {
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.last_modified_time)));
} else {
res.setHeader('Last-Modified', time_utils.format_http_header_date(new Date(object_md.create_time)));
}
res.setHeader('Content-Type', object_md.content_type);
res.setHeader('Content-Length', object_md.content_length === undefined ? object_md.size : object_md.content_length);
res.setHeader('Accept-Ranges', 'bytes');
Expand Down Expand Up @@ -549,6 +554,18 @@ function parse_website_to_body(website) {
return reply;
}

/**
 * Extracts the `date` header from the raw HTTP response attached to an
 * S3 client reply.
 *
 * @param {Object} res - reply object returned by the S3 client; must carry `$response`.
 * @returns {*} the value of the `date` response header.
 * @throws {Error} when `$response` is missing or the `date` header is absent/empty.
 */
function get_http_response_date(res) {
    const { headers } = get_http_response_from_resp(res).httpResponse;
    if (headers.date) return headers.date;
    throw new Error('date not found in response header');
}

/**
 * Returns the `$response` object attached to an S3 client reply.
 *
 * @param {Object} res - reply object returned by the S3 client.
 * @returns {Object} the `$response` property of `res`.
 * @throws {Error} when `res.$response` is missing or falsy.
 */
function get_http_response_from_resp(res) {
    const { $response } = res;
    if ($response) return $response;
    throw new Error('no $response in s3 returned object');
}

exports.STORAGE_CLASS_STANDARD = STORAGE_CLASS_STANDARD;
exports.DEFAULT_S3_USER = DEFAULT_S3_USER;
exports.OP_NAME_TO_ACTION = OP_NAME_TO_ACTION;
Expand All @@ -573,3 +590,6 @@ exports.parse_lock_header = parse_lock_header;
exports.parse_body_object_lock_conf_xml = parse_body_object_lock_conf_xml;
exports.parse_to_camel_case = parse_to_camel_case;
exports._is_valid_retention = _is_valid_retention;
exports.get_http_response_from_resp = get_http_response_from_resp;
exports.get_http_response_date = get_http_response_date;

85 changes: 71 additions & 14 deletions src/sdk/namespace_cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const RangeStream = require('../util/range_stream');
const P = require('../util/promise');
const buffer_utils = require('../util/buffer_utils');
const Semaphore = require('../util/semaphore');
const S3Error = require('../endpoint/s3/s3_errors').S3Error;

const _global_cache_uploader = new Semaphore(cache_config.UPLOAD_SEMAPHORE_CAP, {
timeout: cache_config.UPLOAD_SEMAPHORE_TIMEOUT,
Expand All @@ -27,7 +28,17 @@ class NamespaceCache {
}

get_write_resource() {
return this.namespace_hub;
return this;
}

get_bucket() {
return this.namespace_hub.get_bucket();
}

is_same_namespace(other) {
return other instanceof NamespaceCache &&
this.namespace_hub === other.namespace_hub &&
this.namespace_nb === other.namespace_nb;
}

async _delete_object_from_cache(params, object_sdk) {
Expand Down Expand Up @@ -157,18 +168,30 @@ class NamespaceCache {
/////////////////

async list_objects(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_objects(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_objects(params, object_sdk);
}
return this.namespace_hub.list_objects(params, object_sdk);
}

async list_uploads(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_uploads(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_uploads(params, object_sdk);
}
return this.namespace_hub.list_uploads(params, object_sdk);
guptsanj marked this conversation as resolved.
Show resolved Hide resolved
}

async list_object_versions(params, object_sdk) {
// TODO listing from cache only for deevelopment
return this.namespace_nb.list_object_versions(params, object_sdk);
const get_from_cache = params.get_from_cache;
params = _.omit(params, 'get_from_cache');
if (get_from_cache) {
return this.namespace_nb.list_objects(params, object_sdk);
}
return this.list_object_versions(params, object_sdk);
}

/////////////////
Expand Down Expand Up @@ -468,6 +491,9 @@ class NamespaceCache {
const operation = 'ObjectCreated';
const load_for_trigger = object_sdk.should_run_triggers({ active_triggers: this.active_triggers, operation });

/**
 * Builds a callback-style finalizer out of a promise.
 *
 * The returned function takes a node-style callback and invokes it with no
 * arguments once `promise` fulfills, or with the rejection reason once it rejects.
 *
 * @param {Promise<*>} promise - the promise to wait for.
 * @returns {function(function): Promise<*>} finalizer accepting a callback.
 */
function stream_final_from_promise(promise) {
    return callback => {
        const on_fulfilled = () => callback();
        const on_rejected = err => callback(err);
        return promise.then(on_fulfilled, on_rejected);
    };
}
const bucket_free_space_bytes = await this._get_bucket_free_space_bytes(params, object_sdk);
let upload_response;
let etag;
Expand All @@ -487,8 +513,14 @@ class NamespaceCache {
const hub_params = { ...params, source_stream: hub_stream };
const hub_promise = this.namespace_hub.upload_object(hub_params, object_sdk);

// defer the final callback of the cache stream until the hub ack
const cache_finalizer = callback => hub_promise.then(() => callback(), err => callback(err));
const cache_final_promise = hub_promise.then(async upload_res => {
const update_params = _.pick(_.defaults(
{ bucket: this.namespace_nb.target_bucket }, params), 'bucket', 'key', 'last_modified_time');
update_params.last_modified_time = (new Date(upload_res.last_modified_time)).getTime();
await object_sdk.rpc_client.object.update_object_md(update_params);
});
const cache_finalizer = stream_final_from_promise(cache_final_promise);

const cache_stream = new stream.PassThrough({ final: cache_finalizer });
const cache_params = { ...params, source_stream: cache_stream };
const cache_promise = _global_cache_uploader.surround_count(
Expand Down Expand Up @@ -570,9 +602,9 @@ class NamespaceCache {

async complete_object_upload(params, object_sdk) {

setImmediate(() => this._delete_object_from_cache(params, object_sdk));

return this.namespace_hub.complete_object_upload(params, object_sdk);
const res = await this.namespace_hub.complete_object_upload(params, object_sdk);
await this._delete_object_from_cache(params, object_sdk);
return res;
}

async abort_object_upload(params, object_sdk) {
Expand Down Expand Up @@ -609,6 +641,11 @@ class NamespaceCache {

async delete_multiple_objects(params, object_sdk) {
const operation = 'ObjectRemoved';
const objects = params.objects.filter(obj => obj.version_id);
if (objects.length > 0) {
dbg.error('S3 Version request not (NotImplemented) for s3_post_bucket_delete', params);
throw new S3Error(S3Error.NotImplemented);
}
const load_for_trigger = object_sdk.should_run_triggers({ active_triggers: this.active_triggers, operation });
const head_res = load_for_trigger && await P.map(params.objects, async obj => {
const request = {
Expand Down Expand Up @@ -655,15 +692,35 @@ class NamespaceCache {
////////////////////

async get_object_tagging(params, object_sdk) {

const object_md = await this.read_object_md(params, object_sdk);
if (object_md.should_read_from_cache) {
return this.namespace_nb.get_object_tagging(params, object_sdk);
}

return this.namespace_hub.get_object_tagging(params, object_sdk);
}

async delete_object_tagging(params, object_sdk) {
return this.namespace_hub.delete_object_tagging(params, object_sdk);

const res = this.namespace_hub.delete_object_tagging(params, object_sdk);
try {
await this.namespace_nb.delete_object_tagging(params, object_sdk);
} catch (err) {
dbg.log0('failed to delete tags in cache', { params: _.omit(params, 'source_stream')});
}
return res;
}

async put_object_tagging(params, object_sdk) {
return this.namespace_hub.put_object_tagging(params, object_sdk);

const res = await this.namespace_hub.put_object_tagging(params, object_sdk);
try {
await this.namespace_nb.put_object_tagging(params, object_sdk);
} catch (err) {
dbg.log0('failed to store tags in cache', { params: _.omit(params, 'source_stream')});
}
return res;
}

//////////////////////////
Expand Down
3 changes: 2 additions & 1 deletion src/sdk/namespace_s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ class NamespaceS3 {
}
dbg.log0('NamespaceS3.upload_object:', this.bucket, inspect(params), 'res', inspect(res));
const etag = s3_utils.parse_etag(res.ETag);
return { etag, version_id: res.VersionId };
const last_modified_time = s3_utils.get_http_response_date(res);
return { etag, version_id: res.VersionId, last_modified_time };
}

////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions src/sdk/nb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ interface ObjectMD {
upload_started?: ID;
create_time?: Date;
cache_last_valid_time?: Date;
last_modified_time?: Date;
etag: string;
md5_b64: string;
sha256_b64: string;
Expand All @@ -388,6 +389,7 @@ interface ObjectInfo {
upload_started?: number;
create_time?: number;
cache_last_valid_time?: number;
last_modified_time?: number;
etag: string;
md5_b64: string;
sha256_b64: string;
Expand Down
18 changes: 18 additions & 0 deletions src/sdk/object_sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ class ObjectSDK {
return bucket.namespace;
}

async read_bucket_sdk_caching_info(name) {
try {
const { bucket } = await bucket_namespace_cache.get_with_cache({ sdk: this, name });
return bucket.namespace ? bucket.namespace.caching : undefined;
} catch (error) {
dbg.log0('read_bucket_sdk_caching_info error', error);
}
}

async read_bucket_sdk_policy_info(name) {
const { bucket } = await bucket_namespace_cache.get_with_cache({ sdk: this, name });
const policy_info = {
Expand Down Expand Up @@ -439,10 +448,19 @@ class ObjectSDK {
} else {
// source cannot be copied directly (different platforms, accounts, etc.)
// set the source_stream to read from the copy source
// Source params need these for read operations
source_params.object_md = source_md;
source_params.obj_id = source_md.obj_id;
source_params.version_id = source_md.version_id;
// param size is needed when doing an upload. Can be overridden during ranged writes
params.size = source_md.size;

if (ranges) {
if (ranges.length !== 1) throw new Error('fix_copy_source_params: multiple ranges not supported');
source_params.start = ranges[0].start;
source_params.end = ranges[0].end;
// Update the param size with the ranges to be written
params.size = source_params.end - source_params.start;
}
params.source_stream = await source_ns.read_object_stream(source_params, this);
if (params.size > (100 * size_utils.MEGABYTE)) {
Expand Down
6 changes: 5 additions & 1 deletion src/server/object_services/object_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -822,13 +822,16 @@ function _check_encryption_permissions(src_enc, req_enc) {
/**
 * Updates selected metadata fields of an existing object.
 *
 * Only `content_type`, `xattr`, `cache_last_valid_time` and
 * `last_modified_time` are picked from the rpc params; anything else is ignored.
 *
 * @param {Object} req - rpc request; `req.rpc_params` carries the fields to
 *      update and is also used by find_object_md to locate the object.
 * @returns {Promise<void>}
 */
async function update_object_md(req) {
    dbg.log0('object_server.update object md', req.rpc_params);
    throw_if_maintenance(req);
    const set_updates = _.pick(req.rpc_params, 'content_type', 'xattr', 'cache_last_valid_time', 'last_modified_time');
    if (set_updates.xattr) {
        // every '.' in an xattr key is stored as '@'
        // NOTE(review): presumably because the store does not accept dots in keys - confirm
        set_updates.xattr = _.mapKeys(set_updates.xattr, (v, k) => k.replace(/\./g, '@'));
    }
    // time fields are converted to Date objects before being stored
    if (set_updates.cache_last_valid_time) {
        set_updates.cache_last_valid_time = new Date(set_updates.cache_last_valid_time);
    }
    if (set_updates.last_modified_time) {
        set_updates.last_modified_time = new Date(set_updates.last_modified_time);
    }
    const obj = await find_object_md(req);
    await MDStore.instance().update_object_by_id(obj._id, set_updates);
}
Expand Down Expand Up @@ -1342,6 +1345,7 @@ function get_object_info(md, options = {}) {
sha256_b64: md.sha256_b64 || undefined,
content_type: md.content_type || 'application/octet-stream',
create_time: md.create_time ? md.create_time.getTime() : md._id.getTimestamp().getTime(),
last_modified_time: md.last_modified_time ? (md.last_modified_time.getTime()) : undefined,
cache_last_valid_time: md.cache_last_valid_time ? md.cache_last_valid_time.getTime() : undefined,
upload_started: md.upload_started ? md.upload_started.getTimestamp().getTime() : undefined,
upload_size: _.isNumber(md.upload_size) ? md.upload_size : undefined,
Expand Down
8 changes: 7 additions & 1 deletion src/server/object_services/schemas/object_md_schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,14 @@ module.exports = {
upload_size: { type: 'integer' },
upload_started: { objectid: true },
create_time: { date: true },
// cache_last_valid_time is an optional property set for objects in cache buckets.
// This property indicates the time when the cached object was in sync with the
// object on the hub bucket.
cache_last_valid_time: { date: true },

// last_modified_time is an optional property that is set on cache buckets
// to separate the times of creation of the cache object (create_time)
// vs the "real" LastModifiedTime from the hub.
last_modified_time: { date: true },
guptsanj marked this conversation as resolved.
Show resolved Hide resolved
// etag is the object md5 hex for objects uploaded in single action.
// for multipart upload etag is a special aggregated md5 of the parts md5's.
etag: { type: 'string', },
Expand Down
Loading