Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(subscription): support cursor order #18801

Merged
merged 12 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion e2e_test/subscription/create_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,19 @@ statement ok
create subscription sub2 from t3 with(retention = '1D');

statement ok
create sink s1 into t3 from t2;
create sink s1 into t3 from t2;

statement ok
create table t4 (v1 int, v2 int, v3 int, v4 int);

statement ok
create materialized view mv4 as select v4,v2 from t4;

statement ok
create subscription sub4 from mv4 with(retention = '1D');

statement ok
create table t5 (v1 int, v2 int, v3 int, v4 int, primary key (v1, v2));

statement ok
create subscription sub5 from t5 with(retention = '1D');
18 changes: 17 additions & 1 deletion e2e_test/subscription/drop_table_and_subscription.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,20 @@ statement ok
drop table t3;

statement ok
drop table t2;
drop table t2;


statement ok
drop subscription sub4;

statement ok
drop materialized view mv4;

statement ok
drop table t4;

statement ok
drop subscription sub5;

statement ok
drop table t5;
147 changes: 146 additions & 1 deletion e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,6 @@ def test_block_cursor():
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub2 full",conn)
execute_insert("insert into t2 values(1,1)",conn)
execute_insert("flush",conn)
Expand Down Expand Up @@ -402,6 +401,148 @@ def insert_into_table():
)
execute_insert("insert into t2 values(10,10)",conn)

