Skip to content

Commit

Permalink
extracting new integration test using KSQLContext
Browse files Browse the repository at this point in the history
 Please enter the commit message for your changes. Lines starting
  • Loading branch information
bluemonk3y committed Oct 5, 2017
1 parent 58f9a25 commit fe2296f
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 175 deletions.
1 change: 1 addition & 0 deletions config/ksqlserver.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#bootstrap.servers=localhost:1119092
bootstrap.servers=localhost:9092
application.id=ksql_server_quickstart
ksql.command.topic.suffix=commands
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +43,7 @@ public class StandaloneExecutor {

public StandaloneExecutor(Map streamProperties) {
KsqlConfig ksqlConfig = new KsqlConfig(streamProperties);
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig));
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig, AdminClient.create(ksqlConfig.getKsqlConfigProps())));
}

public void executeStatements(String queries) throws Exception {
Expand All @@ -64,5 +65,4 @@ public void executeStatements(String queries) throws Exception {
}
}
}

}
2 changes: 1 addition & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public static void setUp() throws Exception {
testListOrShowCommands();

orderDataProvider = new OrderDataProvider();
restServer.getKsqlEngine().getKafkaTopicClient().createTopic(orderDataProvider.topicName(), 1, (short)1);
restServer.getKsqlEngine().getTopicClient().createTopic(orderDataProvider.topicName(), 1, (short)1);
produceInputStream(orderDataProvider);
}

Expand Down
24 changes: 21 additions & 3 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package io.confluent.ksql;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.util.KafkaTopicClientImpl;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -37,6 +40,8 @@ public class KsqlContext {
final KsqlEngine ksqlEngine;
private static final String APPLICATION_ID_OPTION_DEFAULT = "ksql_standalone_cli";
private static final String KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT = "localhost:9092";
private final AdminClient adminClient;
private final KafkaTopicClientImpl topicClient;

public KsqlContext() {
this(null);
Expand All @@ -59,7 +64,15 @@ public KsqlContext(Map<String, Object> streamsProperties) {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT);
}
KsqlConfig ksqlConfig = new KsqlConfig(streamsProperties);
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig));

adminClient = AdminClient.create(ksqlConfig.getKsqlConfigProps());
topicClient = new KafkaTopicClientImpl(ksqlConfig, adminClient);
ksqlEngine = new KsqlEngine(ksqlConfig, topicClient);
}


public MetaStore getMetaStore() {
return ksqlEngine.getMetaStore();
}

