Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Simple engine #30

Merged
merged 27 commits into from
Aug 31, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7a3fb78
[simple-engine] cherry pick Minjie's refactoring on interface
hotpxl Aug 23, 2015
6bfcc7d
[simple-engine] fix typo and unmark virtual
hotpxl Aug 23, 2015
e149942
[simple-engine] WIP, need to refactor interface and remove inheritance
hotpxl Aug 23, 2015
cfe80c6
[simple-engine] A not so simple engine that should somehow work
hotpxl Aug 23, 2015
005db3a
[simple-engine] lint
hotpxl Aug 23, 2015
5dfc759
[simple-engine] remove unnecessary files
hotpxl Aug 24, 2015
6b948fd
[simple-engine] that goes worker
hotpxl Aug 24, 2015
c7146a7
[simple-engine] passed some tests
hotpxl Aug 24, 2015
fef28d2
[simple-engine] some document
hotpxl Aug 24, 2015
761e0e1
[simple-engine] implement missing functions
hotpxl Aug 24, 2015
11432db
[simple-engine] fix order
hotpxl Aug 24, 2015
50bf69b
[simple-engine] fix concurrency
hotpxl Aug 24, 2015
1ad9132
[simple-engine] fix a concurrency bug
hotpxl Aug 24, 2015
4662b4e
[simple-engine] lint
hotpxl Aug 24, 2015
d59a125
[simple-engine] Merge branch 'master' into simple-engine
hotpxl Aug 25, 2015
e71055c
[simple-engine] @antinucleon Sorry I had to turn this on to pass Doxygen
hotpxl Aug 25, 2015
be8312d
[simple-engine] heavier test on engine
hotpxl Aug 25, 2015
3a15c13
[simple-engine] add tests to Makefile
hotpxl Aug 25, 2015
e9ab2a2
[simple-engine] use macro for debugging
hotpxl Aug 25, 2015
a3dbb93
[simple-engine] fix engine bug
hotpxl Aug 25, 2015
6b1b9a1
[simple-engine] even more tests
hotpxl Aug 25, 2015
1c57396
[simple-engine] Merge branch 'master' into simple-engine
hotpxl Aug 26, 2015
602a436
[simple-engine] Merge branch 'master' into simple-engine
hotpxl Aug 28, 2015
e85fb0d
[simple-engine] fix concurrency bug
hotpxl Aug 29, 2015
5d4189e
[simple-engine] obj pool raw implementation
hotpxl Aug 29, 2015
bb16dc4
[simple-engine] Merge branch 'master' into simple-engine
hotpxl Aug 29, 2015
8628138
[simple-engine] fix counter
hotpxl Aug 29, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[simple-engine] A not so simple engine that should somehow work
Missing parts:
1. Workers (this should be easy since all conflicts have been resolved)
2. Runtime context (I have totally no idea)
  • Loading branch information
hotpxl committed Aug 23, 2015
commit cfe80c6e2b3cfe9d7332314337def9d90604c4a6
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ ifneq ($(ADD_LDFLAGS), NONE)
LDFLAGS += $(ADD_LDFLAGS)
endif

#BIN = test/test_threaded_engine test/api_registry_test
OBJ = narray_function_cpu.o
# add threaded engine after it is done
OBJCXX11 = dag_engine.o narray.o c_api.o operator.o symbol.o storage.o fully_connected_cpu.o static_graph.o activation_cpu.o graph_executor.o softmax_cpu.o elementwise_sum_cpu.o pooling_cpu.o
OBJCXX11 = dag_engine.o simple_engine.o narray.o c_api.o operator.o symbol.o storage.o fully_connected_cpu.o static_graph.o activation_cpu.o graph_executor.o softmax_cpu.o elementwise_sum_cpu.o pooling_cpu.o
CUOBJ =
SLIB = lib/libmxnet.so
ALIB = lib/libmxnet.a
Expand All @@ -76,6 +75,7 @@ $(DMLC_CORE)/libdmlc.a:

