Skip to content

Commit

Permalink
Make ksql configuration more consistent across components (#903)
Browse files Browse the repository at this point in the history
* Make ksql configuration more consistent across components

Use the following conventions for prefixing settings. Prefixes are recognized
by all components - the server, the query engine, and the cli. Any un-prefixed
settings are passed to all subsystems.
    - KSQL server: ksql.server
    - KSQL query engine: ksql
    - Kafka Streams (and Kafka): ksql.streams

Allow users to set query engine settings via the 'SET' command

Clean up some settings from DDLConfig. These should just be Stream/Table settings
configurable via the WITH clause. Some settings in the class are no longer used,
and some don't belong there. This patch cleans those up.

* unused import

* Move a few things around

* Fix test

* Add back in Avro schema prop

* Apply feedback from proposal comments

* Applied some review feedback

* Fix broken test
  • Loading branch information
rodesai authored Mar 14, 2018
1 parent bef6a38 commit b0c63ea
Show file tree
Hide file tree
Showing 21 changed files with 245 additions and 119 deletions.
3 changes: 1 addition & 2 deletions config/ksql-server.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands
listeners=http://localhost:8088
ui.enabled=true
ksql.server.ui.enabled=true
2 changes: 1 addition & 1 deletion ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void main(String[] args) throws IOException {
System.exit(-1);
}

final Properties properties = loadProperties(options.getPropertiesFile());
final Properties properties = loadProperties(options.getConfigFile());
final KsqlRestClient restClient =
new KsqlRestClient(options.getServer(), properties);

Expand Down
12 changes: 5 additions & 7 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Version;
import io.confluent.ksql.util.WelcomeMsgUtils;
Expand Down Expand Up @@ -355,7 +356,7 @@ private void runScript(
e
);
}
setProperty(DdlConfig.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
setProperty(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
);
Expand Down Expand Up @@ -576,18 +577,15 @@ private void setProperty(String property, String value) {
} else if (property.equalsIgnoreCase(DdlConfig.AVRO_SCHEMA)) {
restClient.setProperty(property, value);
return;
} else if (property.equalsIgnoreCase(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY)) {
} else if (property.equalsIgnoreCase(KsqlConstants.RUN_SCRIPT_STATEMENTS_CONTENT)) {
restClient.setProperty(property, value);
return;
} else if (property.equalsIgnoreCase(DdlConfig.RUN_SCRIPT_STATEMENTS_CONTENT)) {
restClient.setProperty(property, value);
return;
} else if (property.equalsIgnoreCase(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)) {
} else if (property.startsWith(KsqlConfig.KSQL_CONFIG_PROPERTY_PREFIX)) {
restClient.setProperty(property, value);
return;
} else {
throw new IllegalArgumentException(String.format(
"Not recognizable as streams, consumer, or producer property: '%s'",
"Not recognizable as ksql, streams, consumer, or producer property: '%s'",
property
));
}
Expand Down
16 changes: 7 additions & 9 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@ public class Options {
description = "The address of the Ksql server to connect to (ex: http://confluent.io:9098)")
private String server;

private static final String PROPERTIES_FILE_OPTION_NAME = "--properties-file";
private static final String CONFIGURATION_FILE_OPTION_NAME = "--config-file";

@Option(
name = PROPERTIES_FILE_OPTION_NAME,
description = "A file specifying properties for Ksql and its underlying Kafka Streams "
+ "instance(s) (can specify port number, bootstrap server, etc. "
+ "but these options will "
+ "be overridden if also given via flags)")
private String propertiesFile;
name = CONFIGURATION_FILE_OPTION_NAME,
description = "A file specifying configs for Ksql and its underlying Kafka Streams "
+ "instance(s). Refer to KSQL documentation for a list of available configs.")
private String configFile;


