Skip to content

Commit

Permalink
Merge pull request #775 from milroy/unpack-at
Browse files Browse the repository at this point in the history
Implement unpack_at for JGF
  • Loading branch information
mergify[bot] authored Jan 22, 2021
2 parents ae1628f + d931055 commit a51b004
Show file tree
Hide file tree
Showing 32 changed files with 412 additions and 65 deletions.
170 changes: 113 additions & 57 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
\*****************************************************************************/

#include <map>
#include <unordered_set>
#include <unistd.h>
#include <jansson.h>
#include "resource/readers/resource_reader_jgf.hpp"
Expand Down Expand Up @@ -466,6 +467,21 @@ vtx_t resource_reader_jgf_t::create_vtx (resource_graph_t &g,
return v;
}

vtx_t resource_reader_jgf_t::vtx_in_graph (const resource_graph_metadata_t &m,
const std::map<std::string,
std::string> &paths)
{
std::map<std::string, vtx_t>::const_iterator bp_it;
for (auto const &paths_it : paths) {
bp_it = m.by_path.find (paths_it.second);
if (bp_it != m.by_path.end ()) {
return bp_it->second;
}
}

return boost::graph_traits<resource_graph_t>::null_vertex ();
}

bool resource_reader_jgf_t::is_root (const std::string &path)
{
return (std::count (path.begin (), path.end (), '/') == 1);
Expand Down Expand Up @@ -499,8 +515,14 @@ int resource_reader_jgf_t::add_graph_metadata (vtx_t v,
for (auto kv : g[v].paths) {
if (is_root (kv.second)) {
ptr = m.roots.emplace (kv.first, v);
if (!ptr.second)
if (!ptr.second) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": failed to add root metadata for ";
m_err_msg += kv.first + " subsystem. ";
m_err_msg += "Possible duplicate root.\n";
goto done;
}
}
m.by_path[kv.second] = v;
}
Expand All @@ -513,6 +535,31 @@ int resource_reader_jgf_t::add_graph_metadata (vtx_t v,
return rc;
}

int resource_reader_jgf_t::update_vmap (std::map<std::string,
vmap_val_t> &vmap,
vtx_t v,
const std::map<std::string,
bool> &root_checks,
const fetch_helper_t &fetcher)
{
int rc = -1;
std::pair<std::map<std::string, vmap_val_t>::iterator, bool> ptr;
ptr = vmap.emplace (std::string (fetcher.vertex_id),
vmap_val_t{v, root_checks,
static_cast<unsigned int> (fetcher.size),
static_cast<unsigned int> (fetcher.exclusive)});
if (!ptr.second) {
m_err_msg += __FUNCTION__;
m_err_msg += ": can't insert into vmap for ";
m_err_msg += std::string (fetcher.vertex_id) + ".\n";
goto done;
}
rc = 0;

done:
return rc;
}

int resource_reader_jgf_t::add_vtx (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
Expand All @@ -537,17 +584,8 @@ int resource_reader_jgf_t::add_vtx (resource_graph_t &g,
goto done;
if ( (rc = add_graph_metadata (v, g, m)) == -1)
goto done;

ptr = vmap.emplace (std::string (fetcher.vertex_id),
vmap_val_t{v, root_checks,
static_cast<unsigned int> (fetcher.size),
static_cast<unsigned int> (fetcher.exclusive)});
if (!ptr.second) {
m_err_msg += __FUNCTION__;
m_err_msg += ": can't insert into vmap for ";
m_err_msg += std::string (fetcher.vertex_id) + ".\n";
if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
}
rc = 0;

done:
Expand Down Expand Up @@ -674,19 +712,8 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
goto done;
if ( (rc = check_root (v, g, root_checks)) != 0)
goto done;

ptr = vmap.emplace (std::string (fetcher.vertex_id),
vmap_val_t{v, root_checks,
static_cast<unsigned int> (fetcher.size),
static_cast<unsigned int> (fetcher.exclusive)});
if (!ptr.second) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't insert into vmap.\n";
rc = -1;
if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
}

if ( (rc = update_vtx_plan (v, g, fetcher, jobid, at, dur, rsv)) != 0)
goto done;

Expand Down Expand Up @@ -737,18 +764,39 @@ int resource_reader_jgf_t::unpack_vertices (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string,
vmap_val_t> &vmap,
json_t *nodes)
json_t *nodes,
std::unordered_set<std::string>
&added_vtcs)
{
int rc = -1;
unsigned int i = 0;
fetch_helper_t fetcher;
vtx_t null_vtx = boost::graph_traits<resource_graph_t>::null_vertex ();
std::map<std::string, bool> root_checks;
std::pair<std::map<std::string, vmap_val_t>::iterator, bool> ptr;

for (i = 0; i < json_array_size (nodes); i++) {
fetcher.scrub ();
if (unpack_vtx (json_array_get (nodes, i), fetcher) != 0)
goto done;
if (add_vtx (g, m, vmap, fetcher) != 0)
goto done;

// If the vertex isn't in the graph, add it
vtx_t v = boost::graph_traits<resource_graph_t>::null_vertex ();
if ( (v = vtx_in_graph (m, fetcher.paths)) == null_vtx) {
if (add_vtx (g, m, vmap, fetcher) != 0)
goto done;
auto res = added_vtcs.insert (std::string (fetcher.vertex_id));
if (!res.second) {
errno = EEXIST;
m_err_msg += __FUNCTION__;
m_err_msg += ": can't insert into added_vtcs for ";
m_err_msg += std::string (fetcher.vertex_id) + ".\n";
goto done;
}
} else {
if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
}
}
rc = 0;

Expand Down Expand Up @@ -833,7 +881,9 @@ int resource_reader_jgf_t::unpack_edges (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string,
vmap_val_t> &vmap,
json_t *edges)
json_t *edges,
const std::unordered_set
<std::string> &added_vtcs)
{
edg_t e;
int rc = -1;
Expand All @@ -850,19 +900,23 @@ int resource_reader_jgf_t::unpack_edges (resource_graph_t &g,
element = json_array_get (edges, i);
if ( (unpack_edge (element, vmap, source, target, &name)) != 0)
goto done;
tie (e, inserted) = add_edge (vmap[source].v, vmap[target].v, g);
if (inserted == false) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": couldn't add an edge to the graph for ";
m_err_msg += source + std::string (" -> ") + target + ".\n";
goto done;
}
json_object_foreach (name, key, value) {
g[e].name[std::string (key)]
= std::string (json_string_value (value));
g[e].idata.member_of[std::string (key)]
= std::string (json_string_value (value));
// We only add the edge when it connects at least one newly added vertex
if ( (added_vtcs.count (source) == 1)
|| (added_vtcs.count (target) == 1)) {
tie (e, inserted) = add_edge (vmap[source].v, vmap[target].v, g);
if (inserted == false) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": couldn't add an edge to the graph for ";
m_err_msg += source + std::string (" -> ") + target + ".\n";
goto done;
}
json_object_foreach (name, key, value) {
g[e].name[std::string (key)]
= std::string (json_string_value (value));
g[e].idata.member_of[std::string (key)]
= std::string (json_string_value (value));
}
}
}
rc = 0;
Expand Down Expand Up @@ -907,24 +961,18 @@ int resource_reader_jgf_t::update_tgt_edge (resource_graph_t &g,
boost::tie (ei, ei_end) = boost::out_edges (vmap[source].v, g);

for (; ei != ei_end; ++ei) {
if (boost::target (*ei, g) == vmap[target].v) {
e = *ei;
found = true;
break;
}
if (boost::target (*ei, g) == vmap[target].v) {
e = *ei;
found = true;
break;
}
}
if (!found) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": JGF edge not found in resource graph.\n";
goto done;
}
if (!(vmap[target].is_roots.empty ())) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": an edge into a root detected!\n";
goto done;
}
g[e].idata.set_for_trav_update (vmap[target].needs,
vmap[target].exclusive, token);
rc = 0;
Expand Down Expand Up @@ -984,6 +1032,7 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g,
json_t *nodes = NULL;
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
std::unordered_set<std::string> added_vtcs;

