Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Client/Server Editing #1043

Merged
merged 17 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cpp/perspective/src/cpp/emscripten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,8 @@ namespace binding {
const std::string& index,
t_op op,
bool is_update,
bool is_arrow) {
bool is_arrow,
t_uindex port_id) {
bool table_initialized = has_value(table);
std::shared_ptr<t_pool> pool;
std::shared_ptr<Table> tbl;
Expand Down Expand Up @@ -1162,7 +1163,7 @@ namespace binding {
}

// calculate offset, limit, and set the gnode
tbl->init(data_table, row_count, op);
tbl->init(data_table, row_count, op, port_id);
return tbl;
}

Expand Down Expand Up @@ -1628,6 +1629,8 @@ EMSCRIPTEN_BINDINGS(perspective) {
.function("get_computed_schema", &Table::get_computed_schema)
.function("unregister_gnode", &Table::unregister_gnode)
.function("reset_gnode", &Table::reset_gnode)
.function("make_port", &Table::make_port)
.function("remove_port", &Table::remove_port)
.function("get_id", &Table::get_id)
.function("get_pool", &Table::get_pool)
.function("get_gnode", &Table::get_gnode);
Expand Down
194 changes: 134 additions & 60 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ t_gnode::t_gnode(const t_schema& input_schema, const t_schema& output_schema)
, m_output_schema(output_schema)
, m_init(false)
, m_id(0)
, m_last_input_port_id(0)
, m_pool_cleanup([]() {}) {
PSP_TRACE_SENTINEL();
LOG_CONSTRUCTOR("t_gnode");
Expand Down Expand Up @@ -77,11 +78,14 @@ t_gnode::init() {
m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema);
m_gstate->init();

// Create a single input port
std::shared_ptr<t_port> port
= std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);
port->init();
m_iports.push_back(port);
// Create and store the main input port, which is always port 0. The next
// input port will be port 1, and so on
std::shared_ptr<t_port> input_port =
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);

input_port->init();

m_input_ports[0] = input_port;

for (t_uindex idx = 0, loop_end = m_transitional_schemas.size(); idx < loop_end; ++idx) {
t_port_mode mode = idx == 0 ? PORT_MODE_PKEYED : PORT_MODE_RAW;
Expand All @@ -92,32 +96,46 @@ t_gnode::init() {
m_oports.push_back(port);
}

std::shared_ptr<t_port>& iport = m_iports[0];
std::shared_ptr<t_data_table> flattened = iport->get_table()->flatten();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->get_table()->flatten();
}

m_init = true;
}

std::string
t_gnode::repr() const {
std::stringstream ss;
ss << "t_gnode<" << this << ">";
return ss.str();
}
t_uindex
t_gnode::make_input_port() {
PSP_VERBOSE_ASSERT(m_init, "Cannot `make_input_port` on an uninited gnode.");
std::shared_ptr<t_port> input_port =
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);
input_port->init();

void
t_gnode::_send(t_uindex portid, const t_data_table& fragments) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portid == 0, "Only simple dataflows supported currently");
t_uindex port_id = m_last_input_port_id + 1;
m_input_ports[port_id] = input_port;

std::shared_ptr<t_port>& iport = m_iports[portid];
iport->send(fragments);
// increment the global input port id
m_last_input_port_id = port_id;

return port_id;
}

