refactor: remove private link related connection (#18975)
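
Remove user-facing support for privatelink connections: drop the e2e tests covering `CREATE CONNECTION ... WITH (type = 'privatelink', provider = 'mock')` and the `connection.name` property on Kafka sources and sinks, delete the `CONNECTION_NAME_KEY` handling in the connector crate and `resolve_private_link_connection` in the frontend catalog, and mark the `private_link_service` field in `proto/catalog.proto` as deprecated.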
Signed-off-by: tabVersion <tabvision@bupt.icu>
tabVersion authored Oct 30, 2024
1 parent a36c317 commit 07c7bc3
Showing 22 changed files with 44 additions and 892 deletions.
17 changes: 0 additions & 17 deletions e2e_test/ddl/alter_set_schema.slt
@@ -94,23 +94,6 @@ WHERE nspname = 'test_schema';
----
test_subscription test_schema

statement ok
CREATE CONNECTION test_conn WITH (type = 'privatelink', provider = 'mock');

statement ok
ALTER CONNECTION test_conn SET SCHEMA test_schema;

query TT
SELECT name AS connname, nspname AS schemaname
FROM rw_connections
JOIN pg_namespace ON pg_namespace.oid = rw_connections.schema_id
WHERE nspname = 'test_schema';
----
test_conn test_schema

statement ok
DROP CONNECTION test_schema.test_conn;

statement ok
DROP SINK test_schema.test_sink;

23 changes: 0 additions & 23 deletions e2e_test/ddl/connection.slt

This file was deleted.

42 changes: 0 additions & 42 deletions e2e_test/sink/kafka/create_sink.slt
@@ -31,48 +31,6 @@ create sink sink_non_exist_broker from t_kafka with (
type = 'append-only',
);

# Test create sink with connection
# Create a mock connection
statement ok
create connection mock with (
type = 'privatelink',
provider = 'mock',
);

# Refer to a non-existent connection
statement error
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
connection.name = 'nonexist',
);

# Create sink with connection
statement ok
create sink si_kafka_append_only_conn from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-append-only',
type = 'append-only',
force_append_only = 'true',
connection.name = 'mock',
);

# Try to drop connection mock, which is in use
statement error
drop connection mock;

# Drop sink
statement ok
drop sink si_kafka_append_only_conn;

# Drop connection
statement ok
drop connection mock;

# Connection test clean-up finished

statement error sink cannot be append-only
52 changes: 0 additions & 52 deletions e2e_test/source_legacy/basic/ddl.slt
@@ -198,35 +198,6 @@ s
statement ok
drop table s

# Test create source with connection
statement ok
CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');

# Reference to a non-existent connection
statement error
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE mytable (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock'
) FORMAT PLAIN ENCODE JSON;

statement ok
DROP TABLE mytable;


# `DEBEZIUM_MONGO_JSON` requires the source table to have `_id` and `payload` columns.
statement error
create source s (
@@ -236,7 +207,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

# `DEBEZIUM_MONGO_JSON` requires the `_id` column to be the primary key.
@@ -248,7 +218,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

# `DEBEZIUM_MONGO_JSON` requires the `payload` column to be of jsonb type.
@@ -260,25 +229,4 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

statement ok
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock',
) FORMAT PLAIN ENCODE JSON;

# Drop a connection in use
statement error
drop connection mock;

statement ok
drop source s;

statement ok
drop connection mock;
40 changes: 0 additions & 40 deletions e2e_test/source_legacy/basic/old_row_format_syntax/ddl.slt
@@ -152,35 +152,6 @@ s
statement ok
drop table s

# Test create source with connection
statement ok
CREATE CONNECTION mock WITH (type = 'privatelink', provider = 'mock');

# Reference to a non-existent connection
statement error
create source s (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT JSON;

statement ok
CREATE TABLE mytable (
column1 varchar
) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock'
) ROW FORMAT JSON;

statement ok
DROP TABLE mytable;


# `DEBEZIUM_MONGO_JSON` requires the source table to have `_id` and `payload` columns.
statement error
create source s (
@@ -190,7 +161,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

# `DEBEZIUM_MONGO_JSON` requires the `_id` column to be the primary key.
@@ -202,7 +172,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

# `DEBEZIUM_MONGO_JSON` requires the `payload` column to be of jsonb type.
@@ -214,7 +183,6 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'nonexist',
) ROW FORMAT DEBEZIUM_MONGO_JSON;

statement ok
@@ -224,15 +192,7 @@ create source s (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
connection.name = 'mock',
) ROW FORMAT JSON;

# Drop a connection in use
statement error
drop connection mock;

statement ok
drop source s;

statement ok
drop connection mock;
2 changes: 1 addition & 1 deletion proto/catalog.proto
@@ -243,7 +243,7 @@ message Connection {
uint32 database_id = 3;
string name = 4;
oneof info {
PrivateLinkService private_link_service = 5;
PrivateLinkService private_link_service = 5 [deprecated = true];
}
uint32 owner = 6;
}
2 changes: 0 additions & 2 deletions src/connector/src/sink/mod.rs
@@ -693,12 +693,10 @@ impl SinkCommitCoordinator for DummySinkCommitCoordinator {

impl SinkImpl {
pub fn new(mut param: SinkParam) -> Result<Self> {
const CONNECTION_NAME_KEY: &str = "connection.name";
const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

// remove privatelink related properties if any
param.properties.remove(PRIVATE_LINK_TARGET_KEY);
param.properties.remove(CONNECTION_NAME_KEY);

let sink_type = param
.properties
1 change: 0 additions & 1 deletion src/connector/src/source/kafka/private_link.rs
@@ -30,7 +30,6 @@ use crate::error::ConnectorResult;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};

pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
pub const CONNECTION_NAME_KEY: &str = "connection.name";

#[derive(Debug)]
pub(super) enum PrivateLinkContextRole {
29 changes: 0 additions & 29 deletions src/frontend/src/catalog/connection_catalog.rs
@@ -12,18 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::anyhow;
use risingwave_connector::source::kafka::private_link::insert_privatelink_broker_rewrite_map;
use risingwave_connector::WithPropertiesExt;
use risingwave_pb::catalog::connection::private_link_service::PrivateLinkProvider;
use risingwave_pb::catalog::connection::Info;
use risingwave_pb::catalog::{connection, PbConnection};

use crate::catalog::{ConnectionId, OwnedByUserCatalog};
use crate::error::{Result, RwError};
use crate::user::UserId;

#[derive(Clone, Debug, PartialEq)]
@@ -64,24 +56,3 @@ impl OwnedByUserCatalog for ConnectionCatalog {
self.owner
}
}

pub(crate) fn resolve_private_link_connection(
connection: &Arc<ConnectionCatalog>,
properties: &mut BTreeMap<String, String>,
) -> Result<()> {
#[allow(irrefutable_let_patterns)]
if let connection::Info::PrivateLinkService(svc) = &connection.info {
if !properties.is_kafka_connector() {
return Err(RwError::from(anyhow!(
"Private link is only supported for Kafka connector"
)));
}
// skip all checks for mock connection
if svc.get_provider()? == PrivateLinkProvider::Mock {
return Ok(());
}
insert_privatelink_broker_rewrite_map(properties, Some(svc), None)
.map_err(RwError::from)?;
}
Ok(())
}