def test_order_table_with_pk():
    """Subscription cursor on a pk table must emit rows ordered by the pk.

    Inserts rows out of pk order, then checks that both the initial full
    snapshot and subsequent change batches come back sorted on v1, and that
    updates surface as pk-ordered UpdateDelete/UpdateInsert pairs.
    """
    print(f"test_order_table_with_pk")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t2 values(6,6),(3,3),(5,5),(4,4),(7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub2 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    # Snapshot rows arrive sorted on the pk despite out-of-order insertion.
    for idx, v in enumerate([3, 4, 5, 6, 7]):
        check_rows_data([v, v], row[idx], "Insert")
    execute_insert("insert into t2 values(16,16),(13,13),(15,15),(14,14),(17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    # Incremental inserts are likewise delivered in pk order.
    for idx, v in enumerate([13, 14, 15, 16, 17]):
        check_rows_data([v, v], row[idx], "Insert")
    execute_insert("update t2 set v2 = 100 where v1 > 10",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 10 from cur",conn)
    assert len(row) == 10
    # Each update yields an adjacent UpdateDelete/UpdateInsert pair, pk-ordered.
    for idx, v in enumerate([13, 14, 15, 16, 17]):
        check_rows_data([v, v], row[2 * idx], "UpdateDelete")
        check_rows_data([v, 100], row[2 * idx + 1], "UpdateInsert")
    drop_table_subscription()

def test_order_table_with_row_id():
    """Subscription cursor on a table without a user pk must emit rows in
    hidden `_row_id` order.

    Fetched cursor rows are compared against a direct `order by _row_id`
    select over the same table.
    """
    # Fixed: previously printed "test_order_table_with_pk" (copy-paste slip),
    # which made test log output misleading.
    print(f"test_order_table_with_row_id")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t1 values(6,6),(3,3),(5,5),(4,4),(7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub full",conn)
    row = execute_query("fetch 10 from cur",conn)
    ex_row = execute_query("select v1, v2 from t1 order by _row_id",conn)
    # NOTE(review): 5 rows are inserted above but 6 are expected; presumably
    # create_table_subscription() seeds one row into t1 — confirm against setup.
    assert len(row) == 6
    assert len(ex_row) == 6
    # Fixed: the original verified only rows 0..4 despite asserting 6 rows;
    # verify every fetched row against the _row_id-ordered reference.
    for idx in range(6):
        check_rows_data(ex_row[idx], row[idx], "Insert")
    execute_insert("insert into t1 values(16,16),(13,13),(15,15),(14,14),(17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    ex_row = execute_query("select v1, v2 from t1 where v1 > 10 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    for idx in range(5):
        check_rows_data(ex_row[idx], row[idx], "Insert")
    drop_table_subscription()

def test_order_mv():
    """Subscription cursor on a materialized view must deliver rows in the
    same order as selecting from the base table ordered by `_row_id`.

    mv4 projects (v4, v2) from t4; cursor output is compared row-by-row
    against the reference select.
    """
    print(f"test_order_mv")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t4 values(6,6,6,6),(3,3,3,3),(5,5,5,5),(4,4,4,4),(7,7,7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub4 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    ex_row = execute_query("select v4, v2 from t4 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    # Snapshot batch must match the _row_id-ordered reference rows one-to-one.
    for idx in range(5):
        check_rows_data(ex_row[idx], row[idx], "Insert")
    execute_insert("insert into t4 values(16,16,16,16),(13,13,13,13),(15,15,15,15),(14,14,14,14),(17,17,17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    ex_row = execute_query("select v4, v2 from t4 where v2 > 10 order by _row_id",conn)
    assert len(row) == 5
    assert len(ex_row) == 5
    # Incremental batch must match as well.
    for idx in range(5):
        check_rows_data(ex_row[idx], row[idx], "Insert")
    drop_table_subscription()

def test_order_multi_pk():
    """Subscription cursor on a table with a composite pk (v1, v2) must emit
    rows ordered lexicographically on the full key.

    Rows sharing the same v1 are inserted out of v2 order to exercise the
    secondary key column.
    """
    # Fixed: previously printed the misspelled "test_order_mutil_pk".
    print(f"test_order_multi_pk")
    create_table_subscription()
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )
    execute_insert("insert into t5 values(6,6,6,6),(6,3,3,3),(5,5,5,5),(5,4,4,4),(7,7,7,7)",conn)
    execute_insert("flush",conn)
    execute_insert("declare cur subscription cursor for sub5 full",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    # Expected order sorts on (v1, v2): ties on v1 break on v2.
    check_rows_data([5,4,4,4],row[0],"Insert")
    check_rows_data([5,5,5,5],row[1],"Insert")
    check_rows_data([6,3,3,3],row[2],"Insert")
    check_rows_data([6,6,6,6],row[3],"Insert")
    check_rows_data([7,7,7,7],row[4],"Insert")
    execute_insert("insert into t5 values(16,16,16,16),(16,13,13,13),(15,15,15,15),(15,14,14,14),(17,17,17,17)",conn)
    execute_insert("flush",conn)
    row = execute_query("fetch 5 from cur",conn)
    assert len(row) == 5
    # Incremental batch follows the same composite-key ordering.
    check_rows_data([15,14,14,14],row[0],"Insert")
    check_rows_data([15,15,15,15],row[1],"Insert")
    check_rows_data([16,13,13,13],row[2],"Insert")
    check_rows_data([16,16,16,16],row[3],"Insert")
    check_rows_data([17,17,17,17],row[4],"Insert")
    drop_table_subscription()

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
Expand All @@ -413,4 +554,8 @@ def insert_into_table():
test_cursor_with_table_alter()
test_cursor_fetch_n()
test_rebuild_table()
test_order_table_with_pk()
test_order_table_with_row_id()
test_order_mv()
test_order_multi_pk()
test_block_cursor()
1 change: 1 addition & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ message LogRowSeqScanNode {
common.Buffer vnode_bitmap = 3;
common.BatchQueryEpoch old_epoch = 4;
common.BatchQueryEpoch new_epoch = 5;
bool ordered = 6;
}

message InsertNode {
Expand Down
8 changes: 8 additions & 0 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct LogRowSeqScanExecutor<S: StateStore> {
old_epoch: u64,
new_epoch: u64,
version_id: HummockVersionId,
ordered: bool,
}

impl<S: StateStore> LogRowSeqScanExecutor<S> {
Expand All @@ -63,6 +64,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size: usize,
identity: String,
metrics: Option<BatchMetrics>,
ordered: bool,
) -> Self {
let mut schema = table.schema().clone();
schema.fields.push(Field::with_name(
Expand All @@ -78,6 +80,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
old_epoch,
new_epoch,
version_id,
ordered,
}
}
}
Expand Down Expand Up @@ -146,6 +149,7 @@ impl BoxedExecutorBuilder for LogStoreRowSeqScanExecutorBuilder {
chunk_size as usize,
source.plan_node().get_identity().clone(),
metrics,
log_store_seq_scan_node.ordered,
)))
})
}
Expand Down Expand Up @@ -175,6 +179,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
new_epoch,
version_id,
schema,
ordered,
..
} = *self;
let table = std::sync::Arc::new(table);
Expand All @@ -194,6 +199,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size,
histogram,
Arc::new(schema.clone()),
ordered,
);
#[for_await]
for chunk in stream {
Expand All @@ -211,12 +217,14 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
ordered: bool,
) {
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch,
HummockReadEpoch::BatchQueryCommitted(new_epoch, version_id),
ordered,
)
.await?
.flat_map(|r| {
Expand Down
14 changes: 13 additions & 1 deletion src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, Statement};
use risingwave_sqlparser::ast::{ExplainOptions, ExplainType, FetchCursorStatement, Statement};
use thiserror_ext::AsReport;

use super::create_index::{gen_create_index_plan, resolve_index_schema};
Expand Down Expand Up @@ -94,6 +94,18 @@ async fn do_handle_explain(
(Ok(plan), context)
}

Statement::FetchCursor {
stmt: FetchCursorStatement { cursor_name, .. },
} => {
let cursor_manager = session.clone().get_cursor_manager();
let plan = cursor_manager
.gen_batch_plan_with_subscription_cursor(cursor_name, handler_args)
.await
.map(|x| x.plan)?;
let context = plan.ctx();
(Ok(plan), context)
}

// For other queries without `await` point, we can keep a copy of reference to the
// `OptimizerContext` even if the planning fails. This enables us to log the partial
// traces for better debugging experience.
Expand Down
20 changes: 18 additions & 2 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use risingwave_common::types::{
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::{
CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr,
TableFactor, TableWithJoins,
CompatibleSourceSchema, ConnectorSchema, Expr, Ident, ObjectName, OrderByExpr, Query, Select,
SelectItem, SetExpr, TableFactor, TableWithJoins,
};
use thiserror_ext::AsReport;

Expand Down Expand Up @@ -234,6 +234,22 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> Query {
}
}

pub fn gen_query_from_table_name_order_by(from_name: ObjectName, pk_names: Vec<String>) -> Query {
let mut query = gen_query_from_table_name(from_name);
query.order_by = pk_names
.into_iter()
.map(|pk| {
let expr = Expr::Identifier(Ident::with_quote_unchecked('"', pk));
OrderByExpr {
expr,
asc: None,
nulls_first: None,
}
})
.collect();
query
}

pub fn convert_unix_millis_to_logstore_u64(unix_millis: u64) -> u64 {
Epoch::from_unix_millis(unix_millis).0
}
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_log_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct BatchLogSeqScan {

impl BatchLogSeqScan {
fn new_inner(core: generic::LogScan, dist: Distribution) -> Self {
let order = Order::any();
let order = Order::new(core.table_desc.pk.clone());
let base = PlanBase::new_batch(core.ctx(), core.schema(), dist, order);

Self { base, core }
Expand Down Expand Up @@ -88,8 +88,11 @@ impl Distill for BatchLogSeqScan {
});
vec.push(("distribution", dist));
}
vec.push(("old_epoch", Pretty::from(self.core.old_epoch.to_string())));
vec.push(("new_epoch", Pretty::from(self.core.new_epoch.to_string())));
vec.push(("version_id", Pretty::from(self.core.version_id.to_string())));

childless_record("BatchScan", vec)
childless_record("BatchLogSeqScan", vec)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please include the epoch range and version id in the result as well.

}
}

Expand Down Expand Up @@ -126,6 +129,8 @@ impl TryToBatchPb for BatchLogSeqScan {
},
)),
}),
// It's currently true.
ordered: true,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this consistent with BatchSeqScan:

ordered: !self.order().is_any()

}))
}
}
Expand Down
Loading
Loading