if (rank != -1) {
errno = ENOTSUP;
Expand All @@ -993,9 +1042,9 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g,
}
if ( (rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0)
goto done;
if ( (rc = unpack_vertices (g, m, vmap, nodes)) != 0)
if ( (rc = unpack_vertices (g, m, vmap, nodes, added_vtcs)) != 0)
goto done;
if ( (rc = unpack_edges (g, m, vmap, edges)) != 0)
if ( (rc = unpack_edges (g, m, vmap, edges, added_vtcs)) != 0)
goto done;

done:
Expand All @@ -1004,11 +1053,18 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g,
}

int resource_reader_jgf_t::unpack_at (resource_graph_t &g,
resource_graph_metadata_t &m, vtx_t &vtx,
const std::string &str, int rank)
resource_graph_metadata_t &m, vtx_t &vtx,
const std::string &str, int rank)
{
errno = ENOTSUP; // GRUG reader does not support unpack_at
return -1;
/* This functionality is currently experimental, as resource graph
* growth causes a resize of the boost vecS vertex container type.
* Resizing the vecS results in lost job allocations and reservations
* as there is no copy constructor for planner.
* vtx_t vtx is not implemented and may be used in the future
* for optimization.
*/

return unpack (g, m, str, rank);
}

int resource_reader_jgf_t::update (resource_graph_t &g,
Expand Down
24 changes: 18 additions & 6 deletions resource/readers/resource_reader_jgf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#define RESOURCE_READER_JGF_HPP

#include <string>
#include <unordered_set>
#include <jansson.h>
#include "resource/schema/resource_graph.hpp"
#include "resource/readers/resource_reader_base.hpp"
Expand Down Expand Up @@ -104,11 +105,16 @@ class resource_reader_jgf_t : public resource_reader_base_t {
json_t **path, json_t **properties);
int unpack_vtx (json_t *element, fetch_helper_t &f);
vtx_t create_vtx (resource_graph_t &g, const fetch_helper_t &fetcher);
vtx_t vtx_in_graph (const resource_graph_metadata_t &m,
const std::map<std::string, std::string> &paths);
bool is_root (const std::string &path);
int check_root (vtx_t v, resource_graph_t &g,
std::map<std::string, bool> &is_roots);
int add_graph_metadata (vtx_t v, resource_graph_t &g,
resource_graph_metadata_t &m);
int update_vmap (std::map<std::string, vmap_val_t> &vmap, vtx_t v,
const std::map<std::string, bool> &root_checks,
const fetch_helper_t &fetcher);
int add_vtx (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher);
Expand All @@ -123,16 +129,20 @@ class resource_reader_jgf_t : public resource_reader_base_t {
const fetch_helper_t &fetcher, uint64_t jobid, int64_t at,
uint64_t dur, bool rsv);
int unpack_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap, json_t *nodes);
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes,
std::unordered_set<std::string> &added_vtcs);
int undo_vertices (resource_graph_t &g,
std::map<std::string, vmap_val_t> &vmap,
uint64_t jobid, bool rsv);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap, json_t *nodes,
int64_t jobid, int64_t at, uint64_t dur, bool rsv);
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur, bool rsv);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap, json_t *nodes,
int64_t jobid, int64_t at, uint64_t dur);
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur);
int unpack_edge (json_t *element, std::map<std::string, vmap_val_t> &vmap,
std::string &source, std::string &target, json_t **name);
int update_src_edge (resource_graph_t &g, resource_graph_metadata_t &m,
Expand All @@ -143,7 +153,9 @@ class resource_reader_jgf_t : public resource_reader_base_t {
std::string &source, std::string &target,
uint64_t token);
int unpack_edges (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap, json_t *edges);
std::map<std::string, vmap_val_t> &vmap,
json_t *edges,
const std::unordered_set<std::string> &added_vtcs);
int update_edges (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *edges, uint64_t token);
Expand Down
2 changes: 2 additions & 0 deletions resource/traversers/dfu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ int dfu_traverser_t::initialize ()
return -1;
}

