-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Client/Server Editing #1043
Changes from all commits
5fc55f0
3270f77
957f95a
34ac441
e4d00c4
d571652
2eb567b
0678a1c
a279584
73f6ead
bc31b73
21f81b9
554c8e8
0723798
1402232
954dbd2
40dd19a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,7 @@ t_gnode::t_gnode(const t_schema& input_schema, const t_schema& output_schema) | |
, m_output_schema(output_schema) | ||
, m_init(false) | ||
, m_id(0) | ||
, m_last_input_port_id(0) | ||
, m_pool_cleanup([]() {}) { | ||
PSP_TRACE_SENTINEL(); | ||
LOG_CONSTRUCTOR("t_gnode"); | ||
|
@@ -77,11 +78,14 @@ t_gnode::init() { | |
m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema); | ||
m_gstate->init(); | ||
|
||
// Create a single input port | ||
std::shared_ptr<t_port> port | ||
= std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema); | ||
port->init(); | ||
m_iports.push_back(port); | ||
// Create and store the main input port, which is always port 0. The next | ||
// input port will be port 1, and so on | ||
std::shared_ptr<t_port> input_port = | ||
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema); | ||
|
||
input_port->init(); | ||
|
||
m_input_ports[0] = input_port; | ||
|
||
for (t_uindex idx = 0, loop_end = m_transitional_schemas.size(); idx < loop_end; ++idx) { | ||
t_port_mode mode = idx == 0 ? PORT_MODE_PKEYED : PORT_MODE_RAW; | ||
|
@@ -92,32 +96,46 @@ t_gnode::init() { | |
m_oports.push_back(port); | ||
} | ||
|
||
std::shared_ptr<t_port>& iport = m_iports[0]; | ||
std::shared_ptr<t_data_table> flattened = iport->get_table()->flatten(); | ||
for (auto& iter : m_input_ports) { | ||
std::shared_ptr<t_port> input_port = iter.second; | ||
input_port->get_table()->flatten(); | ||
} | ||
|
||
m_init = true; | ||
} | ||
|
||
std::string | ||
t_gnode::repr() const { | ||
std::stringstream ss; | ||
ss << "t_gnode<" << this << ">"; | ||
return ss.str(); | ||
} | ||
t_uindex | ||
t_gnode::make_input_port() { | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `make_input_port` on an uninited gnode."); | ||
std::shared_ptr<t_port> input_port = | ||
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema); | ||
input_port->init(); | ||
|
||
void | ||
t_gnode::_send(t_uindex portid, const t_data_table& fragments) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(portid == 0, "Only simple dataflows supported currently"); | ||
t_uindex port_id = m_last_input_port_id + 1; | ||
m_input_ports[port_id] = input_port; | ||
|
||
std::shared_ptr<t_port>& iport = m_iports[portid]; | ||
iport->send(fragments); | ||
// increment the global input port id | ||
m_last_input_port_id = port_id; | ||
|
||
return port_id; | ||
} | ||
|
||
void | ||
t_gnode::_send_and_process(const t_data_table& fragments) { | ||
_send(0, fragments); | ||
_process(); | ||
t_gnode::remove_input_port(t_uindex port_id) { | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `remove_input_port` on an uninited gnode."); | ||
|
||
if (m_input_ports.count(port_id) == 0) { | ||
std::cerr << "Input port `" << port_id << "` cannot be removed, as it does not exist."; | ||
return; | ||
} | ||
|
||
std::shared_ptr<t_port> input_port = m_input_ports[port_id]; | ||
|
||
// clear the table at the port | ||
input_port->clear(); | ||
|
||
// remove from the map | ||
m_input_ports.erase(port_id); | ||
} | ||
|
||
t_value_transition | ||
|
@@ -217,18 +235,29 @@ t_gnode::_process_mask_existed_rows(t_process_state& process_state) { | |
return mask; | ||
} | ||
|
||
std::shared_ptr<t_data_table> | ||
t_gnode::_process_table() { | ||
t_process_table_result | ||
t_gnode::_process_table(t_uindex port_id) { | ||
m_was_updated = false; | ||
|
||
std::shared_ptr<t_port>& iport = m_iports[0]; | ||
t_process_table_result result; | ||
result.m_flattened_data_table = nullptr; | ||
result.m_should_notify_userspace = false; | ||
|
||
std::shared_ptr<t_data_table> flattened = nullptr; | ||
|
||
if (m_input_ports.count(port_id) == 0) { | ||
std::cerr << "Cannot process table on port `" << port_id << "` as it does not exist." << std::endl; | ||
return result; | ||
} | ||
|
||
std::shared_ptr<t_port> input_port = m_input_ports[port_id]; | ||
|
||
if (iport->get_table()->size() == 0) { | ||
return nullptr; | ||
if (input_port->get_table()->size() == 0) { | ||
return result; | ||
} | ||
|
||
m_was_updated = true; | ||
std::shared_ptr<t_data_table> flattened(iport->get_table()->flatten()); | ||
flattened = input_port->get_table()->flatten(); | ||
|
||
PSP_GNODE_VERIFY_TABLE(flattened); | ||
PSP_GNODE_VERIFY_TABLE(get_table()); | ||
|
@@ -244,6 +273,7 @@ t_gnode::_process_table() { | |
row_lookup[idx] = m_gstate->lookup(pkey); | ||
} | ||
|
||
// first update - master table is empty | ||
if (m_gstate->mapping_size() == 0) { | ||
// Update context from state first - computes columns during update | ||
_update_contexts_from_state(flattened); | ||
|
@@ -256,12 +286,12 @@ t_gnode::_process_table() { | |
auto state_table = get_table(); | ||
PSP_GNODE_VERIFY_TABLE(state_table); | ||
#endif | ||
return nullptr; | ||
// Make sure user is notified after first update. | ||
result.m_should_notify_userspace = true; | ||
return result; | ||
} | ||
|
||
for (t_uindex idx = 0, loop_end = m_iports.size(); idx < loop_end; ++idx) { | ||
m_iports[idx]->release_or_clear(); | ||
} | ||
input_port->release_or_clear(); | ||
|
||
// Use `t_process_state` to manage intermediate structures | ||
t_process_state _process_state; | ||
|
@@ -440,8 +470,13 @@ t_gnode::_process_table() { | |
PSP_GNODE_VERIFY_TABLE(updated_table); | ||
} | ||
#endif | ||
|
||
m_oports[PSP_PORT_FLATTENED]->set_table(flattened_masked); | ||
return flattened_masked; | ||
|
||
result.m_flattened_data_table = flattened_masked; | ||
result.m_should_notify_userspace = true; | ||
|
||
return result; | ||
} | ||
|
||
template <> | ||
|
@@ -532,19 +567,33 @@ t_gnode::_process_column<std::string>( | |
} | ||
|
||
void | ||
t_gnode::_process() { | ||
t_gnode::send(t_uindex port_id, const t_data_table& fragments) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT( | ||
m_mode == NODE_PROCESSING_SIMPLE_DATAFLOW, "Only simple dataflows supported currently"); | ||
psp_log_time(repr() + " _process.enter"); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `send` to an uninited gnode."); | ||
|
||
std::shared_ptr<t_data_table> flattened_masked = _process_table(); | ||
if (flattened_masked) { | ||
notify_contexts(*flattened_masked); | ||
if (m_input_ports.count(port_id) == 0) { | ||
std::cerr << "Cannot send table to port `" << port_id << "`, which does not exist." << std::endl; | ||
return; | ||
} | ||
|
||
psp_log_time(repr() + " _process.noinit_path.exit"); | ||
std::shared_ptr<t_port>& input_port = m_input_ports[port_id]; | ||
input_port->send(fragments); | ||
} | ||
|
||
bool | ||
t_gnode::process(t_uindex port_id) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `process` on an uninited gnode."); | ||
|
||
t_process_table_result result = _process_table(port_id); | ||
|
||
if (result.m_flattened_data_table) { | ||
notify_contexts(*result.m_flattened_data_table); | ||
} | ||
|
||
// Whether the user should be notified - False if process_table exited | ||
// early, True otherwise. | ||
return result.m_should_notify_userspace; | ||
} | ||
|
||
t_uindex | ||
|
@@ -553,39 +602,39 @@ t_gnode::mapping_size() const { | |
} | ||
|
||
t_data_table* | ||
t_gnode::_get_otable(t_uindex portidx) { | ||
t_gnode::_get_otable(t_uindex port_id) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(portidx < m_oports.size(), "Invalid port number"); | ||
return m_oports[portidx]->get_table().get(); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_otable` on an uninited gnode."); | ||
PSP_VERBOSE_ASSERT(port_id < m_oports.size(), "Invalid port number"); | ||
return m_oports[port_id]->get_table().get(); | ||
} | ||
|
||
t_data_table* | ||
t_gnode::_get_itable(t_uindex portidx) { | ||
t_gnode::_get_itable(t_uindex port_id) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(portidx < m_iports.size(), "Invalid port number"); | ||
return m_iports[portidx]->get_table().get(); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_itable` on an uninited gnode."); | ||
PSP_VERBOSE_ASSERT(m_input_ports.count(port_id) != 0, "Invalid port number"); | ||
return m_input_ports[port_id]->get_table().get(); | ||
} | ||
|
||
t_data_table* | ||
t_gnode::get_table() { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode."); | ||
return m_gstate->get_table().get(); | ||
} | ||
|
||
const t_data_table* | ||
t_gnode::get_table() const { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode."); | ||
return m_gstate->get_table().get(); | ||
} | ||
|
||
std::shared_ptr<t_data_table> | ||
t_gnode::get_table_sptr() { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table_sptr` on an uninited gnode."); | ||
return m_gstate->get_table(); | ||
} | ||
|
||
|
@@ -597,10 +646,16 @@ t_gnode::get_table_sptr() { | |
void | ||
t_gnode::promote_column(const std::string& name, t_dtype new_type) { | ||
PSP_TRACE_SENTINEL(); | ||
PSP_VERBOSE_ASSERT(m_init, "touching uninited object"); | ||
PSP_VERBOSE_ASSERT(m_init, "Cannot `promote_column` on an uninited gnode."); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Much better error message! I assume you changed these to trace the segfault? Reminds me that a future improvement may be to completely replace |
||
get_table()->promote_column(name, new_type, 0, false); | ||
_get_otable(0)->promote_column(name, new_type, 0, false); | ||
_get_itable(0)->promote_column(name, new_type, 0, false); | ||
|
||
for (auto& iter : m_input_ports) { | ||
std::shared_ptr<t_port> input_port = iter.second; | ||
std::shared_ptr<t_data_table> input_table = input_port->get_table(); | ||
input_table->promote_column(name, new_type, 0, false); | ||
} | ||
|
||
m_output_schema.retype_column(name, new_type); | ||
m_input_schema.retype_column(name, new_type); | ||
m_transitional_schemas[0].retype_column(name, new_type); | ||
|
@@ -1098,10 +1153,21 @@ t_gnode::get_id() const { | |
return m_id; | ||
} | ||
|
||
t_uindex | ||
t_gnode::num_input_ports() const { | ||
return m_input_ports.size(); | ||
} | ||
|
||
t_uindex | ||
t_gnode::num_output_ports() const { | ||
return m_oports.size(); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am OK with removing these too if you're keen. |
||
|
||
void | ||
t_gnode::release_inputs() { | ||
for (const auto& p : m_iports) { | ||
p->release(); | ||
for (auto& iter : m_input_ports) { | ||
std::shared_ptr<t_port> input_port = iter.second; | ||
input_port->release(); | ||
} | ||
} | ||
|
||
|
@@ -1194,8 +1260,9 @@ t_gnode::reset() { | |
|
||
void | ||
t_gnode::clear_input_ports() { | ||
for (t_uindex idx = 0, loop_end = m_oports.size(); idx < loop_end; ++idx) { | ||
m_iports[idx]->get_table()->clear(); | ||
for (auto& iter : m_input_ports) { | ||
std::shared_ptr<t_port> input_port = iter.second; | ||
input_port->get_table()->clear(); | ||
} | ||
} | ||
|
||
|
@@ -1246,6 +1313,13 @@ t_gnode::get_sorted_pkeyed_table() const { | |
return m_gstate->get_sorted_pkeyed_table(); | ||
} | ||
|
||
std::string | ||
t_gnode::repr() const { | ||
std::stringstream ss; | ||
ss << "t_gnode<" << this << ">"; | ||
return ss.str(); | ||
} | ||
|
||
void | ||
t_gnode::register_context(const std::string& name, std::shared_ptr<t_ctx0> ctx) { | ||
_register_context(name, ZERO_SIDED_CONTEXT, reinterpret_cast<std::int64_t>(ctx.get())); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could maybe even remove the concept of no-ops and drop
m_should_notify_userspace
altogether; instead, we can just say if_process_table
made it past the empty-table-checks earlier in this method, we callnotify_userspace
. This may simplify some of the aggregate logic in thecontext_*
classes as well since they won't need to report this information.