Skip to content

Commit

Permalink
[pixie-ioGH-573] Support {start|end}_time=None in px.DataFrame constructor.
Browse files Browse the repository at this point in the history

Summary:
- Changes the `px.DataFrame` constructor to default to `None` for both start and end times.
  - To support this, a change to the `MemorySourceNode` API was necessary, in order to set start and end time separately.
- Changes `MemorySourceNode` to handle unset start/stop times differently, depending on whether `df.stream()` was called.
- Removes special time handling from MergeNodesRule for MemorySource ops; it was unnecessary because we only merge mem sources if they have identical time specs.
Fixes pixie-io#573

Test Plan: Added many parameterized tests in memory_source_node_test. Existing tests pass. Skaffold deployed and streaming queries worked.

Reviewers: #carnot, philkuz

Reviewed By: #carnot, philkuz

Subscribers: philkuz

Signed-off-by: James Bartlett <jamesbartlett@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D12100

GitOrigin-RevId: c4ad4f8ece9af4cdaefc0bf4f4150a7ba1d092d7
  • Loading branch information
JamesMBartlett authored and copybaranaut committed Aug 23, 2022
1 parent b2bbead commit 47cf907
Show file tree
Hide file tree
Showing 15 changed files with 878 additions and 127 deletions.
28 changes: 17 additions & 11 deletions src/carnot/exec/memory_source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status MemorySourceNode::OpenImpl(ExecState* exec_state) {
table_ = exec_state->table_store()->GetTable(plan_node_->TableName(), plan_node_->Tablet());
DCHECK(table_ != nullptr);

infinite_stream_ = plan_node_->infinite_stream();
streaming_ = plan_node_->streaming();

if (table_ == nullptr) {
return error::NotFound("Table '$0' not found", plan_node_->TableName());
Expand All @@ -70,22 +70,28 @@ Status MemorySourceNode::OpenImpl(ExecState* exec_state) {
}

StopSpec stop_spec;
if (plan_node_->HasStopTime()) {
stop_spec.type = StopSpec::StopType::StopAtTimeOrEndOfTable;
stop_spec.stop_time = plan_node_->stop_time();
} else if (infinite_stream_) {
stop_spec.type = StopSpec::StopType::Infinite;
if (streaming_) {
if (plan_node_->HasStopTime()) {
stop_spec.type = StopSpec::StopType::StopAtTime;
stop_spec.stop_time = plan_node_->stop_time();
} else {
stop_spec.type = StopSpec::StopType::Infinite;
}
} else {
// Determine table_end at Open() time because Stirling may be pushing to the table
stop_spec.type = StopSpec::StopType::CurrentEndOfTable;
if (plan_node_->HasStopTime()) {
stop_spec.type = StopSpec::StopType::StopAtTimeOrEndOfTable;
stop_spec.stop_time = plan_node_->stop_time();
} else {
stop_spec.type = StopSpec::StopType::CurrentEndOfTable;
}
}
cursor_ = std::make_unique<Table::Cursor>(table_, start_spec, stop_spec);

return Status::OK();
}

Status MemorySourceNode::CloseImpl(ExecState*) {
stats()->AddExtraInfo("infinite_stream", infinite_stream_ ? "true" : "false");
stats()->AddExtraInfo("streaming", streaming_ ? "true" : "false");
return Status::OK();
}

Expand All @@ -110,7 +116,7 @@ StatusOr<std::unique_ptr<RowBatch>> MemorySourceNode::GetNextRowBatch(ExecState*
// If infinite stream is set, we don't send Eow or Eos. Infinite streams therefore never cause
// HasBatchesRemaining to be false. Instead the outer loop that calls GenerateNext() is
// responsible for managing whether we continue the stream or end it.
if (cursor_->Done() && !infinite_stream_) {
if (cursor_->Done()) {
row_batch->set_eow(true);
row_batch->set_eos(true);
}
Expand All @@ -128,7 +134,7 @@ bool MemorySourceNode::InfiniteStreamNextBatchReady() { return cursor_->NextBatc
bool MemorySourceNode::NextBatchReady() {
// Next batch is ready if we haven't seen an eow and if it's an infinite_stream that has batches
// to push.
return HasBatchesRemaining() && (!infinite_stream_ || InfiniteStreamNextBatchReady());
return HasBatchesRemaining() && (!streaming_ || InfiniteStreamNextBatchReady());
}

} // namespace exec
Expand Down
5 changes: 2 additions & 3 deletions src/carnot/exec/memory_source_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ class MemorySourceNode : public SourceNode {
private:
StatusOr<std::unique_ptr<RowBatch>> GetNextRowBatch(ExecState* exec_state);
bool InfiniteStreamNextBatchReady();
// Whether this memory source will stream infinitely. Can be stopped by the
// exec_state_->keep_running() call in exec_graph.
bool infinite_stream_ = false;
// Whether this memory source will stream future results.
bool streaming_ = false;

std::unique_ptr<Table::Cursor> cursor_;

Expand Down
Loading

0 comments on commit 47cf907

Please sign in to comment.