Skip to content

Commit

Permalink
add OneAggregatePairs worker for #229
Browse files Browse the repository at this point in the history
  • Loading branch information
mpadge committed Jun 5, 2024
1 parent fc6c75a commit 998352b
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 2 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: dodgr
Title: Distances on Directed Graphs
Version: 0.4.0.002
Version: 0.4.0.003
Authors@R: c(
person("Mark", "Padgham", , "mark.padgham@email.com", role = c("aut", "cre")),
person("Andreas", "Petutschnig", role = "aut"),
Expand Down
2 changes: 1 addition & 1 deletion codemeta.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"codeRepository": "https://github.com/UrbanAnalyst/dodgr",
"issueTracker": "https://github.com/UrbanAnalyst/dodgr/issues",
"license": "https://spdx.org/licenses/GPL-3.0",
"version": "0.4.0.002",
"version": "0.4.0.003",
"programmingLanguage": {
"@type": "ComputerLanguage",
"name": "R",
Expand Down
144 changes: 144 additions & 0 deletions src/flows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,150 @@ struct OneAggregate : public RcppParallel::Worker
};


struct OneAggregatePaired : public RcppParallel::Worker
{
RcppParallel::RVector <int> dp_fromtoi;
const RcppParallel::RVector <double> flows;
const std::vector <std::string> vert_name;
const std::unordered_map <std::string, size_t> verts_to_edge_map;
size_t nfrom;
size_t nverts; // can't be const because of reinterpret cast
size_t nedges;
const bool norm_sums;
const double tol;
const std::string heap_type;
std::shared_ptr <DGraph> g;

std::vector <double> output;

// Constructor 1: The main constructor
OneAggregatePaired (
const RcppParallel::RVector <int> fromtoi,
const RcppParallel::RVector <double> flows_in,
const std::vector <std::string> vert_name_in,
const std::unordered_map <std::string, size_t> verts_to_edge_map_in,
const size_t nfrom_in,
const size_t nverts_in,
const size_t nedges_in,
const bool norm_sums_in,
const double tol_in,
const std::string &heap_type_in,
const std::shared_ptr <DGraph> g_in
) :
dp_fromtoi (fromtoi), flows (flows_in),
vert_name (vert_name_in),
verts_to_edge_map (verts_to_edge_map_in),
nfrom (nfrom_in), nverts (nverts_in), nedges (nedges_in),
norm_sums (norm_sums_in), tol (tol_in), heap_type (heap_type_in),
g (g_in), output ()
{
output.resize (nedges, 0.0);
}

// Constructor 2: The Split constructor
OneAggregatePaired (
const OneAggregatePaired& oneAggregatePaired,
RcppParallel::Split) :
dp_fromtoi (oneAggregatePaired.dp_fromtoi),
flows (oneAggregatePaired.flows),
vert_name (oneAggregatePaired.vert_name),
verts_to_edge_map (oneAggregatePaired.verts_to_edge_map),
nfrom (oneAggregatePaired.nfrom),
nverts (oneAggregatePaired.nverts),
nedges (oneAggregatePaired.nedges),
norm_sums (oneAggregatePaired.norm_sums),
tol (oneAggregatePaired.tol),
heap_type (oneAggregatePaired.heap_type),
g (oneAggregatePaired.g), output ()
{
output.resize (nedges, 0.0);
}

// Parallel function operator
void operator() (size_t begin, size_t end)
{
std::shared_ptr<PF::PathFinder> pathfinder =
std::make_shared <PF::PathFinder> (nverts,
*run_sp::getHeapImpl (heap_type), g);
std::vector <double> w (nverts);
std::vector <double> d (nverts);
std::vector <long int> prev (nverts);

for (size_t i = begin; i < end; i++)
{
//if (RcppThread::isInterrupted (i % static_cast<int>(100) == 0))
//if (RcppThread::isInterrupted ())
// return;

const size_t from_i = static_cast <size_t> (dp_fromtoi [i]);
// to_i has to be a vector for Dijkstra algo, but has only 1
// element.
const std::vector <size_t> to_i = {static_cast <size_t> (dp_fromtoi [nfrom + i])};

// These have to be reserved within the parallel operator function!
std::fill (w.begin (), w.end (), INFINITE_DOUBLE);
std::fill (d.begin (), d.end (), INFINITE_DOUBLE);
std::fill (prev.begin (), prev.end (), INFINITE_INT);

d [from_i] = w [from_i] = 0.0;

pathfinder->Dijkstra (d, w, prev, from_i, to_i);
// loop has always only one element here:
for (size_t j = 0; j < to_i.size (); j++)
{
if (w [to_i [j]] < INFINITE_DOUBLE && flows [i] > 0.0)
{
// count how long the path is, so flows on
// each edge can be divided by this length
int path_len = 1;
if (norm_sums)
{
path_len = 0;
long int target_t = static_cast <long int> (to_i [j]);
while (target_t < INFINITE_INT)
{
path_len++;
size_t target_size_t = static_cast <size_t> (target_t);
target_t = prev [target_size_t];
if (target_t < 0 || target_size_t == from_i)
break;
}
}

long int target = static_cast <int> (to_i [j]); // can equal -1
while (target < INFINITE_INT)
{
size_t stt = static_cast <size_t> (target);
if (prev [stt] >= 0 && prev [stt] < INFINITE_INT)
{
std::string v2 = "f" +
vert_name [static_cast <size_t> (prev [stt])] +
"t" + vert_name [stt];
output [verts_to_edge_map.at (v2)] +=
flows [i] / static_cast <double> (path_len);
}

target = prev [stt];
// Only allocate that flow from origin vertex v to all
// previous vertices up until the target vi
if (target < 0L || target == from_i)
{
break;
}
}
}
}
} // end for i
} // end parallel function operator

void join (const OneAggregatePaired &rhs)
{
for (size_t i = 0; i < output.size (); i++)
output [i] += rhs.output [i];
}
};


struct OneDisperse : public RcppParallel::Worker
{
RcppParallel::RVector <int> dp_fromi;
Expand Down

0 comments on commit 998352b

Please sign in to comment.