
Master #1

Merged · 161 commits · merged Oct 18, 2017

Changes shown from 1 commit

Commits:
bdffcf4
KSQL screencast image
Aug 30, 2017
5ea83ba
remove here hyperlink
Aug 30, 2017
e81f61f
Merge pull request #268 from confluentinc/joel-hamill/demo-image
joel-hamill Aug 30, 2017
c0487e1
Added LIKE examples to syntax guide and examples.
hjafarpour Aug 31, 2017
5589b2e
Fixed typo.
hjafarpour Aug 31, 2017
639e164
Minor doc changes.
hjafarpour Aug 31, 2017
621ccc3
Merge pull request #271 from hjafarpour/KSQL-324-Example-Queries
hjafarpour Aug 31, 2017
70b980f
Improvements to syntax guide
Sep 1, 2017
cc86321
Merge pull request #277 from confluentinc/0.1.x-syntax-guide
miguno Sep 4, 2017
a75dff3
Restructure TOC
Sep 4, 2017
6b896d7
Fixed misspelled words
Sep 5, 2017
056fa14
Fixed a minor error in the syntax guide.
hjafarpour Sep 5, 2017
e1522cb
Merge pull request #281 from hjafarpour/KSQL-324-Example-Queries
hjafarpour Sep 5, 2017
1e5496b
Merge pull request #280 from juneng603/feature-fix-misspelled-words
hjafarpour Sep 5, 2017
15180dd
Clarify PARTITION BY in CSAS
Sep 6, 2017
39c04f8
remove build-tools copy, use pluginRepository in pom (#286)
aayars Sep 6, 2017
032d1b5
Add section Use Cases and Examples
Sep 6, 2017
885cf10
Fix typo
Sep 7, 2017
4637617
Add missing word to roadmap
elliotcm Sep 10, 2017
7f0141b
Merge pull request #288 from elliotcm/patch-1
miguno Sep 11, 2017
c52137d
Clarify FAQ
Sep 12, 2017
3af28e7
Remove trailing whitespace
Sep 12, 2017
b2df850
Add Configure KSQL to Syntax Guide, fix/clarify config examples
Sep 12, 2017
1bf980c
Merge pull request #289 from confluentinc/0.1.x-configuration
miguno Sep 12, 2017
7a9acb9
Emit succinct error for missing query file
Sep 12, 2017
ce7c880
Add upstream project so builds are triggered automatically
ewencp Sep 14, 2017
869f598
Clarify client-server mode
Sep 15, 2017
76b0bcc
Clarifications
Sep 15, 2017
01d488c
Add `$` to CLI commands
Sep 15, 2017
ed4f0e6
Clarifications
Sep 15, 2017
b5030ba
Clarifications
Sep 15, 2017
4211fe7
Clarifications
Sep 15, 2017
0f6150c
Fix typo
Sep 15, 2017
7db5983
Remove bullet formatting
Sep 15, 2017
1cb5307
fix checkstyle errors
dguy Sep 15, 2017
e614bd7
Merge pull request #298 from confluentinc/fix-checkstyle
logscape Sep 15, 2017
50ffcac
Document how to configure the Kafka bootstrap servers
Sep 15, 2017
bf8a719
Merge pull request #296 from ewencp/add-upstream-project
ewencp Sep 15, 2017
34c5f9f
Merge remote-tracking branch 'origin/0.1.x'
ewencp Sep 15, 2017
ac19358
Merge pull request #297 from confluentinc/0.1.x-bootstrap-servers
miguno Sep 18, 2017
bbe07e5
Update non-docker-clickstream.md
Sep 18, 2017
a8d1e5d
Merge pull request #301 from himani1/patch-2
logscape Sep 18, 2017
6e2cc63
Merge pull request #291 from kirktrue/omit-stacktrace-for-missing-file
logscape Sep 20, 2017
961eafa
Clarify KSQL works with AK and CP
Sep 20, 2017
9d678d9
Merge pull request #302 from confluentinc/0.1.x-faq-works-with-apache…
logscape Sep 20, 2017
c6d7773
small-code-quality-improvements Added code quality improvements.
TheRealHaui Sep 24, 2017
22b898e
small-code-quality-improvements Added code quality improvements.
TheRealHaui Sep 24, 2017
dfea526
added persistent information for streams and table
Sep 26, 2017
88fda38
Merge pull request #319 from bluemonk3y/0.1.x-DOC-added-persistent-in…
logscape Sep 26, 2017
bf90854
Merge pull request #313 from TheRealHaui/small-code-quality-improvements
logscape Sep 27, 2017
34273f7
fixing commandstore hanging
Sep 27, 2017
a2f8680
reduced poll on priorCommands processing
Sep 27, 2017
c2f12f2
removed header p html
Sep 27, 2017
268ba18
added comment on poll instead of subscribe
Sep 27, 2017
e71f7ca
Merge pull request #321 from bluemonk3y/0.1.x-BUG-fix-command-queue-r…
logscape Sep 27, 2017
36f7624
Clarify how streams and tables are stored in topics
Sep 28, 2017
b8cf069
Merge pull request #323 from confluentinc/0.1.x-concepts-clarification
logscape Sep 28, 2017
a94639e
Including the streams config properties in the session with CTAS and …
hjafarpour Sep 29, 2017
fb5800e
refactoring parser, core and common
Sep 29, 2017
ade44ce
Merge pull request #324 from hjafarpour/KSQL-375-include-query-config…
hjafarpour Sep 29, 2017
ddfec38
extracted serde module
Sep 29, 2017
35e6a40
Merge pull request #325 from bluemonk3y/0.1.x-CODE-restructure
hjafarpour Sep 29, 2017
a33045c
Refactored the package for GenericRow.
hjafarpour Sep 29, 2017
f2a1a9f
Merge pull request #327 from hjafarpour/refactor-common-GenericRow
hjafarpour Sep 29, 2017
bbf636d
Fixed the drop table issue.
hjafarpour Sep 30, 2017
dc1ad22
Merge pull request #328 from hjafarpour/KSQL-376-droptable-issue
hjafarpour Sep 30, 2017
09a7755
extracted pipeline
Oct 2, 2017
115b4b9
Merge pull request #330 from bluemonk3y/0.1.x-CODE-restructure-Queued…
logscape Oct 2, 2017
295fc8b
rename ksql-core to ksql-engine
Oct 2, 2017
850bbbd
Add to Examples: WHERE clause and content-based routing (#333)
ybyzek Oct 3, 2017
ddc9396
Merge pull request #332 from bluemonk3y/0.1.x-CODE-331-rename-ksql-co…
logscape Oct 3, 2017
1584815
First step in better testing tools. Added an integration test tool to…
hjafarpour Oct 3, 2017
58f9a25
Merge pull request #335 from hjafarpour/KSQL-379-Better-Test-Tools
hjafarpour Oct 3, 2017
973c7dc
No need to set the streams application.id for the rest server.
hjafarpour Oct 4, 2017
17fb73f
Merge pull request #336 from hjafarpour/NoNeedForStreamAppIdForRESTSe…
hjafarpour Oct 4, 2017
04094a9
First pass on refactoring KSQL config.
hjafarpour Oct 4, 2017
fe2296f
extracting new integration test using KSQLContext
Oct 5, 2017
839cb84
Refined admin client config.
hjafarpour Oct 5, 2017
794da0f
remove dead code. make field and method access more restricted
dguy Oct 5, 2017
117d760
Merge branch '0.1.x'
hjafarpour Oct 5, 2017
46f349e
Merge branch 'master' of https://github.com/confluentinc/ksql
hjafarpour Oct 5, 2017
c63aee2
bump common dependency to 3.3.1-SNAPSHOT
Oct 5, 2017
243c00f
Merge remote-tracking branch 'origin/0.1.x'
Oct 5, 2017
392c4d1
using 0.11.0.1 of client to use fixed AdminClient
Oct 5, 2017
443fc5e
kafka version setting to 11.0.1
Oct 5, 2017
c12ca7a
Merge pull request #337 from confluentinc/remove-some-dead-code
hjafarpour Oct 5, 2017
f527704
Merge remote-tracking branch 'upstream/0.1.x' into 0.1.x
hjafarpour Oct 5, 2017
e166ede
Merge branch '0.1.x-CODE-379-started-int-test-refactor' of https://gi…
hjafarpour Oct 6, 2017
520d779
remove unused imports
dguy Oct 6, 2017
5ba12a0
Merge pull request #342 from dguy/unused-imports
dguy Oct 6, 2017
9a17925
remove dead code.
dguy Oct 6, 2017
63228fd
Minor change to use the same Application Id for embedded mode.
hjafarpour Oct 6, 2017
c0836ae
Merge pull request #344 from hjafarpour/bluemonk3y-0.1.x-CODE-379-sta…
hjafarpour Oct 6, 2017
813c850
merge 0.1.x
dguy Oct 6, 2017
2196ea6
Merge pull request #343 from dguy/ksql-engine-tidy
hjafarpour Oct 6, 2017
7c0a11d
Merge remote-tracking branch 'origin/0.1.x'
Oct 7, 2017
e6aaf4d
Merge remote-tracking branch 'upstream/0.1.x' into KSQLConfigRefactor
hjafarpour Oct 9, 2017
71f2d78
Some changes based on the new KSQLConfig.
hjafarpour Oct 9, 2017
8ab2828
Merge pull request #338 from hjafarpour/KSQLConfigRefactor
hjafarpour Oct 9, 2017
6f27e7e
replace window-unit with TimeUnit (#346)
dguy Oct 10, 2017
ff36f7f
skip bad data in json and delimited deserializer (#347)
dguy Oct 10, 2017
f97e11b
Clean up the poms after the refactor that created new packages.
hjafarpour Oct 10, 2017
74beb47
More pom clean up.
hjafarpour Oct 10, 2017
b38cc4b
Merge pull request #350 from hjafarpour/pom-cleanup-after-refactor
hjafarpour Oct 10, 2017
901b3b6
Reverting adminclient singleton temporarily.
hjafarpour Oct 10, 2017
8a6b6a7
Merge pull request #351 from hjafarpour/Revert-AdminClient-Singleton
hjafarpour Oct 10, 2017
5d0978a
Merge branch '0.1.x'
hjafarpour Oct 10, 2017
b9a99fe
Merge pull request #352 from confluentinc/master
hjafarpour Oct 10, 2017
0af3fb8
Bump to common 4.0.0-SNAPSHOT
Oct 11, 2017
0aca9a7
Bump to common 4.1.0-SNAPSHOT
Oct 11, 2017
2e7eb68
update slack channel for build notifications
Oct 11, 2017
c086ca5
Merge remote-tracking branch 'origin/0.1.x' into 4.0.x
Oct 11, 2017
2c82044
Merge remote-tracking branch 'origin/4.0.x'
Oct 11, 2017
aba0f7c
fix artifact version in compose file
Oct 11, 2017
32e41e8
Merge remote-tracking branch 'origin/0.1.x' into 4.0.x
Oct 11, 2017
ed1ee1e
Merge remote-tracking branch 'origin/4.0.x'
Oct 11, 2017
ce9e2da
After bumping the dependency to 4.0.0 we encountered some issues that…
hjafarpour Oct 11, 2017
22e481d
Merge pull request #363 from hjafarpour/KSQL-388-fix-400-build-issues
hjafarpour Oct 11, 2017
27a0e1f
Merge remote-tracking branch 'upstream/master'
hjafarpour Oct 11, 2017
06b59a9
Better clean up for queries. Internal topics will be deleted after q…
hjafarpour Oct 12, 2017
181fa50
Added singleton AdminClient back.
hjafarpour Oct 12, 2017
cc57a3e
extract methods, remove duplication, etc from PhysicalPlanBuilder (#356)
dguy Oct 12, 2017
c480c42
Use streams 1.0 api methods (#365)
dguy Oct 12, 2017
f9e4416
new PR with migrated tests
Oct 12, 2017
dbffadd
tidying stuff up
Oct 12, 2017
cc2177c
trigger build
Oct 12, 2017
2c8ac7b
remove more crud
Oct 12, 2017
32c20ae
Merge pull request #367 from bluemonk3y/4.0.x-TEST-migrating-integrat…
logscape Oct 12, 2017
8b299f3
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-383-clean-up-…
hjafarpour Oct 12, 2017
bcec9eb
Fixed the pom version.
hjafarpour Oct 12, 2017
93634bf
Pom version fix.
hjafarpour Oct 12, 2017
a4c863c
Refactored Function registry.
hjafarpour Oct 13, 2017
8b5b883
adding windowing integration tests, updated streams int test util to …
Oct 13, 2017
5a165c6
trigger
Oct 13, 2017
9c1cc3c
moving packages
Oct 13, 2017
ac3b527
Applied review feedback.
hjafarpour Oct 13, 2017
d18a471
Remove SourceNode as it is redundant (#368)
dguy Oct 16, 2017
858a71b
Merge pull request #371 from bluemonk3y/4.0.x-TEST-windowing-integrat…
logscape Oct 16, 2017
196d4db
adding stream-table join
Oct 16, 2017
a148075
exposing time controls when publishing test data
Oct 16, 2017
73ec9f2
exposing time controls for int tests
Oct 16, 2017
91dd2d5
removed old comments
Oct 16, 2017
e03d4d1
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-386-KsqlFunct…
hjafarpour Oct 16, 2017
87671d0
trigger build
Oct 17, 2017
7055aca
remove jackson dependency in ksql-rest-app. (#369)
dguy Oct 17, 2017
36829c3
Merge pull request #375 from bluemonk3y/4.0.x-TEST-stream-table-join-…
logscape Oct 17, 2017
eb12963
move the createsink method out of SchemaKStream (#376)
dguy Oct 17, 2017
1aecb91
make join test more reliable (#381)
dguy Oct 17, 2017
587b233
rework threading on QueryStreamWriter
Oct 17, 2017
ccd9e13
import
Oct 17, 2017
1fa24e3
import
Oct 17, 2017
1676efc
import
Oct 17, 2017
66c7f13
Use streams deserialization exception handler (#385)
dguy Oct 17, 2017
28a832d
extract SelectValueMapper from SchemaKXxxx (#379)
dguy Oct 17, 2017
0af6c5b
minor tidyups
Oct 17, 2017
c84ef9d
Merge branch '4.0.x' of https://github.com/confluentinc/ksql into 4.0…
Oct 17, 2017
52ef4ef
Put window aggregate logic into the various KsqlWindowExpression clas…
dguy Oct 17, 2017
863fa31
Merge pull request #383 from bluemonk3y/4.0.x-CODE-QueryStreamWriterT…
logscape Oct 17, 2017
984e51a
Minor modification in KSQLContext.
hjafarpour Oct 17, 2017
c9b3f75
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-386-KsqlFunct…
hjafarpour Oct 17, 2017
de6772e
Merge pull request #370 from hjafarpour/KSQL-386-KsqlFunctions-non-st…
hjafarpour Oct 17, 2017
Use streams 1.0 api methods (confluentinc#365)
dguy authored Oct 12, 2017
commit c480c42946a1cf8a4455331cb47d358a14736e2c
14 changes: 7 additions & 7 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -71,8 +71,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -212,7 +212,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
final boolean updateMetastore) throws Exception {

PlanNode logicalPlan = statementPlanPair.getRight();
KStreamBuilder builder = new KStreamBuilder();
StreamsBuilder builder = new StreamsBuilder();

KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

@@ -272,7 +272,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
*/
private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
KStreamBuilder builder, KsqlConfig ksqlConfigClone, QueuedSchemaKStream schemaKStream,
StreamsBuilder builder, KsqlConfig ksqlConfigClone, QueuedSchemaKStream schemaKStream,
KsqlBareOutputNode bareOutputNode, String serviceId, String transientQueryPrefix) {

String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix);
@@ -309,7 +309,7 @@ private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
*/
private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuffix,
Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
boolean updateMetastore, KStreamBuilder builder, KsqlConfig ksqlConfigClone, SchemaKStream schemaKStream,
boolean updateMetastore, StreamsBuilder builder, KsqlConfig ksqlConfigClone, SchemaKStream schemaKStream,
KsqlStructuredDataOutputNode outputNode, String serviceId, String persistanceQueryPrefix) {

long queryId = getNextQueryId();
@@ -408,7 +408,7 @@ public StructuredDataSource getResultDatasource(final Select select, final Strin
}

private KafkaStreams buildStreams(
final KStreamBuilder builder,
final StreamsBuilder builder,
final String applicationId,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties
@@ -430,11 +430,11 @@ private KafkaStreams buildStreams(
KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX,
ksqlConfig.get(KsqlConfig.KSQL_TIMESTAMP_COLUMN_INDEX));
newStreamsProperties.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, KsqlTimestampExtractor.class);
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, KsqlTimestampExtractor.class);
}


return new KafkaStreams(builder, new StreamsConfig(newStreamsProperties));
return new KafkaStreams(builder.build(), new StreamsConfig(newStreamsProperties));
}

private long getNextQueryId() {
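The heart of this hunk is KIP-182's builder change: `KStreamBuilder` is gone, `StreamsBuilder` takes its place, and the builder is no longer itself a topology, so `builder.build()` must produce a `Topology` before `KafkaStreams` can be constructed; the timestamp-extractor config key also gains a `DEFAULT_` prefix. Below is a minimal sketch of that lifecycle, not KSQL's code — the application id, broker address, and topic names are illustrative.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class StreamsBuilderExample {

  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-builder-example"); // illustrative id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative broker
    // Streams 1.0 renamed the timestamp-extractor key, as the hunk above shows.
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class);

    final StreamsBuilder builder = new StreamsBuilder(); // replaces new KStreamBuilder()
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

    // StreamsBuilder is no longer itself the topology; build() produces one,
    // which is why the diff changes new KafkaStreams(builder, ...) to builder.build().
    final Topology topology = builder.build();
    final KafkaStreams streams = new KafkaStreams(topology, new StreamsConfig(props));
    streams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}
```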
PhysicalPlanBuilder.java
@@ -56,14 +56,16 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.TopologyBuilder;

import java.util.ArrayList;
import java.util.Collections;
@@ -76,7 +78,7 @@

public class PhysicalPlanBuilder {

private final KStreamBuilder builder;
private final StreamsBuilder builder;
private final KsqlConfig ksqlConfig;
private final KafkaTopicClient kafkaTopicClient;
private final MetastoreUtil metastoreUtil;
@@ -102,7 +104,7 @@ public class PhysicalPlanBuilder {

private OutputNode planSink = null;

public PhysicalPlanBuilder(final KStreamBuilder builder,
public PhysicalPlanBuilder(final StreamsBuilder builder,
final KsqlConfig ksqlConfig,
final KafkaTopicClient kafkaTopicClient,
final MetastoreUtil metastoreUtil) {
@@ -425,8 +427,8 @@ private SchemaKStream buildSource(final SourceNode sourceNode, Map<String, Objec

return new SchemaKStream(sourceNode.getSchema(),
builder
.stream((TopologyBuilder.AutoOffsetReset) null, Serdes.String(), genericRowSerde,
structuredDataSourceNode.getStructuredDataSource().getKsqlTopic().getKafkaTopicName())
.stream(structuredDataSourceNode.getStructuredDataSource().getKsqlTopic().getKafkaTopicName(),
Consumed.with(Serdes.String(), genericRowSerde))
.map(nonWindowedMapper)
.transformValues(new AddTimestampColumnValueTransformerSupplier()),
sourceNode.getKeyField(), new ArrayList<>(),
@@ -435,44 +437,45 @@ private SchemaKStream buildSource(final SourceNode sourceNode, Map<String, Objec
throw new KsqlException("Unsupported source logical node: " + sourceNode.getClass().getName());
}

private <K> KTable table(final KStream<K, GenericRow> stream, final Serde<K> keySerde, final Serde<GenericRow> valueSerde, final String stateStoreName) {
return stream.groupByKey(keySerde, valueSerde)
.reduce((genericRow, newValue) -> newValue, stateStoreName);
private <K> KTable table(final KStream<K, GenericRow> stream, final Serde<K> keySerde, final Serde<GenericRow> valueSerde) {
return stream.groupByKey(Serialized.with(keySerde, valueSerde))
.reduce((genericRow, newValue) -> newValue);
}

@SuppressWarnings("unchecked")
private KTable createKTable(final TopologyBuilder.AutoOffsetReset autoOffsetReset,
private KTable createKTable(final Topology.AutoOffsetReset autoOffsetReset,
final KsqlTable ksqlTable,
final Serde<GenericRow> genericRowSerde,
final Serde<GenericRow> genericRowSerdeAfterRead) {
if (ksqlTable.isWindowed()) {
return table(builder
.stream(autoOffsetReset, windowedSerde, genericRowSerde,
ksqlTable.getKsqlTopic().getKafkaTopicName())
.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(),
Consumed.with(windowedSerde, genericRowSerde)
.withOffsetResetPolicy(autoOffsetReset))
.map(windowedMapper)
.transformValues(new AddTimestampColumnValueTransformerSupplier()), windowedSerde, genericRowSerdeAfterRead, ksqlTable.getStateStoreName());
.transformValues(new AddTimestampColumnValueTransformerSupplier()), windowedSerde, genericRowSerdeAfterRead);
} else {
return table(builder
.stream(autoOffsetReset, Serdes.String(), genericRowSerde,
ksqlTable.getKsqlTopic().getKafkaTopicName())
.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(),
Consumed.with(Serdes.String(), genericRowSerde)
.withOffsetResetPolicy(autoOffsetReset))
.map(nonWindowedMapper)
.transformValues(new AddTimestampColumnValueTransformerSupplier()),
Serdes.String(), genericRowSerdeAfterRead, ksqlTable.getStateStoreName());
Serdes.String(), genericRowSerdeAfterRead);
}
}

private TopologyBuilder.AutoOffsetReset getAutoOffsetReset(Map<String, Object> props) {
TopologyBuilder.AutoOffsetReset autoOffsetReset = null;
private Topology.AutoOffsetReset getAutoOffsetReset(Map<String, Object> props) {
if (props.containsKey(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)) {
if (props.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).toString()
.equalsIgnoreCase("EARLIEST")) {
autoOffsetReset = TopologyBuilder.AutoOffsetReset.EARLIEST;
return Topology.AutoOffsetReset.EARLIEST;
} else if (props.get(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).toString()
.equalsIgnoreCase("LATEST")) {
autoOffsetReset = TopologyBuilder.AutoOffsetReset.LATEST;
return Topology.AutoOffsetReset.LATEST;
}
}
return autoOffsetReset;
return null;
}

private SchemaKStream buildJoin(final JoinNode joinNode, final Map<String, Object> propsMap)
@@ -529,7 +532,7 @@ private KsqlTopicSerDe getResultTopicSerde(final PlanNode node) {
}


public KStreamBuilder getBuilder() {
public StreamsBuilder getBuilder() {
return builder;
}

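The `buildSource`/`createKTable` rewrites above all follow one rule: the old `stream(AutoOffsetReset, keySerde, valueSerde, topic)` overloads are replaced by `stream(topic, Consumed)`, where the serdes and the reset policy ride together, and the enum moves from `TopologyBuilder.AutoOffsetReset` to `Topology.AutoOffsetReset`. A hedged sketch of that pattern, with placeholder String serdes:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class ConsumedExample {

  // Pre-1.0 the reset policy and serdes were positional arguments:
  //   builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST,
  //                  Serdes.String(), Serdes.String(), topic);
  // In 1.0 they travel together in a Consumed, and the enum lives on Topology.
  public static KStream<String, String> source(final StreamsBuilder builder,
                                               final String topic) {
    return builder.stream(topic,
        Consumed.with(Serdes.String(), Serdes.String())
            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
  }
}
```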
(file name not captured in this view)
@@ -20,7 +20,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.util.KsqlConfig;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
SchemaKGroupedStream.java
@@ -24,14 +24,18 @@
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;

import java.util.List;

@@ -51,6 +55,7 @@ public class SchemaKGroupedStream {
this.sourceSchemaKStreams = sourceSchemaKStreams;
}

@SuppressWarnings("unchecked")
public SchemaKTable aggregate(final Initializer initializer,
final KudafAggregator aggregator,
final WindowExpression windowExpression,
@@ -60,42 +65,38 @@ public SchemaKTable aggregate(final Initializer initializer,
KTable<Windowed<String>, GenericRow> aggKtable;
if (windowExpression != null) {
isWindowed = true;
final Materialized<String, GenericRow, WindowStore<Bytes, byte[]>> materialized
= Materialized.<String, GenericRow, WindowStore<Bytes, byte[]>>as(storeName)
.withValueSerde(topicValueSerDe);
if (windowExpression.getKsqlWindowExpression() instanceof TumblingWindowExpression) {
TumblingWindowExpression tumblingWindowExpression =
(TumblingWindowExpression) windowExpression.getKsqlWindowExpression();
aggKtable =
kgroupedStream
kgroupedStream.windowedBy(TimeWindows.of(tumblingWindowExpression.getSizeUnit().toMillis(tumblingWindowExpression.getSize())))
.aggregate(initializer, aggregator,
TimeWindows.of(tumblingWindowExpression.getSizeUnit().toMillis(tumblingWindowExpression.getSize())),
topicValueSerDe,
storeName);
materialized);
} else if (windowExpression.getKsqlWindowExpression() instanceof HoppingWindowExpression) {
HoppingWindowExpression hoppingWindowExpression =
(HoppingWindowExpression) windowExpression.getKsqlWindowExpression();
aggKtable =
kgroupedStream
.aggregate(initializer, aggregator,
TimeWindows.of(
hoppingWindowExpression.getSizeUnit().toMillis(hoppingWindowExpression.getSize()))
.advanceBy(
hoppingWindowExpression.getAdvanceByUnit().toMillis(hoppingWindowExpression.getAdvanceBy())),
topicValueSerDe, storeName);
kgroupedStream.windowedBy(TimeWindows.of(
hoppingWindowExpression.getSizeUnit().toMillis(hoppingWindowExpression.getSize()))
.advanceBy(
hoppingWindowExpression.getAdvanceByUnit().toMillis(hoppingWindowExpression.getAdvanceBy())))
.aggregate(initializer, aggregator, materialized);
} else if (windowExpression.getKsqlWindowExpression() instanceof SessionWindowExpression) {
SessionWindowExpression sessionWindowExpression =
(SessionWindowExpression) windowExpression.getKsqlWindowExpression();
aggKtable =
kgroupedStream
.aggregate(initializer, aggregator,
aggregator.getMerger(),
SessionWindows.with(sessionWindowExpression.getSizeUnit().toMillis(sessionWindowExpression.getGap())),
topicValueSerDe,
storeName);
kgroupedStream.windowedBy(SessionWindows.with(sessionWindowExpression.getSizeUnit().toMillis(sessionWindowExpression.getGap())))
.aggregate(initializer, aggregator, aggregator.getMerger(),
Materialized.<String, GenericRow, SessionStore<Bytes, byte[]>>as(storeName).withValueSerde(topicValueSerDe));
} else {
throw new KsqlException("Could not set the window expression for aggregate.");
}
} else {
aggKtable =
kgroupedStream.aggregate(initializer, aggregator, topicValueSerDe, storeName);
kgroupedStream.aggregate(initializer, aggregator, Materialized.with(null, topicValueSerDe));
}
return new SchemaKTable(schema, aggKtable, keyField, sourceSchemaKStreams, isWindowed,
SchemaKStream.Type.AGGREGATE);
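The aggregate rewrites above split the old all-in-one `aggregate(initializer, aggregator, windows, serde, storeName)` into two steps: `windowedBy(...)` selects the window, and a `Materialized` carries the store name and value serde into `aggregate(...)`. A minimal sketch of the tumbling-window case, assuming String keys and Long values — the store name, window size, and types are illustrative, not KSQL's:

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

public class WindowedAggregateExample {

  public static KTable<Windowed<String>, Long> tumblingSum(
      final KGroupedStream<String, Long> grouped) {
    // The store name and value serde now live in Materialized rather than in
    // aggregate()'s trailing arguments.
    final Materialized<String, Long, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("tumbling-sums") // illustrative store name
            .withValueSerde(Serdes.Long());
    return grouped
        .windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))) // 30s tumbling window
        .aggregate(() -> 0L, (key, value, agg) -> agg + value, materialized);
  }
}
```

The session-window branch in the diff follows the same shape, with a `SessionStore<Bytes, byte[]>`-typed `Materialized` and the extra merger argument.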
SchemaKStream.java
@@ -36,9 +36,12 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.slf4j.Logger;
@@ -94,7 +97,7 @@ public SchemaKStream into(final String kafkaTopicName, final Serde<GenericRow> t
}
}
return new KeyValue<>(key, new GenericRow(columns));
}).to(Serdes.String(), topicValueSerDe, kafkaTopicName);
}).to(kafkaTopicName, Produced.with(Serdes.String(), topicValueSerDe));
return this;
}

@@ -200,7 +203,7 @@ public SchemaKStream leftJoin(final SchemaKTable schemaKTable, final Schema join

GenericRow joinGenericRow = new GenericRow(columns);
return joinGenericRow;
}, Serdes.String(), SerDeUtil.getRowSerDe(joinSerDe, this.getSchema()));
}, Joined.with(Serdes.String(), SerDeUtil.getRowSerDe(joinSerDe, this.getSchema()), null));

return new SchemaKStream(joinSchema, joinedKStream, joinKey,
Arrays.asList(this, schemaKTable), Type.JOIN);
@@ -230,7 +233,7 @@ public SchemaKStream selectKey(final Field newKeyField) {

public SchemaKGroupedStream groupByKey(final Serde keySerde,
final Serde valSerde) {
KGroupedStream kgroupedStream = kstream.groupByKey(keySerde, valSerde);
KGroupedStream kgroupedStream = kstream.groupByKey(Serialized.with(keySerde, valSerde));
return new SchemaKGroupedStream(schema, kgroupedStream, keyField, Arrays.asList(this));
}

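The remaining changes in this file are the same KIP-182 renames applied to `to(...)`, `groupByKey(...)`, and `leftJoin(...)`: bare serde arguments become `Produced`, `Serialized`, and `Joined` carrier objects. A sketch of all three under assumed String keys and values (topic name and join logic are illustrative); the `SchemaKTable` hunks that follow apply the same `Produced` pattern to windowed keys:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

public class Kip182CarrierObjectsExample {

  public static void demo(final KStream<String, String> stream,
                          final KTable<String, String> table) {
    // to(): serdes move into Produced.
    stream.to("sink-topic", Produced.with(Serdes.String(), Serdes.String()));

    // groupByKey(): serdes move into Serialized.
    final KGroupedStream<String, String> grouped =
        stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));

    // leftJoin(): serdes move into Joined; the table-side serde slot may be
    // null (use the default), mirroring the SchemaKStream change above.
    final KStream<String, String> joined = stream.leftJoin(
        table,
        (left, right) -> left + "," + right,
        Joined.with(Serdes.String(), Serdes.String(), null));
  }
}
```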
SchemaKTable.java
@@ -35,6 +35,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public SchemaKTable into(final String kafkaTopicName, final Serde<GenericRow> to
}
}
return new KeyValue<>(key, new GenericRow(columns));
}).to(new WindowedSerde(), topicValueSerDe, kafkaTopicName);
}).to(kafkaTopicName, Produced.with(new WindowedSerde(), topicValueSerDe));
} else {
ktable.toStream()
.map((KeyValueMapper<String, GenericRow, KeyValue<String, GenericRow>>) (key, row) -> {
@@ -94,7 +95,7 @@ public SchemaKTable into(final String kafkaTopicName, final Serde<GenericRow> to
}
}
return new KeyValue<>(key, new GenericRow(columns));
}).to(Serdes.String(), topicValueSerDe, kafkaTopicName);
}).to(kafkaTopicName, Produced.with(Serdes.String(), topicValueSerDe));
}

return this;
PhysicalPlanBuilderTest.java
@@ -37,7 +37,7 @@
import io.confluent.ksql.util.MetaStoreFixture;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.StreamsBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -48,14 +48,14 @@

public class PhysicalPlanBuilderTest {

KStreamBuilder kStreamBuilder;
StreamsBuilder streamsBuilder;
KsqlParser ksqlParser;
PhysicalPlanBuilder physicalPlanBuilder;
MetaStore metaStore;

@Before
public void before() {
kStreamBuilder = new KStreamBuilder();
streamsBuilder = new StreamsBuilder();
ksqlParser = new KsqlParser();
metaStore = MetaStoreFixture.getNewMetaStore();
Map<String, Object> configMap = new HashMap<>();
@@ -64,7 +64,7 @@ public void before() {
configMap.put("commit.interval.ms", 0);
configMap.put("cache.max.bytes.buffering", 0);
configMap.put("auto.offset.reset", "earliest");
physicalPlanBuilder = new PhysicalPlanBuilder(kStreamBuilder, new KsqlConfig(configMap), new FakeKafkaTopicClient(), new MetastoreUtil());
physicalPlanBuilder = new PhysicalPlanBuilder(streamsBuilder, new KsqlConfig(configMap), new FakeKafkaTopicClient(), new MetastoreUtil());
}

private SchemaKStream buildPhysicalPlan(String queryStr) throws Exception {