Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource Id remapping support #773

Merged
merged 10 commits into from
Dec 17, 2020
2 changes: 2 additions & 0 deletions resource/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ libresource_la_SOURCES = \
traversers/dfu_impl_update.cpp \
policies/base/dfu_match_cb.cpp \
policies/base/matcher.cpp \
readers/resource_namespace_remapper.cpp \
readers/resource_reader_base.cpp \
readers/resource_spec_grug.cpp \
readers/resource_reader_grug.cpp \
Expand Down Expand Up @@ -61,6 +62,7 @@ libresource_la_SOURCES = \
traversers/dfu_impl.hpp \
policies/base/dfu_match_cb.hpp \
policies/base/matcher.hpp \
readers/resource_namespace_remapper.hpp \
readers/resource_reader_base.hpp \
readers/resource_spec_grug.hpp \
readers/resource_reader_grug.hpp \
Expand Down
180 changes: 160 additions & 20 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class msg_wrap_t {
const flux_msg_t *m_msg = nullptr;
};

struct resobj_t {
std::string exec_target_range;
std::vector<uint64_t> core;
std::vector<uint64_t> gpu;
};

class resource_interface_t {
public:
resource_interface_t () = default;
Expand Down Expand Up @@ -259,6 +265,9 @@ static void find_request_cb (flux_t *h, flux_msg_handler_t *w,
static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg);

static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg);

static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST,
"sched-fluxion-resource.match", match_request_cb, 0 },
Expand All @@ -284,6 +293,8 @@ static const struct flux_msg_handler_spec htab[] = {
"sched-fluxion-resource.find", find_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST,
"sched-fluxion-resource.status", status_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST,
"sched-fluxion-resource.ns-info", ns_info_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END
};

Expand Down Expand Up @@ -546,12 +557,98 @@ static const char *get_array_string (json_t *array, size_t index)
return s;
}

static int expand_ids (const char *resources, std::vector<uint64_t> &id_vec)
{
int rc = -1;
struct idset *ids = NULL;
try {
unsigned int id;
if ( !(ids = idset_decode (resources)))
goto inval;
if ( (id = idset_first (ids)) == IDSET_INVALID_ID)
goto inval;
id_vec.push_back (id);
while ( (id = idset_next (ids, id)) != IDSET_INVALID_ID)
id_vec.push_back (id);
rc = 0;
} catch (std::bad_alloc &) {
errno = ENOMEM;
goto ret;
}

inval:
errno = EINVAL;
ret:
idset_destroy (ids);
return rc;
}

static int unpack_resobj (json_t *resobj, std::vector<resobj_t> &out)
{
if (!resobj)
goto inval;
try {
size_t index;
json_t *val;

json_array_foreach (resobj, index, val) {
std::string exec_target_range;
std::istringstream istr;
const char *rank = NULL, *core = NULL, *gpu = NULL;
if (json_unpack (val, "{s:s s:{s?:s s?:s}}",
"rank", &rank,
"children",
"core", &core,
"gpu", &gpu) < 0)
goto inval;
istr.str (rank);
while (std::getline (istr, exec_target_range, ',')) {
resobj_t robj;
robj.exec_target_range = exec_target_range;
if (core && expand_ids (core, robj.core) < 0)
goto error;
if (gpu && expand_ids (gpu, robj.gpu) < 0)
goto error;
out.push_back (robj);
}
}
return 0;
} catch (std::bad_alloc &) {
errno = ENOMEM;
goto error;
}
inval:
errno = EINVAL;
error:
return -1;
}

static int remap_hwloc_namespace (std::shared_ptr<resource_ctx_t> &ctx,
json_t *r_lite)
{
std::vector<resobj_t> resobjs;
if (unpack_resobj (r_lite, resobjs) < 0)
return -1;
for (auto &resobj : resobjs) {
/* hwloc reader only needs to remap gpu IDs */
size_t logical;
for (logical = 0; logical < resobj.gpu.size (); logical++) {
if (ctx->reader->namespace_remapper.add (resobj.exec_target_range,
"gpu",
logical,
resobj.gpu[logical]) < 0)
return -1;
}
}
return 0;
}