/**
Expand All @@ -69,8 +82,7 @@ public KsqlContext(Map<String, Object> streamsProperties) {
* @throws Exception
*/
public void sql(String sql) throws Exception {
List<QueryMetadata> queryMetadataList = ksqlEngine.buildMultipleQueries(
false, sql, Collections.emptyMap());
List<QueryMetadata> queryMetadataList = ksqlEngine.buildMultipleQueries(true, sql, Collections.emptyMap());
for (QueryMetadata queryMetadata: queryMetadataList) {
if (queryMetadata instanceof PersistentQueryMetadata) {
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
Expand All @@ -86,6 +98,12 @@ public void sql(String sql) throws Exception {
}
}

public void close() throws IOException {
ksqlEngine.close();
topicClient.close();
adminClient.close();
}

/**
* Terminate a query with the given id.
*
Expand Down
20 changes: 10 additions & 10 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,20 @@ public class KsqlEngine implements Closeable {
private KsqlConfig ksqlConfig;

private final MetaStore metaStore;
private final KafkaTopicClient kafkaTopicClient;
private final KafkaTopicClient topicClient;
private final DDLCommandExec ddlCommandExec;
private final QueryEngine queryEngine;

private final Map<Long, PersistentQueryMetadata> persistentQueries;
private final Set<QueryMetadata> liveQueries;

public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient) {
public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClient) {
Objects.requireNonNull(ksqlConfig, "Streams properties map cannot be null as it may be mutated later on");

this.ksqlConfig = ksqlConfig;

this.metaStore = new MetaStoreImpl();
this.kafkaTopicClient = kafkaTopicClient;
this.topicClient = topicClient;
this.ddlCommandExec = new DDLCommandExec(metaStore);
this.queryEngine = new QueryEngine(this);

Expand Down Expand Up @@ -263,21 +263,21 @@ private Pair<String, Statement> buildSingleQueryAst(final Statement statement,
} else if (statement instanceof CreateStream) {
ddlCommandExec.tryExecute(
new CreateStreamCommand(
(CreateStream) statement, overriddenProperties, kafkaTopicClient),
(CreateStream) statement, overriddenProperties, topicClient),
tempMetaStoreForParser);
ddlCommandExec.tryExecute(
new CreateStreamCommand(
(CreateStream) statement, overriddenProperties, kafkaTopicClient),
(CreateStream) statement, overriddenProperties, topicClient),
tempMetaStore);
return new Pair<>(statementString, statement);
} else if (statement instanceof CreateTable) {
ddlCommandExec.tryExecute(
new CreateTableCommand(
(CreateTable) statement, overriddenProperties, kafkaTopicClient),
(CreateTable) statement, overriddenProperties, topicClient),
tempMetaStoreForParser);
ddlCommandExec.tryExecute(
new CreateTableCommand(
(CreateTable) statement, overriddenProperties, kafkaTopicClient),
(CreateTable) statement, overriddenProperties, topicClient),
tempMetaStore);
return new Pair<>(statementString, statement);
} else if (statement instanceof DropStream) {
Expand Down Expand Up @@ -345,8 +345,8 @@ public MetaStore getMetaStore() {
return metaStore;
}

public KafkaTopicClient getKafkaTopicClient() {
return kafkaTopicClient;
public KafkaTopicClient getTopicClient() {
return topicClient;
}

public DDLCommandExec getDDLCommandExec() {
Expand Down Expand Up @@ -392,7 +392,7 @@ public void close() throws IOException {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
}
kafkaTopicClient.close();
topicClient.close();
}

public QueryEngine getQueryEngine() {
Expand Down
6 changes: 3 additions & 3 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

// Build a physical plan, in this case a Kafka Streams DSL
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getKafkaTopicClient());
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getTopicClient());
SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan);

OutputNode outputNode = physicalPlanBuilder.getPlanSink();
Expand Down Expand Up @@ -375,10 +375,10 @@ private DDLCommand generateDDLCommand(
return new RegisterTopicCommand((RegisterTopic) statement, overriddenProperties);
} else if (statement instanceof CreateStream) {
return new CreateStreamCommand((CreateStream) statement, overriddenProperties,
ksqlEngine.getKafkaTopicClient());
ksqlEngine.getTopicClient());
} else if (statement instanceof CreateTable) {
return new CreateTableCommand((CreateTable) statement, overriddenProperties,
ksqlEngine.getKafkaTopicClient());
ksqlEngine.getTopicClient());
} else if (statement instanceof DropStream) {
return new DropSourceCommand((DropStream) statement);
} else if (statement instanceof DropTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@

public class KafkaTopicClientImpl implements KafkaTopicClient {
private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class);
private final KsqlConfig ksqlConfig;
private final AdminClient adminClient;

public KafkaTopicClientImpl(final KsqlConfig ksqlConfig) {
this.ksqlConfig = ksqlConfig.clone();
public KafkaTopicClientImpl(final KsqlConfig ksqlConfig, AdminClient adminClient) {
this.adminClient = adminClient;
}

public void createTopic(String topic, int numPartitions, short replicatonFactor) {
Expand All @@ -57,8 +57,7 @@ public void createTopic(String topic, int numPartitions, short replicatonFactor)
}
NewTopic newTopic = new NewTopic(topic, numPartitions, replicatonFactor);
try {
AdminClient.create(ksqlConfig.getKsqlConfigProps())
.createTopics(Collections.singleton(newTopic)).all().get();
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " +
topic, e);
Expand All @@ -72,17 +71,15 @@ public boolean isTopicExists(String topic) {

public Set<String> listTopicNames() {
try {
return AdminClient.create(ksqlConfig.getKsqlConfigProps())
.listTopics().names().get();
return adminClient.listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to retrieve kafka topic names", e);
}
}

public Map<String, TopicDescription> describeTopics(Collection<String> topicNames) {
try {
return AdminClient.create(ksqlConfig.getKsqlConfigProps())
.describeTopics(topicNames).all().get();
return adminClient.describeTopics(topicNames).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to describe kafka topics", e);
}
Expand Down
2 changes: 1 addition & 1 deletion ksql-engine/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
log4j.rootLogger=WARN,stdout
log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
Expand Down
Loading

0 comments on commit fe2296f

Please sign in to comment.