Skip to content

Commit

Permalink
Merge pull request #1101 from zekemorton/reapi-match-allocate
Browse files Browse the repository at this point in the history
RQ2 minimal functionality and bug fix to reapi match allocate
  • Loading branch information
mergify[bot] authored Dec 14, 2023
2 parents 250eac7 + e4f265d commit fe872c8
Show file tree
Hide file tree
Showing 16 changed files with 941 additions and 32 deletions.
1 change: 1 addition & 0 deletions resource/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ schema/test/schema_test01
schema/test/schema_test02
utilities/grug2dot
utilities/resource-query
utilities/rq2
6 changes: 6 additions & 0 deletions resource/reapi/bindings/c++/reapi_cli.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class resource_query_t {
const std::shared_ptr<job_info_t> &get_job (const uint64_t jobid);
const bool reservation_exists (const uint64_t jobid);
const bool allocation_exists (const uint64_t jobid);
const unsigned int preorder_count ();
const unsigned int postorder_count ();

/* Mutators */
void clear_resource_query_err_msg ();
Expand Down Expand Up @@ -141,6 +143,10 @@ class reapi_cli_t : public reapi_t {
static int find (void *h, std::string criteria, json_t *&o );
static int info (void *h, const uint64_t jobid, std::string &mode,
bool &reserved, int64_t &at, double &ov);
static int info (void *h, const uint64_t jobid,
std::shared_ptr<job_info_t> &job);
static unsigned int preorder_count (void *h);
static unsigned int postorder_count (void *h);
static int stat (void *h, int64_t &V, int64_t &E,int64_t &J,
double &load, double &min, double &max, double &avg);
static const std::string &get_err_message ();
Expand Down
100 changes: 78 additions & 22 deletions resource/reapi/bindings/c++/reapi_cli_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ int reapi_cli_t::match_allocate (void *h, bool orelse_reserve,
std::shared_ptr<job_info_t> job_info = nullptr;
struct timeval start_time, end_time;
std::stringstream o;
bool matched = false;

try {
Flux::Jobspec::Jobspec job {jobspec};
Expand Down Expand Up @@ -99,6 +100,19 @@ int reapi_cli_t::match_allocate (void *h, bool orelse_reserve,
rc = -1;
goto out;
}

if ( (rc != 0) && (errno == ENOMEM)) {
m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: Memory error for "
+ std::to_string (rq->get_job_counter ());
rc = -1;
goto out;
}

// Check for an unsuccessful match
if (rc == 0) {
matched = true;
}

if ( (rc = rq->writers->emit (o)) < 0) {
m_err_msg += __FUNCTION__;
Expand All @@ -109,28 +123,6 @@ int reapi_cli_t::match_allocate (void *h, bool orelse_reserve,

R = o.str ();


reserved = (at != 0)? true : false;
st = (reserved)?
job_lifecycle_t::RESERVED : job_lifecycle_t::ALLOCATED;
if (reserved)
rq->set_reservation (jobid);
else
rq->set_allocation (jobid);

job_info = std::make_shared<job_info_t> (jobid, st, at, "", "", ov);
if (job_info == nullptr) {
errno = ENOMEM;
m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: can't allocate memory: "
+ std::string (strerror (errno))+ "\n";
rc = -1;
goto out;
}

rq->set_job (jobid, job_info);
rq->incr_job_counter ();

if ( (rc = gettimeofday (&end_time, NULL)) < 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: gettimeofday: "
Expand All @@ -140,6 +132,30 @@ int reapi_cli_t::match_allocate (void *h, bool orelse_reserve,

ov = get_elapsed_time (start_time, end_time);

if (matched) {
reserved = (at != 0)? true : false;
st = (reserved)?
job_lifecycle_t::RESERVED : job_lifecycle_t::ALLOCATED;
if (reserved)
rq->set_reservation (jobid);
else
rq->set_allocation (jobid);

job_info = std::make_shared<job_info_t> (jobid, st, at, "", "", ov);
if (job_info == nullptr) {
errno = ENOMEM;
m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: can't allocate memory: "
+ std::string (strerror (errno))+ "\n";
rc = -1;
goto out;
}
rq->set_job (jobid, job_info);

}

rq->incr_job_counter ();

out:
return rc;
}
Expand Down Expand Up @@ -234,6 +250,36 @@ int reapi_cli_t::info (void *h, const uint64_t jobid, std::string &mode,
return 0;
}

int reapi_cli_t::info (void *h, const uint64_t jobid,
std::shared_ptr<job_info_t> &job)
{
resource_query_t *rq = static_cast<resource_query_t *> (h);

if ( !(rq->job_exists (jobid))) {
m_err_msg += __FUNCTION__;
m_err_msg += ": ERROR: nonexistent job "
+ std::to_string (jobid) + "\n";
return -1;
}

job = rq->get_job (jobid);
return 0;
}

unsigned int reapi_cli_t::preorder_count (void *h)
{
resource_query_t *rq = static_cast<resource_query_t *> (h);

return rq->preorder_count ();
}

unsigned int reapi_cli_t::postorder_count (void *h)
{
resource_query_t *rq = static_cast<resource_query_t *> (h);

return rq->postorder_count ();
}

int reapi_cli_t::stat (void *h, int64_t &V, int64_t &E,int64_t &J,
double &load, double &min, double &max, double &avg)
{
Expand Down Expand Up @@ -605,6 +651,16 @@ const bool resource_query_t::reservation_exists (const uint64_t jobid)
return reservations.find (jobid) != reservations.end ();
}

const unsigned int resource_query_t::preorder_count ()
{
return traverser->get_total_preorder_count ();
}

const unsigned int resource_query_t::postorder_count ()
{
return traverser->get_total_postorder_count ();
}

void resource_query_t::clear_resource_query_err_msg ()
{
m_err_msg = "";
Expand Down
18 changes: 15 additions & 3 deletions resource/reapi/bindings/go/src/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func main() {
jgfPtr := flag.String("jgf", "", "path to jgf")
jobspecPtr := flag.String("jobspec", "", "path to jobspec")
reserve := flag.Bool("reserve", false, "or else reserve?")
reserve := false
flag.Parse()

jgf, err := os.ReadFile(*jgfPtr)
Expand All @@ -44,20 +44,32 @@ func main() {
}
fmt.Printf("Jobspec:\n %s\n", jobspec)

reserved, allocated, at, overhead, jobid, err := cli.MatchAllocate(*reserve, string(jobspec))
reserved, allocated, at, overhead, jobid, err := cli.MatchAllocate(reserve, string(jobspec))
if err != nil {
fmt.Printf("Error in ReapiClient MatchAllocate: %v\n", err)
return
}
printOutput(reserved, allocated, at, jobid, err)
reserved, allocated, at, overhead, jobid, err = cli.MatchAllocate(*reserve, string(jobspec))

reserve = true
reserved, allocated, at, overhead, jobid, err = cli.MatchAllocate(reserve, string(jobspec))
fmt.Println("Errors so far: \n", cli.GetErrMsg())

if err != nil {
fmt.Printf("Error in ReapiClient MatchAllocate: %v\n", err)
return
}
printOutput(reserved, allocated, at, jobid, err)

reserved, allocated, at, overhead, jobid, err = cli.MatchAllocate(reserve, string(jobspec))
fmt.Println("Errors so far: \n", cli.GetErrMsg())

if err != nil {
fmt.Printf("Error in ReapiClient MatchAllocate: %v\n", err)
return
}
printOutput(reserved, allocated, at, jobid, err)

err = cli.Cancel(1, false)
if err != nil {
fmt.Printf("Error in ReapiClient Cancel: %v\n", err)
Expand Down
8 changes: 8 additions & 0 deletions resource/utilities/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ target_link_libraries(resource-query
Boost::filesystem
Boost::headers
)
add_executable(rq2
rq2.cpp
rq2.hpp
)
target_link_libraries(rq2
resource
PkgConfig::LIBEDIT
)

add_subdirectory(test)

Loading

0 comments on commit fe872c8

Please sign in to comment.