void
t_gnode::_send_and_process(const t_data_table& fragments) {
_send(0, fragments);
_process();
t_gnode::remove_input_port(t_uindex port_id) {
PSP_VERBOSE_ASSERT(m_init, "Cannot `remove_input_port` on an uninited gnode.");

if (m_input_ports.count(port_id) == 0) {
std::cerr << "Input port `" << port_id << "` cannot be removed, as it does not exist.";
return;
}

std::shared_ptr<t_port> input_port = m_input_ports[port_id];

// clear the table at the port
input_port->clear();

// remove from the map
m_input_ports.erase(port_id);
}

t_value_transition
Expand Down Expand Up @@ -217,18 +235,29 @@ t_gnode::_process_mask_existed_rows(t_process_state& process_state) {
return mask;
}

std::shared_ptr<t_data_table>
t_gnode::_process_table() {
t_process_table_result
t_gnode::_process_table(t_uindex port_id) {
m_was_updated = false;

std::shared_ptr<t_port>& iport = m_iports[0];
t_process_table_result result;
result.m_flattened_data_table = nullptr;
result.m_should_notify_userspace = false;

std::shared_ptr<t_data_table> flattened = nullptr;

if (m_input_ports.count(port_id) == 0) {
std::cerr << "Cannot process table on port `" << port_id << "` as it does not exist." << std::endl;
return result;
}

std::shared_ptr<t_port> input_port = m_input_ports[port_id];

if (iport->get_table()->size() == 0) {
return nullptr;
if (input_port->get_table()->size() == 0) {
return result;
}

m_was_updated = true;
std::shared_ptr<t_data_table> flattened(iport->get_table()->flatten());
flattened = input_port->get_table()->flatten();

PSP_GNODE_VERIFY_TABLE(flattened);
PSP_GNODE_VERIFY_TABLE(get_table());
Expand All @@ -244,6 +273,7 @@ t_gnode::_process_table() {
row_lookup[idx] = m_gstate->lookup(pkey);
}

// first update - master table is empty
if (m_gstate->mapping_size() == 0) {
// Update context from state first - computes columns during update
_update_contexts_from_state(flattened);
Expand All @@ -256,12 +286,12 @@ t_gnode::_process_table() {
auto state_table = get_table();
PSP_GNODE_VERIFY_TABLE(state_table);
#endif
return nullptr;
// Make sure user is notified after first update.
result.m_should_notify_userspace = true;
return result;
}

for (t_uindex idx = 0, loop_end = m_iports.size(); idx < loop_end; ++idx) {
m_iports[idx]->release_or_clear();
}
input_port->release_or_clear();

// Use `t_process_state` to manage intermediate structures
t_process_state _process_state;
Expand Down Expand Up @@ -440,8 +470,13 @@ t_gnode::_process_table() {
PSP_GNODE_VERIFY_TABLE(updated_table);
}
#endif

m_oports[PSP_PORT_FLATTENED]->set_table(flattened_masked);
return flattened_masked;

result.m_flattened_data_table = flattened_masked;
result.m_should_notify_userspace = true;

return result;
}

template <>
Expand Down Expand Up @@ -532,19 +567,33 @@ t_gnode::_process_column<std::string>(
}

void
t_gnode::_process() {
t_gnode::send(t_uindex port_id, const t_data_table& fragments) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(
m_mode == NODE_PROCESSING_SIMPLE_DATAFLOW, "Only simple dataflows supported currently");
psp_log_time(repr() + " _process.enter");
PSP_VERBOSE_ASSERT(m_init, "Cannot `send` to an uninited gnode.");

std::shared_ptr<t_data_table> flattened_masked = _process_table();
if (flattened_masked) {
notify_contexts(*flattened_masked);
if (m_input_ports.count(port_id) == 0) {
std::cerr << "Cannot send table to port `" << port_id << "`, which does not exist." << std::endl;
return;
}

psp_log_time(repr() + " _process.noinit_path.exit");
std::shared_ptr<t_port>& input_port = m_input_ports[port_id];
input_port->send(fragments);
}

bool
t_gnode::process(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "Cannot `process` on an uninited gnode.");

t_process_table_result result = _process_table(port_id);

if (result.m_flattened_data_table) {
notify_contexts(*result.m_flattened_data_table);
}

// Whether the user should be notified - False if process_table exited
// early, True otherwise.
return result.m_should_notify_userspace;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could maybe even remove the concept of no-ops and drop m_should_notify_userspace altogether; instead, we can just say if _process_table made it past the empty-table-checks earlier in this method, we call notify_userspace. This may simplify some of the aggregate logic in the context_* classes as well since they won't need to report this information.

}

t_uindex
Expand All @@ -553,39 +602,39 @@ t_gnode::mapping_size() const {
}

t_data_table*
t_gnode::_get_otable(t_uindex portidx) {
t_gnode::_get_otable(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portidx < m_oports.size(), "Invalid port number");
return m_oports[portidx]->get_table().get();
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_otable` on an uninited gnode.");
PSP_VERBOSE_ASSERT(port_id < m_oports.size(), "Invalid port number");
return m_oports[port_id]->get_table().get();
}

t_data_table*
t_gnode::_get_itable(t_uindex portidx) {
t_gnode::_get_itable(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portidx < m_iports.size(), "Invalid port number");
return m_iports[portidx]->get_table().get();
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_itable` on an uninited gnode.");
PSP_VERBOSE_ASSERT(m_input_ports.count(port_id) != 0, "Invalid port number");
return m_input_ports[port_id]->get_table().get();
}

t_data_table*
t_gnode::get_table() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode.");
return m_gstate->get_table().get();
}

const t_data_table*
t_gnode::get_table() const {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode.");
return m_gstate->get_table().get();
}

std::shared_ptr<t_data_table>
t_gnode::get_table_sptr() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table_sptr` on an uninited gnode.");
return m_gstate->get_table();
}

Expand All @@ -597,10 +646,16 @@ t_gnode::get_table_sptr() {
void
t_gnode::promote_column(const std::string& name, t_dtype new_type) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `promote_column` on an uninited gnode.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Much better error message! I assume you changed these to trace the segfault? Reminds me that a future improvement may be to completely replace PSP_VERBOSE_ASSERT which check object initialization only and replace with proper valgrind support in the build scripts, stricter clang warnings and our existing excellent test coverage?

get_table()->promote_column(name, new_type, 0, false);
_get_otable(0)->promote_column(name, new_type, 0, false);
_get_itable(0)->promote_column(name, new_type, 0, false);

for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
std::shared_ptr<t_data_table> input_table = input_port->get_table();
input_table->promote_column(name, new_type, 0, false);
}

m_output_schema.retype_column(name, new_type);
m_input_schema.retype_column(name, new_type);
m_transitional_schemas[0].retype_column(name, new_type);
Expand Down Expand Up @@ -1098,10 +1153,21 @@ t_gnode::get_id() const {
return m_id;
}

t_uindex
t_gnode::num_input_ports() const {
return m_input_ports.size();
}

t_uindex
t_gnode::num_output_ports() const {
return m_oports.size();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am OK with removing these too if you're keen.


void
t_gnode::release_inputs() {
for (const auto& p : m_iports) {
p->release();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->release();
}
}

Expand Down Expand Up @@ -1194,8 +1260,9 @@ t_gnode::reset() {

void
t_gnode::clear_input_ports() {
for (t_uindex idx = 0, loop_end = m_oports.size(); idx < loop_end; ++idx) {
m_iports[idx]->get_table()->clear();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->get_table()->clear();
}
}

Expand Down Expand Up @@ -1246,6 +1313,13 @@ t_gnode::get_sorted_pkeyed_table() const {
return m_gstate->get_sorted_pkeyed_table();
}

std::string
t_gnode::repr() const {
std::stringstream ss;
ss << "t_gnode<" << this << ">";
return ss.str();
}

void
t_gnode::register_context(const std::string& name, std::shared_ptr<t_ctx0> ctx) {
_register_context(name, ZERO_SIDED_CONTEXT, reinterpret_cast<std::int64_t>(ctx.get()));
Expand Down
Loading