1 change: 1 addition & 0 deletions config.js
@@ -453,6 +453,7 @@ config.NAMESPACE_CACHING = {
CACHED_PERCENTAGE_LOW_THRESHOLD: 40,
CACHED_PERCENTAGE_HIGH_THRESHOLD: 80,
UPLOAD_SEMAPHORE_TIMEOUT: 30 * 1000,
MIN_OBJECT_AGE_FOR_GC: 1000 * 60 * 60 * 24,
UPLOAD_SEMAPHORE_CAP: Math.floor(
Number(process.env.CONTAINER_MEM_REQUEST ? process.env.CONTAINER_MEM_REQUEST : os.totalmem()) / 8),
};
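
For reference, MIN_OBJECT_AGE_FOR_GC (24 hours, in ms) is consumed by MapBuilder's eviction path later in this diff. A minimal sketch of that age check; the helper name is illustrative only:

const config = require('./config');

// Mirrors the filter in MapBuilder.reload_chunks: an object_md qualifies for GC
// only once it is older than MIN_OBJECT_AGE_FOR_GC.
function is_old_enough_for_gc(object_md, now = Date.now()) {
    return now - object_md.create_time.getTime() > config.NAMESPACE_CACHING.MIN_OBJECT_AGE_FOR_GC;
}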
3 changes: 3 additions & 0 deletions src/api/scrubber_api.js
@@ -23,6 +23,9 @@ module.exports = {
items: { objectid: true }
},
tier: { objectid: true },
evict: {
type: 'boolean',
},
}
},
auth: { system: 'admin' }
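
A caller passes the new flag alongside chunk_ids. A condensed sketch of the call tier_ttf_worker makes later in this diff (the auth helper here is hypothetical; the real worker builds the token with auth_server.make_auth_token):

// When evicting, `tier` is left undefined, since eviction and tier movement
// are mutually exclusive (MapBuilder asserts this).
await client.scrubber.build_chunks(
    { chunk_ids, evict: true },
    { auth_token: make_admin_auth_token() } // hypothetical helper
);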
9 changes: 9 additions & 0 deletions src/sdk/map_api_types.js
@@ -493,8 +493,17 @@ function get_all_chunks_blocks(chunks) {
(_.flatMapDeep(chunks, chunk => chunk.frags.map(frag => frag.blocks)))
);
}
/**
*
* @param {nb.Chunk[]} chunks
* @returns {nb.Part[]}
*/
function get_all_chunk_parts(chunks) {
return chunks.flatMap(chunk => chunk.parts);
}
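
map_deleter uses this helper below to collect the part ids of evicted chunks. Condensed from delete_parts_by_chunks:

// Gather the ids of all not-yet-deleted parts of the evicted chunks.
const part_ids = get_all_chunk_parts(chunks)
    .filter(part => !part.to_db().deleted)
    .map(part => part._id);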

exports.ChunkAPI = ChunkAPI;
exports.FragAPI = FragAPI;
exports.BlockAPI = BlockAPI;
exports.get_all_chunks_blocks = get_all_chunks_blocks;
exports.get_all_chunk_parts = get_all_chunk_parts;
4 changes: 4 additions & 0 deletions src/sdk/nb.d.ts
@@ -169,6 +169,7 @@ interface Bucket extends Base {
namespace?: {
read_resources: NamespaceResource[];
write_resource: NamespaceResource;
caching?: CacheConfig;
};
quota?: Object;
storage_stats: {
@@ -178,6 +179,9 @@
lambda_triggers?: Object;
}

interface CacheConfig {
ttl?: number;
}
interface NamespaceResource {
_id: ID;
name: string;
13 changes: 7 additions & 6 deletions src/server/bg_services/scrubber.js
@@ -59,20 +59,21 @@ async function background_worker() {
}

/**
*
* Scrubber RPC API to build chunks
*
* This is meant to be the only way to run MapBuilder by any non-scrubber creatures
* such as the nodes monitor or other modules that will need it in the future.
*
* The reason they need to call the scrubber to do it is that we want MapBuilder's builder_lock
* to prevent concurrent rebuilds of the same chunk.
*
*/
async function build_chunks(req) {
const chunk_ids = _.map(req.rpc_params.chunk_ids, id => MDStore.instance().make_md_id(id));
const tier = req.rpc_params.tier && system_store.data.get_by_id(req.rpc_params.tier);
const builder = new MapBuilder(chunk_ids, tier);
const evict = req.rpc_params.evict;
const builder = new MapBuilder(chunk_ids, tier, evict);
await builder.run();
}

49 changes: 38 additions & 11 deletions src/server/bg_services/tier_ttf_worker.js
@@ -38,20 +38,26 @@ class TieringTTFWorker {
if (!this._can_run()) return;

console.log('TieringTTFWorker: start running');
const multi_tiered_buckets = this._get_multi_tiered_buckets();
if (!multi_tiered_buckets || !multi_tiered_buckets.length) {
const candidate_buckets = this._get_candidate_buckets();
if (!candidate_buckets || !candidate_buckets.length) {
dbg.log0('no candidate buckets (multi-tier or cache). nothing to do');
this.last_run = 'force';
return config.TIER_TTF_WORKER_EMPTY_DELAY;
}

const wait = await this._rebuild_need_to_move_chunks(multi_tiered_buckets);
const wait = await this._rebuild_need_to_move_chunks(candidate_buckets);
console.log(`TieringTTFWorker: will wait ${wait} ms till next run`);
return wait;
}

_get_multi_tiered_buckets() {
return system_store.data.buckets.filter(bucket => _.isUndefined(bucket.deleting) && bucket.tiering.tiers.length > 1);
_get_candidate_buckets() {
return system_store.data.buckets.filter(bucket =>
!bucket.deleting && (
// including buckets that have 2 or more tiers
bucket.tiering.tiers.length > 1 ||
// including cache buckets to handle chunk eviction
(bucket.namespace && bucket.namespace.caching)
));
}

async _rebuild_need_to_move_chunks(buckets) {
@@ -120,16 +126,37 @@
default:
chunks_to_rebuild = 1;
}


if (!chunks_to_rebuild) continue;

const tiering_status = node_allocator.get_tiering_status(bucket.tiering);
const previous_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status);
const next_tier_order = this.find_tier_order_in_tiering(bucket, previous_tier) + 1;
const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, next_tier_order);

// for cache buckets, when we are on the last tier we evict the chunks
const cache_evict = bucket.namespace && bucket.namespace.caching && !next_tier;

// no point in calling build_chunks when there is no next_tier and no eviction
if (!next_tier && !cache_evict) continue;

const next_tier_id = next_tier ? next_tier._id : undefined;

const chunk_ids = await MDStore.instance().find_oldest_tier_chunk_ids(previous_tier._id, chunks_to_rebuild, 1);
if (!chunk_ids.length) continue;
const next_tier = mapper.select_tier_for_write(bucket.tiering, tiering_status, next_tier_order);
if (!next_tier) continue;
console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} from ${previous_tier._id} to chunks to next tier ${next_tier._id}`, chunk_ids);
await this._build_chunks(chunk_ids, next_tier._id);

if (cache_evict) {
console.log(`TieringTTFWorker: Evicting the following ${chunks_to_rebuild} chunks from ${previous_tier._id}`, chunk_ids);
} else {
console.log(`TieringTTFWorker: Moving the following ${chunks_to_rebuild} chunks from ${previous_tier._id} to next tier ${next_tier_id}`, chunk_ids);
}

await this._build_chunks(
chunk_ids,
next_tier_id,
cache_evict
);
}
this.last_run = undefined;
return config.TIER_TTF_WORKER_BATCH_DELAY;
@@ -139,8 +166,8 @@
return bucket.tiering.tiers.find(t => String(t.tier._id) === String(tier._id)).order;
}

async _build_chunks(chunk_ids, next_tier) {
return this.client.scrubber.build_chunks({ chunk_ids, tier: next_tier }, {
async _build_chunks(chunk_ids, next_tier, cache_evict) {
return this.client.scrubber.build_chunks({ chunk_ids, tier: next_tier, evict: cache_evict }, {
auth_token: auth_server.make_auth_token({
system_id: system_store.data.systems[0]._id,
role: 'admin'
35 changes: 29 additions & 6 deletions src/server/object_services/map_builder.js
@@ -2,9 +2,11 @@
'use strict';

const _ = require('lodash');
const assert = require('assert');

const P = require('../../util/promise');
const dbg = require('../../util/debug_module')(__filename);
const config = require('../../../config');
// const config = require('../../../config.js');
// const mapper = require('./mapper');
const MDStore = require('./md_store').MDStore;
@@ -35,12 +37,17 @@ const builder_lock = new KeysLock();
class MapBuilder {

/**
* @param {nb.ID[]} chunk_ids
* @param {nb.Tier} [move_to_tier]
* @param {boolean} [evict]
*/
constructor(chunk_ids, move_to_tier) {
constructor(chunk_ids, move_to_tier, evict) {
this.chunk_ids = chunk_ids;
this.move_to_tier = move_to_tier;
this.evict = evict;

// eviction and move to tier are mutually exclusive
if (evict) assert.strictEqual(move_to_tier, undefined);

/** @type {nb.ID[]} */
this.second_pass_chunk_ids = [];
@@ -67,7 +74,7 @@
}

/**
* @param {nb.ID[]} chunk_ids
*/
async run_build(chunk_ids) {
await system_store.refresh();
@@ -80,7 +87,7 @@
* Note that there is always a possibility that the chunks will cease to exist
* TODO: We can release the unrelevant chunks from the surround_keys
* This will allow other batches to run if they wait on non existing chunks
* @param {nb.ID[]} chunk_ids
* @returns {Promise<nb.Chunk[]>}
*/
async reload_chunks(chunk_ids) {
@@ -93,6 +100,7 @@
const loaded_chunks = loaded_chunks_db.map(chunk_db => new ChunkDB(chunk_db));
dbg.log1('MapBuilder.reload_chunks:', loaded_chunks);


/** @type {nb.Block[]} */
const blocks_to_delete = [];

@@ -127,7 +135,10 @@
// const tiering_status = node_allocator.get_tiering_status(chunk.bucket.tiering);
// chunk.tier = mapper.select_tier_for_write(chunk.bucket.tiering, tiering_status);
// }

if (this.evict) {
chunks_to_delete.push(chunk);
return;
}
if (!chunk.parts || !chunk.parts.length || !chunk.bucket) {
const last_hour = this.start_run - (60 * 60 * 1000); // chunks that were created in the last hour will not be deleted
dbg.log0('unreferenced chunk to delete', chunk);
@@ -151,15 +162,27 @@
'blocks_to_delete', blocks_to_delete.length);

await Promise.all([
this.evict && map_deleter.delete_parts_by_chunks(chunks_to_delete_uniq),
map_deleter.delete_blocks(blocks_to_delete),
map_deleter.delete_chunks(chunks_to_delete_uniq),

]);

// Deleting objects with no parts here, as delete_parts_by_chunks needs to finish before
// any attempt is made to delete the objects.
if (this.evict) {
const objects_to_gc = _.uniq(loaded_chunks_db.flatMap(chunk => chunk.objects))
.filter(obj => Date.now() - obj.create_time.getTime() > config.NAMESPACE_CACHING.MIN_OBJECT_AGE_FOR_GC);
Comment from the PR author (Contributor):

@guymguym Should we not do this cached object aging check at the start of the map builder for the eviction case? This check is just marking object entries as deleted when they are aged out AND objects have no parts.
The chunks, parts and blocks related to eviction have already been deleted by this point.

guymguym (Member) replied on Aug 21, 2020:

Good question - here are my thoughts:

For chunks (with their parts and blocks) we trigger eviction based on capacity and evict chunks from the cache based on LRU. FIFO is even easier to implement - just don't move chunks to the front of the LRU on hit - and FIFO is age based. So if we prefer simple age-based over access-based caching, we can do that - actually we should just add this for dev/test.

For object_md cleanup - keep in mind that even if we are limited on cache capacity and have to evict, it is still quite likely that cached object_mds have not been deleted or overwritten in the hub, so keeping them is not a bad thing at all. But we do need to evict eventually if the workload reads more of these into the cache than we can store in the DB. Today we do not have a capacity limit for object_mds and we do not order them in an LRU. However, we can easily iterate object_mds in FIFO order by traversing the _id index in creation order, check whether each object has no parts, and then delete it.

Ultimately I think object_md cleanup calls for a separate worker - because the iteration subject and order are completely different from tier_ttf_worker's. In its own worker we should check how close we are to the object_md limit we can store in the DB (either from config or calculated based on the PV), and clean up in FIFO order.

For now I thought we could piggyback on the chunks eviction, BUT the age condition I suggested presents an issue - if we evict all chunks/parts of an object but it is too young and gets filtered out of objects_to_gc, we may never come back to evict more chunks for this same object_md, and it will never be evicted. So I would remove the filter for now, but in any case, to make this work well we need to move it to its own worker.
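
A rough sketch of the separate worker described above, iterating object_mds in _id (creation) order and collecting those with no live parts. The query helper is hypothetical; only delete_object_if_no_parts exists in this PR:

// Hypothetical standalone worker (not part of this PR), per the suggestion above.
async function gc_cached_object_mds(batch_size) {
    // _id order matches creation order, so this traverses cached object_mds in FIFO order.
    const objects = await MDStore.instance().find_cached_objects_by_id_order(batch_size); // hypothetical query
    await Promise.all(objects.map(obj => map_deleter.delete_object_if_no_parts(obj)));
}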

if (objects_to_gc.length) {
dbg.log1('MapBuilder.delete_objects_if_no_parts:', objects_to_gc);
await Promise.all(objects_to_gc.map(map_deleter.delete_object_if_no_parts));
}
}
return chunks_to_build;
}

/**
* @param {nb.Chunk[]} chunks
*/
async build_chunks(chunks) {

32 changes: 31 additions & 1 deletion src/server/object_services/map_deleter.js
@@ -13,6 +13,7 @@ const mongo_utils = require('../../util/mongo_utils');
const map_server = require('./map_server');
const { ChunkDB } = require('./map_db_types');
const { get_all_chunks_blocks } = require('../../sdk/map_api_types');
const { get_all_chunk_parts } = require('../../sdk/map_api_types');
/**
*
* delete_object_mappings
@@ -29,7 +30,7 @@ async function delete_object_mappings(obj) {
}

/**
* @param {nb.ID[]} chunk_ids
*/
async function delete_chunks_if_unreferenced(chunk_ids) {
if (!chunk_ids || !chunk_ids.length) return;
@@ -43,6 +44,22 @@
}
}

/**
* For eviction in cache buckets we check if there are no parts left
* and then we can delete the object. A new object md will be allocated
* if the same object is cached again later.
*
* @param {nb.ObjectMD} object
*/
async function delete_object_if_no_parts(object) {
if (!object) return;
dbg.log1('delete_object_if_no_parts: object', object);
const has_parts = await MDStore.instance().has_any_parts_for_object(object);
if (!has_parts) {
await MDStore.instance().delete_object_by_id(object._id);
}
}

/**
* @param {nb.Chunk[]} chunks
*/
@@ -74,6 +91,17 @@
}
}

/**
* @param {nb.Chunk[]} chunks
*/
async function delete_parts_by_chunks(chunks) {
if (!chunks || !chunks.length) return;
const parts = get_all_chunk_parts(chunks).filter(part => !part.to_db().deleted);
const part_ids = parts.map(part => part._id);
dbg.log1('delete_parts_by_chunks: part_ids', part_ids);
await MDStore.instance().delete_parts_by_ids(part_ids);
}

/**
* delete_blocks_from_agents
* send delete request for the deleted DataBlocks to the agents
@@ -128,7 +156,9 @@

// EXPORTS
exports.delete_object_mappings = delete_object_mappings;
exports.delete_object_if_no_parts = delete_object_if_no_parts;
exports.delete_chunks_if_unreferenced = delete_chunks_if_unreferenced;
exports.delete_chunks = delete_chunks;
exports.delete_blocks = delete_blocks;
exports.delete_parts_by_chunks = delete_parts_by_chunks;
exports.delete_blocks_from_nodes = delete_blocks_from_nodes;
28 changes: 28 additions & 0 deletions src/server/object_services/md_store.js
@@ -1112,6 +1112,25 @@
}).toArray();
}

/**
* @param {nb.ID[]} part_ids
*
*/
async delete_parts_by_ids(part_ids) {
if (!part_ids || !part_ids.length) return;

const delete_date = new Date();
return this._parts.col().updateMany({
_id: {
$in: part_ids
},
deleted: null
}, {
$set: {
deleted: delete_date
},
});
}
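
Note that this is a soft delete: matched parts are stamped with a deleted date rather than removed. A sketch of the resulting document shape, assuming the usual part fields:

// After delete_parts_by_ids runs, a matched part looks roughly like:
// { _id: ObjectId(...), obj: ObjectId(...), chunk: ObjectId(...), deleted: ISODate(...) }
// has_any_parts_for_object() below then treats it as gone, since it queries { deleted: null }.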
/**
* @param {nb.ID[]} chunk_ids
* @returns {Promise<nb.ID[]>}
@@ -1514,6 +1533,15 @@
.then(obj => Boolean(obj));
}


has_any_parts_for_object(obj) {
return this._parts.col().findOne({
obj: obj._id,
deleted: null
})
.then(part => Boolean(part));
}

db_delete_chunks(chunk_ids) {
if (!chunk_ids || !chunk_ids.length) return;
dbg.warn('Removing the following chunks from DB:', chunk_ids);