m_initialized = false;
detail::dfu_impl_t::reset_color ();
for (auto &subsystem : get_match_cb ()->subsystems ()) {
std::map<std::string, int64_t> from_dfv;
if (get_graph_db ()->metadata.roots.find (subsystem)
Expand Down
5 changes: 5 additions & 0 deletions resource/traversers/dfu_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,11 @@ void dfu_impl_t::clear_err_message ()
m_err_msg = "";
}

void dfu_impl_t::reset_color ()
{
m_color.reset ();
}

int dfu_impl_t::prime_pruning_filter (const subsystem_t &s, vtx_t u,
std::map<std::string, int64_t> &to_parent)
{
Expand Down
1 change: 1 addition & 0 deletions resource/traversers/dfu_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class dfu_impl_t {
void set_graph_db (std::shared_ptr<resource_graph_db_t> db);
void set_match_cb (std::shared_ptr<dfu_match_cb_t> m);
void clear_err_message ();
void reset_color ();

/*! Exclusive request? Return true if a resource in resources vector
* matches resource vertex u and its exclusivity field value is TRUE.
Expand Down
1 change: 1 addition & 0 deletions resource/traversers/dfu_impl_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ int dfu_impl_t::upd_dfv (vtx_t u, std::shared_ptr<match_writers_t> &writers,
const std::string &dom = m_match->dom_subsystem ();
f_out_edg_iterator_t ei, ei_end;
m_trav_level++;
(*m_graph)[u].idata.colors[dom] = m_color.gray ();
for (auto &subsystem : m_match->subsystems ()) {
for (tie (ei, ei_end) = out_edges (u, *m_graph); ei != ei_end; ++ei) {
if (!in_subsystem (*ei, subsystem) || stop_explore (*ei, subsystem))
Expand Down
Loading

0 comments on commit a51b004

Please sign in to comment.