/* Grow resources for execution targets 'ids', fetching resource
* details in hwloc XML form from the core resource module.
* If 'ids' is the empty set, an empty resource vertex will be instantiated.
*/
static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
struct idset *ids)
struct idset *ids, json_t *resobj)
{
int rc = -1;
resource_graph_db_t &db = *(ctx->db);
Expand All @@ -573,6 +670,9 @@ static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
}
else
hwloc_xml = NULL;
// before hwloc reader is used, set remap
if ( (rc = remap_hwloc_namespace (ctx, resobj)) < 0)
goto done;
if ( (rc = grow (ctx, v, rank, hwloc_xml)) < 0)
goto done;
}
Expand Down Expand Up @@ -705,12 +805,11 @@ static int mark (std::shared_ptr<resource_ctx_t> &ctx,
}

static int update_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
struct idset *grow_set,
const char *up,
const char *down)
struct idset *grow_set, json_t *resobj,
const char *up, const char *down)
{
int rc = 0;
if (grow_set && (rc = grow_resource_db (ctx, grow_set)) < 0) {
if (grow_set && (rc = grow_resource_db (ctx, grow_set, resobj)) < 0) {
flux_log_error (ctx->h, "%s: grow_resource_db", __FUNCTION__);
goto done;
}
Expand All @@ -727,26 +826,26 @@ static int update_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
}

/* Given 'resobj' in Rv1 form, decode the set of execution target ranks
* contained in it.
* contained in it as well as r_lite key.
*/
static struct idset *get_grow_idset (json_t *resobj)
static int unpack_resources (json_t *resobj,
struct idset **idset, json_t **r_lite_p)
{
int rc = 0;
struct idset *ids;
int version;
json_t *r_lite;
size_t index;
json_t *val;

if (!(ids = idset_create (0, IDSET_FLAG_AUTOGROW)))
return NULL;
return -1;
if (resobj) {
if (json_unpack (resobj,
"{s:i s:{s:o}}",
"version",
&version,
"version", &version,
"execution",
"R_lite",
&r_lite) < 0)
"R_lite", &r_lite) < 0)
goto inval;
if (version != 1)
goto inval;
Expand All @@ -770,40 +869,43 @@ static struct idset *get_grow_idset (json_t *resobj)
idset_destroy (r_ids);
}
}
return ids;
*idset = ids;
*r_lite_p = r_lite;
return 0;
inval:
errno = EINVAL;
error:
idset_destroy (ids);
return NULL;
return -1;
}

static void update_resource (flux_future_t *f, void *arg)
{
int rc = -1;
const char *up = NULL;
const char *down = NULL;
json_t *grows = NULL;
json_t *resources = NULL;
json_t *r_lite = NULL;
struct idset *grow_set = NULL;
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if ( (rc = flux_rpc_get_unpack (f, "{s?:o s?:s s?:s}",
"resources", &grows,
"resources", &resources,
"up", &up,
"down", &down)) < 0) {
flux_log_error (ctx->h, "%s: exiting due to resource.acquire failure",
__FUNCTION__);
flux_reactor_stop (flux_get_reactor (ctx->h));
goto done;
}
if (grows) {
if ( !(grow_set = get_grow_idset (grows))) {
if (resources) {
if ( (rc = unpack_resources (resources, &grow_set, &r_lite)) < 0) {
rc = -1;
flux_log_error (ctx->h, "%s: get_grow_idset", __FUNCTION__);
flux_log_error (ctx->h, "%s: unpack_resources", __FUNCTION__);
goto done;
}
}
if ( (rc = update_resource_db (ctx, grow_set, up, down)) < 0) {
if ( (rc = update_resource_db (ctx, grow_set, r_lite, up, down)) < 0) {
flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__);
goto done;
}
Expand Down Expand Up @@ -1944,6 +2046,44 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg)
{
uint64_t rank, id, remapped_id;
const char *type_name;
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if (flux_request_unpack (msg, nullptr,
"{s:I s:s s:I}",
"rank", &rank,
"type-name", &type_name,
"id", &id) < 0) {
flux_log_error (h, "%s: flux_respond_unpack", __FUNCTION__);
goto error;
}
if (ctx->reader->namespace_remapper.query (rank, type_name,
id, remapped_id) < 0) {
flux_log_error (h, "%s: namespace_remapper.query", __FUNCTION__);
goto error;
}
if (remapped_id > std::numeric_limits<int64_t>::max ()) {
errno = EOVERFLOW;
flux_log_error (h, "%s: remapped id too large", __FUNCTION__);
goto error;
}
if (flux_respond_pack (h, msg, "{s:I}",
"id", static_cast<int64_t> (
remapped_id)) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}
return;

error:
if (flux_respond_error (h, msg, errno, nullptr) < 0)
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}


/******************************************************************************
* *
Expand Down
Loading