storage.o: src/storage/storage.cc
dag_engine.o: src/dag_engine/dag_engine.cc
simple_engine.o: src/dag_engine/simple_engine.cc
narray.o: src/narray/narray.cc
narray_function_cpu.o: src/narray/narray_function.cc src/narray/narray_function-inl.h
narray_function_gpu.o: src/narray/narray_function.cu src/narray/narray_function-inl.h
Expand All @@ -99,7 +99,6 @@ lib/libmxnet.a: $(OBJ) $(OBJCXX11) $(CUOBJ)
lib/libmxnet.so: $(OBJ) $(OBJCXX11) $(CUOBJ)

test/test_storage: test/test_storage.cc lib/libmxnet.a
#test/test_threaded_engine: test/test_threaded_engine.cc api/libmxnet.a

$(BIN) :
$(CXX) $(CFLAGS) -std=c++0x -o $@ $(filter %.cpp %.o %.c %.a %.cc, $^) $(LDFLAGS)
Expand Down
3 changes: 3 additions & 0 deletions src/dag_engine/dag_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ DAGEngine::~DAGEngine() = default;
DAGEngine::DAGEngine() = default;

DAGEngine* DAGEngine::Get() {
/*!
* \brief Alias that selects the concrete engine implementation; change it to switch engines.
*/
using EngineImplementation = engine::SimpleEngine;

static EngineImplementation inst;
Expand Down
35 changes: 30 additions & 5 deletions src/dag_engine/dag_engine_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,45 @@
#ifndef MXNET_DAG_ENGINE_DAG_ENGINE_IMPL_H_
#define MXNET_DAG_ENGINE_DAG_ENGINE_IMPL_H_

#include <vector>
#include <utility>
#include "mxnet/dag_engine.h"

namespace mxnet {
namespace engine {

/*!
 * \brief Base type of every engine variable.
 *
 * Engine implementations derive from this type and recover their own
 * representation through `Cast`.
 */
struct Var {
  /*! \brief Virtual destructor; makes the type polymorphic so that `dynamic_cast` on it is valid. */
  virtual ~Var() = default;

  /*!
   * \brief Downcast this variable to the engine-specific type `T`.
   * \tparam T Type derived from `Var`.
   * \return Pointer to this object viewed as `T`.
   */
  template <typename T>
  T* Cast();
}; // struct Var

/*!
 * \brief Base type of every engine operator.
 *
 * NOTE(review): `SimpleOpr` declares members named `fn`, `use_vars` and
 * `mutate_vars` as well, which hide the three data members below — confirm
 * whether these base-class fields are still wanted.
 */
struct Opr {
/*! \brief Function object associated with this operator. */
DAGEngine::AsyncFn fn;
/*! \brief Variables listed as "use" — presumably read-only inputs; TODO confirm. */
std::vector<Var*> use_vars;
/*! \brief Variables listed as "mutate" — presumably written by the operator; TODO confirm. */
std::vector<Var*> mutate_vars;
/*! \brief Virtual destructor; makes the type polymorphic so that `dynamic_cast` on it is valid. */
virtual ~Opr() = default;
/*! \brief Downcast this operator to the engine-specific type `T`. */
template <typename T>
T* Cast();
}; // struct Opr

/*!
 * \brief Downcast this variable to the engine-specific type `T`.
 *
 * Debug builds use `dynamic_cast`, which yields `nullptr` when the object is
 * not actually a `T`. Builds with `NDEBUG` skip the runtime check and use
 * `static_cast`, the well-defined unchecked downcast within a class hierarchy
 * (`reinterpret_cast` does not adjust the pointer and is not guaranteed to
 * produce a usable `T*`).
 *
 * \tparam T Type derived from `Var`; enforced at compile time.
 * \return Pointer to this object viewed as `T`.
 */
template <typename T>
T* Var::Cast() {
  static_assert(std::is_base_of<Var, T>::value, "must inherit `mxnet::engine::Var`");
#ifdef NDEBUG
  return static_cast<T*>(this);
#else // NDEBUG
  return dynamic_cast<T*>(this);
#endif // NDEBUG
}

/*!
 * \brief Downcast this operator to the engine-specific type `T`.
 *
 * Debug builds use `dynamic_cast`, which yields `nullptr` when the object is
 * not actually a `T`. Builds with `NDEBUG` skip the runtime check and use
 * `static_cast`, the well-defined unchecked downcast within a class hierarchy
 * (`reinterpret_cast` does not adjust the pointer and is not guaranteed to
 * produce a usable `T*`).
 *
 * \tparam T Type derived from `Opr`; enforced at compile time.
 * \return Pointer to this object viewed as `T`.
 */
template <typename T>
T* Opr::Cast() {
  static_assert(std::is_base_of<Opr, T>::value, "must inherit `mxnet::engine::Opr`");
#ifdef NDEBUG
  return static_cast<T*>(this);
#else // NDEBUG
  return dynamic_cast<T*>(this);
#endif // NDEBUG
}

} // namespace engine
} // namespace mxnet

Expand Down
123 changes: 116 additions & 7 deletions src/dag_engine/simple_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,139 @@
* Copyright (c) 2015 by Contributors
*/
#include "simple_engine.h"
#include <mshadow/tensor.h>
#include <dmlc/logging.h>
#include <cassert>
#include <algorithm>
#include <utility>

namespace mxnet {

namespace engine {

/*!
 * \brief Convert a base `Var` pointer into a `SimpleVar` pointer.
 * \param v Variable to convert; forwarded to `Var::Cast`.
 * \return Result of `v->Cast<SimpleVar>()`.
 */
SimpleVar* SimpleVar::CastFromBase(Var* v) {
  SimpleVar* const as_simple = v->Cast<SimpleVar>();
  return as_simple;
}

/*!
 * \brief Convert a base `Opr` pointer into a `SimpleOpr` pointer.
 * \param o Operator to convert; forwarded to `Opr::Cast`.
 * \return Result of `o->Cast<SimpleOpr>()`.
 */
SimpleOpr* SimpleOpr::CastFromBase(Opr* o) {
  SimpleOpr* const as_simple = o->Cast<SimpleOpr>();
  return as_simple;
}

/*! \brief Default constructor; performs member-wise default construction only. */
SimpleEngine::SimpleEngine() = default;

/*! \brief Default destructor; performs member-wise destruction only. */
SimpleEngine::~SimpleEngine() = default;

/*!
 * \brief Create a new engine variable.
 *
 * Allocates a `SimpleVar` and gives it a fresh, empty `VersionedVarBlock` as
 * the current tail of its dependency chain.
 *
 * \return The newly allocated variable. Both objects are created with `new`;
 *         NOTE(review): no matching release path is visible in this file.
 */
SimpleEngine::Variable SimpleEngine::NewVar() {
  auto ret = new SimpleVar{};
  ret->var = new VersionedVarBlock{};
  return ret;
}

/*!
 * \brief Create a new operator bound to the given variables.
 *
 * \param fn Function stored on the operator; `Push` later wraps it for execution.
 * \param use_vars Variables recorded in the operator's `use_vars` list.
 * \param mutate_vars Variables recorded in the operator's `mutate_vars` list.
 * \return A heap-allocated `SimpleOpr`; release it with `DeleteOperator`.
 */
SimpleEngine::Operator SimpleEngine::NewOperator(
    SimpleEngine::AsyncFn fn, std::vector<Variable> const& use_vars,
    std::vector<Variable> const& mutate_vars) {
  auto ret = new SimpleOpr{};
  ret->fn = fn;
  // Size the destination vectors up front so `std::transform` can write
  // straight into them.
  ret->use_vars.resize(use_vars.size());
  ret->mutate_vars.resize(mutate_vars.size());
  // Convert every base-class handle into the engine's own variable type.
  std::transform(use_vars.begin(), use_vars.end(), ret->use_vars.begin(),
                 SimpleVar::CastFromBase);
  std::transform(mutate_vars.begin(), mutate_vars.end(),
                 ret->mutate_vars.begin(), SimpleVar::CastFromBase);
  return ret;
}

/*!
 * \brief Destroy an operator.
 * \param op Operator to destroy; deleted through the base-class pointer.
 */
void SimpleEngine::DeleteOperator(Operator op) { delete op; }

/*!
 * \brief Register an operator's dependencies and queue it once they are met.
 *
 * A new `OprBlock` is created whose `wait` counter starts at the number of
 * variables the operator touches plus one. Every variable that is not
 * currently waited on decrements the counter immediately; every other
 * variable gets a new `VersionedVarBlock` appended to its chain with this
 * block recorded as the `trigger`. The extra count is released at the end of
 * this function, and the block is pushed to the task queue if the counter
 * reaches zero at that point.
 *
 * \param op Operator to run.
 * \param exec_ctx Execution context; currently ignored.
 */
void SimpleEngine::Push(Operator op, Context exec_ctx) {
  static_cast<void>(exec_ctx);
  auto opr = SimpleOpr::CastFromBase(op);
  // NOTE(review): nothing visible in this function frees `opr_block`.
  auto opr_block = new OprBlock{};
  opr_block->wait.store(opr->use_vars.size() + opr->mutate_vars.size() + 1);
  // Add reading dependencies.
  auto add_dependency = [&opr_block](SimpleVar* i) {
    // The guard is bound to the lock of the block that is current on entry;
    // reassigning `i->var` below does not change which lock gets released.
    std::lock_guard<dmlc::Spinlock> lock{i->var->lock};
    if (!i->var->waiting) {
      assert(i->var->next == nullptr);
      assert(i->var->join == nullptr);
      assert(i->var->trigger == nullptr);
      --opr_block->wait;
    } else {
      auto new_var = new VersionedVarBlock{};
      new_var->waiting = true;
      assert(i->var->next == nullptr);
      i->var->next = new_var;
      i->var->trigger = opr_block;
      i->var = new_var;
    }
  };
  std::for_each(opr->use_vars.begin(), opr->use_vars.end(), add_dependency);
  std::for_each(opr->mutate_vars.begin(), opr->mutate_vars.end(),
                add_dependency);
  // Add mutation dependencies.
  VersionedVarBlock* first = nullptr;
  VersionedVarBlock* previous = nullptr;
  for (auto i : opr->mutate_vars) {
    i->var->lock.lock();
    if (!i->var->waiting) {
      assert(i->var->next == nullptr);
      assert(i->var->join == nullptr);
      assert(i->var->trigger == nullptr);
      i->var->waiting = true;
    } else {
      auto new_var = new VersionedVarBlock{};
      new_var->waiting = true;
      // Moving out from old block, set flag to false.
      i->var->waiting = false;
      new_var->lock.lock();
      i->var->lock.unlock();
      i->var = new_var;
    }
    // Chain the mutated blocks together through `join`; each block stays
    // locked until its successor has been linked in.
    if (first == nullptr) {
      first = i->var;
    } else {
      previous->join = i->var;
      previous->lock.unlock();
    }
    previous = i->var;
  }
  if (previous != nullptr) {
    previous->lock.unlock();
  }
  auto callback = [this, first]() { OnComplete(first); };
  // TODO do something useful
  RunContext ctx{};
  opr_block->fn = [opr, ctx, callback]() { opr->fn(ctx, callback); };
  // Release the extra count taken at the top. This must decrement the
  // counter, not the pointer itself.
  if (--opr_block->wait == 0) {
    task_queue_.Push(opr_block);
  }
}

/*!
 * \brief Release the dependencies held on a chain of variable blocks.
 *
 * The outer loop follows the `join` links starting at `trigger`; for each
 * joined block the inner loop follows its `next` links. Every visited block
 * that has a `trigger` operator decrements that operator's `wait` counter and
 * queues the operator when the counter reaches zero. Blocks that have a
 * successor are deleted once the walk has moved past them. The last block of
 * a chain is deleted if it is no longer marked `waiting`; otherwise its
 * `waiting` flag is cleared and it is kept.
 *
 * \param trigger First block of the `join` chain; may be `nullptr`.
 */
void SimpleEngine::OnComplete(VersionedVarBlock* trigger) {
  auto head = trigger;
  while (head != nullptr) {
    auto cur = head;
    // Advance along the join chain from the block being processed. Reading
    // it from `trigger` would never advance and would touch a block that may
    // already have been deleted. It is read before the inner loop because
    // `cur` itself may be deleted there.
    head = cur->join;
    VersionedVarBlock* previous = nullptr;
    // Food for thought. This could also be `while true`.
    while (cur != nullptr) {
      VersionedVarBlock* next = nullptr;
      bool delete_cur = false;
      {
        std::lock_guard<dmlc::Spinlock> lock{cur->lock};
        assert((cur->next == nullptr) || cur->waiting);
        assert((previous == nullptr) || (cur->join == nullptr));
        assert((cur->trigger == nullptr) == (cur->next == nullptr));
        if (cur->trigger != nullptr && --cur->trigger->wait == 0) {
          task_queue_.Push(cur->trigger);
        }
        next = cur->next;
        if (next == nullptr) {
          if (!cur->waiting) {
            // No `SimpleOpr` is on this block. Safe to delete, but only
            // after the guard has released the lock stored inside it.
            delete_cur = true;
          } else {
            cur->waiting = false;
          }
        }
      }
      // The block behind us has a successor and is no longer needed.
      delete previous;
      previous = cur;
      if (next == nullptr) {
        if (delete_cur) {
          delete cur;
        }
        break;
      }
      cur = next;
    }
  }
}

} // namespace engine
Expand Down
35 changes: 31 additions & 4 deletions src/dag_engine/simple_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,52 @@
#ifndef MXNET_DAG_ENGINE_SIMPLE_ENGINE_H_
#define MXNET_DAG_ENGINE_SIMPLE_ENGINE_H_

#include <dmlc/base.h>
#include <dmlc/concurrency.h>
#include <vector>
#include <functional>
#include <atomic>
#include "mxnet/dag_engine.h"
#include "dag_engine_impl.h"

namespace mxnet {

namespace engine {

// TODO
// 1. Workers
// 2. Runtime context

struct SimpleOpr;
struct OprBlock;

/*!
 * \brief One link in a variable's dependency chain.
 */
struct VersionedVarBlock {
  /*! \brief Next block in this variable's chain, or `nullptr` if this is the tail. */
  VersionedVarBlock* next{nullptr};
  /*! \brief Block of the next variable mutated by the same operator, or `nullptr`. */
  VersionedVarBlock* join{nullptr};
  /*! \brief Operator block whose `wait` counter is decremented when this block is released. */
  OprBlock* trigger{nullptr};
  /*! \brief Guards the fields of this block. */
  dmlc::Spinlock lock;
  /*! \brief Whether an operator is currently registered on this block. */
  bool waiting{false};
}; // struct VersionedVarBlock

/*!
 * \brief Scheduling record for one pushed operator.
 */
struct OprBlock {
  /*! \brief Work to execute once the block is taken from the task queue. */
  std::function<void()> fn;
  /*!
   * \brief Associated variable block; initialised to `nullptr` so it never
   *        holds an indeterminate value.
   *        NOTE(review): not written anywhere in the visible engine code — confirm it is still needed.
   */
  VersionedVarBlock* trigger{nullptr};
  /*!
   * \brief Associated operator; initialised to `nullptr` so it never holds
   *        an indeterminate value.
   *        NOTE(review): not written anywhere in the visible engine code — confirm it is still needed.
   */
  Opr* opr{nullptr};
  /*! \brief Number of outstanding dependencies; the block is queued when it reaches zero. */
  std::atomic<std::size_t> wait{0};
}; // struct OprBlock

/*!
 * \brief Variable type used by `SimpleEngine`.
 */
struct SimpleVar final : public Var {
  /*! \brief Convert a base `Var` pointer into a `SimpleVar` pointer. */
  static SimpleVar* CastFromBase(Var* ptr);

  /*! \brief Current tail block of this variable's dependency chain. */
  VersionedVarBlock* var = nullptr;
}; // struct SimpleVar

/*!
 * \brief Operator type used by `SimpleEngine`.
 */
struct SimpleOpr final : public Opr {
  /*! \brief Convert a base `Opr` pointer into a `SimpleOpr` pointer. */
  static SimpleOpr* CastFromBase(Opr* ptr);

  /*! \brief Function invoked when the operator runs. */
  DAGEngine::AsyncFn fn;
  /*! \brief Variables in the operator's "use" list. */
  std::vector<SimpleVar*> use_vars;
  /*! \brief Variables in the operator's "mutate" list. */
  std::vector<SimpleVar*> mutate_vars;
}; // struct SimpleOpr

class SimpleEngine final : public DAGEngine {
public:
SimpleEngine();
Expand All @@ -45,7 +69,10 @@ class SimpleEngine final : public DAGEngine {

void WaitForAll() override{};

void OnComplete(VersionedVarBlock* var);

private:
dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_;
DISALLOW_COPY_AND_ASSIGN(SimpleEngine);
}; // class SimpleEngine

Expand Down