Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update limit behavior to use incrementing index (fix last_by_index aggregate) #2964

Merged
merged 3 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ jobs:
is-release: false

steps:
- name: Free up disk space
run: |
rm -rf /__t/*

- name: Checkout
uses: actions/checkout@v4

Expand Down
2 changes: 1 addition & 1 deletion cpp/perspective/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ try {
execSync(`mkdirp ${cwd}`, { stdio });
process.env.CLICOLOR_FORCE = 1;
execSync(
`emcmake cmake ${__dirname} ${cmake_flags} -DCMAKE_BUILD_TYPE=${env}`,
`emcmake cmake ${__dirname} ${cmake_flags} -DCMAKE_BUILD_TYPE=${env} -DRAPIDJSON_BUILD_EXAMPLES=OFF`,
{
cwd,
stdio,
Expand Down
13 changes: 13 additions & 0 deletions cpp/perspective/src/cpp/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,19 @@ type_to_dtype<void*>() {
return DTYPE_OBJECT;
}

// Stream-insertion overload for t_op: writes the matching enumerator's name
// to `os` and returns `os`.
//
// X(NAME) expands to a `case` label that streams the stringified NAME, and
// FOREACH_T_OP(X) generates the body of the switch from it. FOREACH_T_OP is
// defined outside this view; presumably it applies X once per t_op
// enumerator -- TODO confirm. The switch has no `default`, so a value that
// matches no generated case writes nothing.
std::ostream&
operator<<(std::ostream& os, const t_op& op) {
#define X(NAME) \
case NAME: \
os << #NAME; \
break;

switch (op) { FOREACH_T_OP(X) }
// X is only a local helper for the switch above; undefine it so it does not
// leak into the rest of the translation unit.
#undef X
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lmao


return os;
}

} // end namespace perspective

namespace std {
Expand Down
10 changes: 7 additions & 3 deletions cpp/perspective/src/cpp/binding_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void
encode_api_response(
const ProtoServerResp<std::string>& msg, EncodedApiResp* encoded
) {
auto* data = new char[msg.data.size()];
auto* data = static_cast<char*>(UNINSTRUMENTED_MALLOC(msg.data.size()));
std::copy(msg.data.begin(), msg.data.end(), data);

encoded->data = data;
Expand All @@ -57,8 +57,12 @@ encode_api_response(

EncodedApiEntries*
encode_api_responses(const std::vector<ProtoServerResp<std::string>>& msgs) {
auto* encoded = new EncodedApiEntries;
encoded->entries = new EncodedApiResp[msgs.size()];
auto* encoded = static_cast<EncodedApiEntries*>(
UNINSTRUMENTED_MALLOC(sizeof(EncodedApiEntries))
);
encoded->entries = static_cast<EncodedApiResp*>(
UNINSTRUMENTED_MALLOC(sizeof(EncodedApiResp) * msgs.size())
);

encoded->size = msgs.size();
auto* encoded_mem = encoded->entries;
Expand Down
16 changes: 14 additions & 2 deletions cpp/perspective/src/cpp/context_zero.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#include "perspective/base.h"
#include "perspective/raw_types.h"
#include <perspective/first.h>
#include <perspective/context_base.h>
#include <perspective/get_data_extents.h>
Expand Down Expand Up @@ -101,6 +103,7 @@ t_ctx0::notify(
flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr =
flattened.get_const_column("psp_op");
auto old_pkey_col = flattened.get_column("psp_old_pkey");
const t_column* pkey_col = pkey_sptr.get();
const t_column* op_col = op_sptr.get();

Expand Down Expand Up @@ -173,11 +176,20 @@ t_ctx0::notify(
m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);
bool existed = *(existed_col->get_nth<bool>(idx));
const auto existed = *(existed_col->get_nth<bool>(idx));
const auto old_pkey = old_pkey_col->get_scalar(idx);

switch (op) {
case OP_INSERT: {
if (existed) {
if (old_pkey.is_valid()) {
m_traversal->move_row(
*m_gstate,
*(m_expression_tables->m_master),
m_config,
old_pkey,
pkey
);
} else if (existed) {
m_traversal->update_row(
*m_gstate,
*(m_expression_tables->m_master),
Expand Down
17 changes: 15 additions & 2 deletions cpp/perspective/src/cpp/data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
#include <sstream>
#include <utility>
namespace perspective {
// Stream-insertion overload for t_flatten_record.
//
// Writes the record's three index fields on a single line, in the form
// "store_idx: <m_store_idx>, bidx: <m_begin_idx>, eidx: <m_edge_idx>",
// and returns `os` so insertions can be chained.
std::ostream&
operator<<(std::ostream& os, const t_flatten_record& fr) {
    os << "store_idx: " << fr.m_store_idx;
    os << ", bidx: " << fr.m_begin_idx;
    os << ", eidx: " << fr.m_edge_idx;
    return os;
}

void
t_data_table::set_capacity(t_uindex idx) {
Expand Down Expand Up @@ -319,15 +325,15 @@ t_data_table::get_schema() const {
}

std::shared_ptr<t_data_table>
t_data_table::flatten() const {
t_data_table::flatten(t_uindex limit) const {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(is_pkey_table(), "Not a pkeyed table");
std::shared_ptr<t_data_table> flattened = std::make_shared<t_data_table>(
"", "", m_schema, DEFAULT_EMPTY_CAPACITY, BACKING_STORE_MEMORY
);
flattened->init();
flatten_body<std::shared_ptr<t_data_table>>(flattened);
flatten_body<std::shared_ptr<t_data_table>>(flattened, limit);
return flattened;
}

Expand Down Expand Up @@ -635,6 +641,13 @@ t_data_table::join(const std::shared_ptr<t_data_table>& other_table) const {
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");

if (size() != other_table->size()) {
#if PSP_DEBUG
LOG_DEBUG("Joining current table:");
pprint();
LOG_DEBUG("on this this table:");
other_table->pprint();
#endif

std::stringstream ss;
ss << "[t_data_table::join] Cannot join two tables of unequal sizes! "
"Current size: "
Expand Down
22 changes: 22 additions & 0 deletions cpp/perspective/src/cpp/flat_traversal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,28 @@ t_ftrav::delete_row(t_tscalar pkey) {
++m_step_deletes;
}

void
t_ftrav::move_row(
const t_gstate& gstate,
const t_data_table& expression_master_table,
const t_config& config,
t_tscalar old_pkey,
t_tscalar new_pkey
) {
auto old_pkiter = m_pkeyidx.find(old_pkey);
bool old_pkey_existed = old_pkiter != m_pkeyidx.end();
if (!old_pkey_existed) {
LOG_DEBUG("Tried to move pkey that doesn't exist: " << old_pkey);
return;
}
LOG_DEBUG("Moving pkey from: " << old_pkey << " to: " << new_pkey);

(*m_index)[old_pkiter->second].m_deleted = true;
t_mselem mselem;
fill_sort_elem(gstate, expression_master_table, config, new_pkey, mselem);
m_new_elems[new_pkey] = mselem;
}

std::vector<t_sortspec>
t_ftrav::get_sort_by() const {
return m_sortby;
Expand Down
31 changes: 24 additions & 7 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

#include "perspective/raw_types.h"
#include <perspective/first.h>
#include <perspective/context_unit.h>
#include <perspective/context_zero.h>
Expand Down Expand Up @@ -48,7 +49,9 @@ calc_negate(t_tscalar val) {
return val.negate();
}

t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) :
t_gnode::t_gnode(
t_schema input_schema, t_schema output_schema, t_uindex limit
) :
m_mode(NODE_PROCESSING_SIMPLE_DATAFLOW)
#ifdef PSP_PARALLEL_FOR
,
Expand All @@ -60,6 +63,7 @@ t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) :
m_output_schema(std::move(output_schema)),
m_init(false),
m_id(0),
m_limit(limit),
m_last_input_port_id(0),
m_pool_cleanup([]() {}) {
PSP_TRACE_SENTINEL();
Expand All @@ -85,6 +89,10 @@ t_gnode::t_gnode(t_schema input_schema, t_schema output_schema) :
existed_schema
};
m_epoch = std::chrono::high_resolution_clock::now();

m_input_schema.add_column(
"psp_old_pkey", m_input_schema.get_dtype("psp_pkey")
);
}

t_gnode::~t_gnode() {
Expand All @@ -97,7 +105,8 @@ void
t_gnode::init() {
PSP_TRACE_SENTINEL();

m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema);
m_gstate =
std::make_shared<t_gstate>(m_input_schema, m_output_schema, m_limit);
m_gstate->init();

// Create and store the main input port, which is always port 0. The next
Expand All @@ -123,7 +132,7 @@ t_gnode::init() {

for (const auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->get_table()->flatten();
input_port->get_table()->flatten(m_limit);
}

// Initialize expression-related state
Expand Down Expand Up @@ -186,11 +195,13 @@ t_gnode::calc_transition(

if (!row_pre_existed && !cur_valid && !t_env::backout_invalid_neq_ft()) {
trans = VALUE_TRANSITION_NEQ_FT;
} else if (row_pre_existed && !prev_valid && !cur_valid && !t_env::backout_eq_invalid_invalid()) {
} else if (row_pre_existed && !prev_valid && !cur_valid
&& !t_env::backout_eq_invalid_invalid()) {
trans = VALUE_TRANSITION_EQ_TT;
} else if (!prev_existed && !exists) {
trans = VALUE_TRANSITION_EQ_FF;
} else if (row_pre_existed && exists && !prev_valid && cur_valid && !t_env::backout_nveq_ft()) {
} else if (row_pre_existed && exists && !prev_valid && cur_valid
&& !t_env::backout_nveq_ft()) {
trans = VALUE_TRANSITION_NVEQ_FT;
} else if (prev_existed && exists && prev_cur_eq) {
trans = VALUE_TRANSITION_EQ_TT;
Expand Down Expand Up @@ -298,16 +309,22 @@ t_gnode::_process_table(t_uindex port_id) {
}

m_was_updated = true;
flattened = input_port->get_table()->flatten();
flattened = input_port->get_table()->flatten(m_limit);

PSP_GNODE_VERIFY_TABLE(flattened);
PSP_GNODE_VERIFY_TABLE(get_table());

t_uindex flattened_num_rows = flattened->num_rows();

std::vector<t_rlookup> row_lookup(flattened_num_rows);
t_column* pkey_col = flattened->get_column("psp_pkey").get();

#if PSP_DEBUG
LOG_DEBUG("m_mapping");
for (const auto [k, v] : m_gstate->get_pkey_map()) {
LOG_DEBUG("KEY: " << k << " , VALUE: " << v);
}
#endif

for (t_uindex idx = 0; idx < flattened_num_rows; ++idx) {
// See if each primary key in flattened already exist in the dataset
t_tscalar pkey = pkey_col->get_scalar(idx);
Expand Down
Loading
Loading