@Option(
Expand Down Expand Up @@ -121,8 +119,8 @@ public String getServer() {
return server;
}

public Optional<String> getPropertiesFile() {
return Optional.ofNullable(propertiesFile);
public Optional<String> getConfigFile() {
return Optional.ofNullable(configFile);
}

public Long getStreamedQueryRowLimit() {
Expand Down
14 changes: 12 additions & 2 deletions ksql-cli/src/test/java/io/confluent/ksql/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,24 @@ public void testPropertySetUnset() {
test("set 'max.request.size' = '1048576'", EMPTY_RESULT);
test("set 'consumer.max.poll.records' = '500'", EMPTY_RESULT);
test("set 'enable.auto.commit' = 'true'", EMPTY_RESULT);
test("set 'AVROSCHEMA' = 'schema'", EMPTY_RESULT);
test("set 'ksql.streams.application.id' = 'Test_App'", EMPTY_RESULT);
test("set 'ksql.streams.producer.batch.size' = '16384'", EMPTY_RESULT);
test("set 'ksql.streams.max.request.size' = '1048576'", EMPTY_RESULT);
test("set 'ksql.streams.consumer.max.poll.records' = '500'", EMPTY_RESULT);
test("set 'ksql.streams.enable.auto.commit' = 'true'", EMPTY_RESULT);
test("set 'ksql.service.id' = 'test'", EMPTY_RESULT);

test("unset 'application.id'", EMPTY_RESULT);
test("unset 'producer.batch.size'", EMPTY_RESULT);
test("unset 'max.request.size'", EMPTY_RESULT);
test("unset 'consumer.max.poll.records'", EMPTY_RESULT);
test("unset 'enable.auto.commit'", EMPTY_RESULT);
test("unset 'AVROSCHEMA'", EMPTY_RESULT);
test("unset 'ksql.streams.application.id'", EMPTY_RESULT);
test("unset 'ksql.streams.producer.batch.size'", EMPTY_RESULT);
test("unset 'ksql.streams.max.request.size'", EMPTY_RESULT);
test("unset 'ksql.streams.consumer.max.poll.records'", EMPTY_RESULT);
test("unset 'ksql.streams.enable.auto.commit'", EMPTY_RESULT);
test("unset 'ksql.service.id'", EMPTY_RESULT);

testListOrShow("properties", build(validStartUpConfigs()), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,4 @@ public class DdlConfig {
public static final String IS_WINDOWED_PROPERTY = "WINDOWED";
public static final String TIMESTAMP_NAME_PROPERTY = "TIMESTAMP";
public static final String PARTITION_BY_PROPERTY = "PARTITION_BY";
public static final String SCHEMA_FILE_CONTENT_PROPERTY = "ksql.schema.file.content";
public static final String RUN_SCRIPT_STATEMENTS_CONTENT = "ksql.run.script.statements";
}
10 changes: 10 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/CommonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.confluent.ksql.util;

import java.net.ConnectException;
import java.util.Map;
import java.util.stream.Collectors;

public class CommonUtils {
public static String getErrorMessageWithCause(Throwable e) {
Expand Down Expand Up @@ -46,4 +48,12 @@ public static String getErrorCauseMessage(Throwable e) {
}
return msg;
}

/**
 * Returns a new map containing only the entries of {@code properties} whose
 * keys do not start with {@code prefix}.
 *
 * @param prefix     key prefix to exclude (matched case-sensitively via String#startsWith)
 * @param properties source map; not modified
 * @return a new mutable-ish map of the non-matching entries
 *         (NOTE: Collectors.toMap throws NullPointerException on null values,
 *         same as the previous keySet-based implementation)
 */
public static Map<String, Object> getPropertiesWithoutPrefix(
    String prefix, Map<String, Object> properties) {
  // Stream the entry set directly: one map lookup per entry instead of the
  // two done by keySet().stream() + properties.get(key).
  return properties.entrySet()
      .stream()
      .filter(entry -> !entry.getKey().startsWith(prefix))
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
50 changes: 36 additions & 14 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler;

Expand All @@ -47,7 +48,8 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {

public static final String STREAM_INTERNAL_REPARTITION_TOPIC_SUFFIX = "-repartition";

public static final String FAIL_ON_DESERIALIZATION_ERROR_CONFIG = "fail.on.deserialization.error";
public static final String
FAIL_ON_DESERIALIZATION_ERROR_CONFIG = "ksql.fail.on.deserialization.error";

public static final String
KSQL_SERVICE_ID_CONFIG = "ksql.service.id";
Expand All @@ -73,6 +75,7 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {
defaultSchemaRegistryUrl = "http://localhost:8081";

public static final boolean defaultAvroSchemaUnionNull = true;
public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

Map<String, Object> ksqlConfigProps;
Map<String, Object> ksqlStreamConfigProps;
Expand Down Expand Up @@ -144,13 +147,29 @@ public class KsqlConfig extends AbstractConfig implements Cloneable {
;
}

// Returns the entries of props whose keys do NOT carry the
// KSQL_CONFIG_PROPERTY_PREFIX; per the config conventions, un-prefixed
// settings are passed through to all subsystems (see commit description).
private static Map<String, Object> commonConfigs(Map<String, Object> props) {
return CommonUtils.getPropertiesWithoutPrefix(KSQL_CONFIG_PROPERTY_PREFIX, props);
}

// Extracts the entries of props whose keys start with the given prefix,
// with the prefix stripped, by delegating to Kafka's
// AbstractConfig#originalsWithPrefix on a throwaway (empty-ConfigDef) config.
private static Map<String, Object> propertiesWithPrefix(
    Map<String, Object> props, String prefix) {
  return new AbstractConfig(new ConfigDef(), props).originalsWithPrefix(prefix);
}


// Merges streams-bound settings from props into ksqlStreamConfigProps.
// Order matters: un-prefixed (common) settings go in first, then
// "ksql.streams."-prefixed settings, so the explicitly prefixed form wins
// when a setting is supplied both ways.
private void applyStreamsConfig(Map<String, Object> props) {
  final Map<String, Object> commonSettings = commonConfigs(props);
  final Map<String, Object> prefixedSettings = propertiesWithPrefix(props, KSQL_STREAMS_PREFIX);
  ksqlStreamConfigProps.putAll(commonSettings);
  ksqlStreamConfigProps.putAll(prefixedSettings);
}

public KsqlConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);

ksqlConfigProps = new HashMap<>();
ksqlStreamConfigProps = new HashMap<>();
ksqlConfigProps.putAll(super.values());

ksqlConfigProps.putAll(this.values());

ksqlStreamConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KsqlConstants
.defaultAutoOffsetRestConfig);
Expand All @@ -162,20 +181,15 @@ public KsqlConfig(Map<?, ?> props) {
ksqlStreamConfigProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, KsqlConstants
.defaultNumberOfStreamsThreads);

for (Map.Entry<?, ?> entry : originals().entrySet()) {
final String key = entry.getKey().toString();
if (!key.toLowerCase().startsWith(KSQL_CONFIG_PROPERTY_PREFIX)) {
ksqlStreamConfigProps.put(key, entry.getValue());
}
}

final Object fail = props.get(FAIL_ON_DESERIALIZATION_ERROR_CONFIG);
final Object fail = originals().get(FAIL_ON_DESERIALIZATION_ERROR_CONFIG);
if (fail == null || !Boolean.parseBoolean(fail.toString())) {
ksqlStreamConfigProps.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogMetricAndContinueExceptionHandler.class
);
}

applyStreamsConfig(originals());
}

public Map<String, Object> getKsqlConfigProps() {
Expand Down Expand Up @@ -208,23 +222,31 @@ public Object get(String propertyName) {
// Routes a single setting to the appropriate config map:
//  - "ksql."-prefixed         -> ksqlConfigProps (query engine)
//  - "ksql.streams."-prefixed -> ksqlStreamConfigProps, prefix stripped
//  - anything else            -> ksqlStreamConfigProps as-is
// NOTE(review): the engine-prefix check is case-insensitive (toLowerCase)
// while the streams-prefix check is case-sensitive — confirm the asymmetry
// is intended. Also, if KSQL_CONFIG_PROPERTY_PREFIX is itself a prefix of
// KSQL_STREAMS_PREFIX (e.g. "ksql." vs "ksql.streams."), the second branch
// is unreachable from here — verify against the constants' values.
public void put(String propertyName, Object propertyValue) {
  if (propertyName.toLowerCase().startsWith(KSQL_CONFIG_PROPERTY_PREFIX)) {
    ksqlConfigProps.put(propertyName, propertyValue);
    return;
  }
  if (propertyName.startsWith(KSQL_STREAMS_PREFIX)) {
    ksqlStreamConfigProps.put(
        propertyName.substring(KSQL_STREAMS_PREFIX.length()), propertyValue);
    return;
  }
  ksqlStreamConfigProps.put(propertyName, propertyValue);
}

// Produces a fresh KsqlConfig seeded from this instance's current state by
// re-running the constructor on a merged property map.
// NOTE(review): missing @Override for Object.clone(), and this is not a
// Cloneable-style clone — confirm intent.
public KsqlConfig clone() {
Map<String, Object> clonedProperties = new HashMap<>();
// Later putAll calls win on duplicate keys, so originals() takes precedence
// over the accumulated ksql/stream maps.
clonedProperties.putAll(ksqlConfigProps);
// Stream settings are re-added un-prefixed here; the KsqlConfig constructor
// re-partitions them. TODO(review): confirm prefixed stream settings survive
// a clone round-trip unchanged.
clonedProperties.putAll(ksqlStreamConfigProps);
clonedProperties.putAll(originals());
return new KsqlConfig(clonedProperties);
}

public KsqlConfig cloneWithPropertyOverwrite(Map<String, Object> props) {
Map<String, Object> clonedProperties = new HashMap<>();
clonedProperties.putAll(ksqlConfigProps);
clonedProperties.putAll(ksqlStreamConfigProps);
clonedProperties.putAll(
ksqlStreamConfigProps.entrySet().stream()
.collect(Collectors.toMap(e -> KSQL_STREAMS_PREFIX + e.getKey(), e -> e.getValue())));
clonedProperties.putAll(props);
return new KsqlConfig(clonedProperties);
KsqlConfig clone = new KsqlConfig(clonedProperties);
// re-apply streams configs so that any un-prefixed overwrite settings
// take precedence over older prefixed settings
clone.applyStreamsConfig(props);
return clone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ public class KsqlConstants {
public static final long defaultCacheMaxBytesBufferingConfig = 10000000;
public static final int defaultNumberOfStreamsThreads = 4;

public static final String RUN_SCRIPT_STATEMENTS_CONTENT = "ksql.run.script.statements";
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.confluent.ksql.util;

import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.hamcrest.core.IsEqual;
import org.junit.Test;
Expand Down Expand Up @@ -66,5 +67,51 @@ public void shouldNotSetDeserializationExceptionHandlerWhenFailOnDeserialization
assertThat(result, nullValue());
}

@Test
public void shouldSetStreamsConfigProperties() {
  // An un-prefixed Kafka consumer setting must flow through into the
  // streams config map unchanged.
  final Map<String, String> props =
      Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "100");
  final KsqlConfig config = new KsqlConfig(props);
  assertThat(
      config.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
      equalTo("100"));
}

@Test
public void shouldSetPrefixedStreamsConfigProperties() {
  // A "ksql.streams."-prefixed setting must land in the streams config map
  // with its prefix stripped.
  final String prefixedKey =
      KsqlConfig.KSQL_STREAMS_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
  final KsqlConfig config = new KsqlConfig(Collections.singletonMap(prefixedKey, "100"));
  assertThat(
      config.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
      equalTo("100"));
}

@Test
public void shouldCloneWithKsqlPropertyOverwrite() {
  // Overwriting a "ksql."-prefixed engine setting via
  // cloneWithPropertyOverwrite must take effect in the clone.
  final KsqlConfig original = new KsqlConfig(
      Collections.singletonMap(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test"));
  final KsqlConfig updated = original.cloneWithPropertyOverwrite(
      Collections.singletonMap(KsqlConfig.KSQL_SERVICE_ID_CONFIG, "test-2"));
  assertThat(
      updated.getKsqlConfigProps().get(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
      equalTo("test-2"));
}

@Test
public void shouldCloneWithStreamPropertyOverwrite() {
  // Overwriting an un-prefixed streams setting via
  // cloneWithPropertyOverwrite must take effect in the clone.
  final KsqlConfig original = new KsqlConfig(
      Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "100"));
  final KsqlConfig updated = original.cloneWithPropertyOverwrite(
      Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "200"));
  assertThat(
      updated.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
      equalTo("200"));
}

@Test
public void shouldCloneWithPrefixedStreamPropertyOverwrite() {
  // Overwriting a "ksql.streams."-prefixed setting via
  // cloneWithPropertyOverwrite must take effect (prefix stripped) in the clone.
  final String prefixedKey =
      KsqlConfig.KSQL_STREAMS_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
  final KsqlConfig original = new KsqlConfig(Collections.singletonMap(prefixedKey, "100"));
  final KsqlConfig updated = original.cloneWithPropertyOverwrite(
      Collections.singletonMap(prefixedKey, "200"));
  assertThat(
      updated.getKsqlStreamConfigProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
      equalTo("200"));
}
}
4 changes: 2 additions & 2 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,9 @@ public void removeTemporaryQuery(QueryMetadata queryMetadata) {

public DDLCommandResult executeDdlStatement(
String sqlExpression, final DDLStatement statement,
final Map<String, Object> streamsProperties
final Map<String, Object> overriddenProperties
) {
return queryEngine.handleDdlStatement(sqlExpression, statement, streamsProperties);
return queryEngine.handleDdlStatement(sqlExpression, statement, overriddenProperties);
}

public SchemaRegistryClient getSchemaRegistryClient() {
Expand Down
14 changes: 6 additions & 8 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.util.AvroUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.QueryMetadata;
Expand Down Expand Up @@ -139,7 +138,7 @@ private PlanNode buildQueryLogicalPlan(
List<QueryMetadata> buildPhysicalPlans(
final List<Pair<String, PlanNode>> logicalPlans,
final List<Pair<String, Statement>> statementList,
final Map<String, Object> overriddenStreamsProperties,
final Map<String, Object> overriddenProperties,
final boolean updateMetastore
) throws Exception {

Expand All @@ -157,12 +156,12 @@ List<QueryMetadata> buildPhysicalPlans(
handleDdlStatement(
statementPlanPair.getLeft(),
(DDLStatement) statement,
overriddenStreamsProperties
overriddenProperties
);
} else {
buildQueryPhysicalPlan(
physicalPlans, statementPlanPair,
overriddenStreamsProperties, updateMetastore
overriddenProperties, updateMetastore
);
}

Expand All @@ -173,21 +172,20 @@ List<QueryMetadata> buildPhysicalPlans(
private void buildQueryPhysicalPlan(
final List<QueryMetadata> physicalPlans,
final Pair<String, PlanNode> statementPlanPair,
final Map<String, Object> overriddenStreamsProperties,
final Map<String, Object> overriddenProperties,
final boolean updateMetastore
) throws Exception {

final StreamsBuilder builder = new StreamsBuilder();
final KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

// Build a physical plan, in this case a Kafka Streams DSL
final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(
builder,
ksqlConfigClone.cloneWithPropertyOverwrite(overriddenStreamsProperties),
ksqlEngine.getKsqlConfig().cloneWithPropertyOverwrite(overriddenProperties),
ksqlEngine.getTopicClient(),
new MetastoreUtil(),
ksqlEngine.getFunctionRegistry(),
overriddenStreamsProperties,
overriddenProperties,
updateMetastore,
ksqlEngine.getMetaStore(),
ksqlEngine.getSchemaRegistryClient()
Expand Down
Loading

0 comments on commit b0c63ea

Please sign in to comment.