Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master #1

Merged
merged 161 commits into from
Oct 18, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
161 commits
Select commit Hold shift + click to select a range
bdffcf4
KSQL screencast image
Aug 30, 2017
5ea83ba
remove here hyperlink
Aug 30, 2017
e81f61f
Merge pull request #268 from confluentinc/joel-hamill/demo-image
joel-hamill Aug 30, 2017
c0487e1
Added LIKE examples to syntax guide and examples.
hjafarpour Aug 31, 2017
5589b2e
Fixed typo.
hjafarpour Aug 31, 2017
639e164
Minor doc changes.
hjafarpour Aug 31, 2017
621ccc3
Merge pull request #271 from hjafarpour/KSQL-324-Example-Queries
hjafarpour Aug 31, 2017
70b980f
Improvements to syntax guide
Sep 1, 2017
cc86321
Merge pull request #277 from confluentinc/0.1.x-syntax-guide
miguno Sep 4, 2017
a75dff3
Restructure TOC
Sep 4, 2017
6b896d7
Fixed misspelled words
Sep 5, 2017
056fa14
Fixed a minor error in the syntax guide.
hjafarpour Sep 5, 2017
e1522cb
Merge pull request #281 from hjafarpour/KSQL-324-Example-Queries
hjafarpour Sep 5, 2017
1e5496b
Merge pull request #280 from juneng603/feature-fix-misspelled-words
hjafarpour Sep 5, 2017
15180dd
Clarify PARTITION BY in CSAS
Sep 6, 2017
39c04f8
remove build-tools copy, use pluginRepository in pom (#286)
aayars Sep 6, 2017
032d1b5
Add section Use Cases and Examples
Sep 6, 2017
885cf10
Fix typo
Sep 7, 2017
4637617
Add missing word to roadmap
elliotcm Sep 10, 2017
7f0141b
Merge pull request #288 from elliotcm/patch-1
miguno Sep 11, 2017
c52137d
Clarify FAQ
Sep 12, 2017
3af28e7
Remove trailing whitespace
Sep 12, 2017
b2df850
Add Configure KSQL to Syntax Guide, fix/clariy config examples
Sep 12, 2017
1bf980c
Merge pull request #289 from confluentinc/0.1.x-configuration
miguno Sep 12, 2017
7a9acb9
Emit succinct error for missing query file
Sep 12, 2017
ce7c880
Add upstream project so build are triggered automatically
ewencp Sep 14, 2017
869f598
Clarify client-server mode
Sep 15, 2017
76b0bcc
Clarifications
Sep 15, 2017
01d488c
Add `$` to CLI commands
Sep 15, 2017
ed4f0e6
Clarifications
Sep 15, 2017
b5030ba
Clarifications
Sep 15, 2017
4211fe7
Clarifications
Sep 15, 2017
0f6150c
Fix typo
Sep 15, 2017
7db5983
Remove bullet formatting
Sep 15, 2017
1cb5307
fix checkstyle errors
dguy Sep 15, 2017
e614bd7
Merge pull request #298 from confluentinc/fix-checkstyle
logscape Sep 15, 2017
50ffcac
Document how to configure the Kafka bootstrap servers
Sep 15, 2017
bf8a719
Merge pull request #296 from ewencp/add-upstream-project
ewencp Sep 15, 2017
34c5f9f
Merge remote-tracking branch 'origin/0.1.x'
ewencp Sep 15, 2017
ac19358
Merge pull request #297 from confluentinc/0.1.x-bootstrap-servers
miguno Sep 18, 2017
bbe07e5
Update non-docker-clickstream.md
Sep 18, 2017
a8d1e5d
Merge pull request #301 from himani1/patch-2
logscape Sep 18, 2017
6e2cc63
Merge pull request #291 from kirktrue/omit-stacktrace-for-missing-file
logscape Sep 20, 2017
961eafa
Clarify KSQL works with AK and CP
Sep 20, 2017
9d678d9
Merge pull request #302 from confluentinc/0.1.x-faq-works-with-apache…
logscape Sep 20, 2017
c6d7773
small-code-quality-improvements Added code quality improvements.
TheRealHaui Sep 24, 2017
22b898e
small-code-quality-improvements Added code quality improvements.
TheRealHaui Sep 24, 2017
dfea526
added persistent information for streams and table
Sep 26, 2017
88fda38
Merge pull request #319 from bluemonk3y/0.1.x-DOC-added-persistent-in…
logscape Sep 26, 2017
bf90854
Merge pull request #313 from TheRealHaui/small-code-quality-improvements
logscape Sep 27, 2017
34273f7
fixing commandstore hanging
Sep 27, 2017
a2f8680
reduced poll on priorCommands processing
Sep 27, 2017
c2f12f2
removed header p html
Sep 27, 2017
268ba18
added comment on poll instead of subscribe
Sep 27, 2017
e71f7ca
Merge pull request #321 from bluemonk3y/0.1.x-BUG-fix-command-queue-r…
logscape Sep 27, 2017
36f7624
Clarify how streams and tables are stored in topics
Sep 28, 2017
b8cf069
Merge pull request #323 from confluentinc/0.1.x-concepts-clarification
logscape Sep 28, 2017
a94639e
Including the streams config properties in the session with CTAS and …
hjafarpour Sep 29, 2017
fb5800e
refactoring parser, core and common
Sep 29, 2017
ade44ce
Merge pull request #324 from hjafarpour/KSQL-375-include-query-config…
hjafarpour Sep 29, 2017
ddfec38
extracted serde module
Sep 29, 2017
35e6a40
Merge pull request #325 from bluemonk3y/0.1.x-CODE-restructure
hjafarpour Sep 29, 2017
a33045c
Refactored the package for GenericRow.
hjafarpour Sep 29, 2017
f2a1a9f
Merge pull request #327 from hjafarpour/refactor-common-GenericRow
hjafarpour Sep 29, 2017
bbf636d
Fixed the drop table issue.
hjafarpour Sep 30, 2017
dc1ad22
Merge pull request #328 from hjafarpour/KSQL-376-droptable-issue
hjafarpour Sep 30, 2017
09a7755
extracted pipeline
Oct 2, 2017
115b4b9
Merge pull request #330 from bluemonk3y/0.1.x-CODE-restructure-Queued…
logscape Oct 2, 2017
295fc8b
rename ksql-core to ksql-engine
Oct 2, 2017
850bbbd
Add to Examples: WHERE clause and content-based routing (#333)
ybyzek Oct 3, 2017
ddc9396
Merge pull request #332 from bluemonk3y/0.1.x-CODE-331-rename-ksql-co…
logscape Oct 3, 2017
1584815
First step in better testing tools. Added an integration test tool to…
hjafarpour Oct 3, 2017
58f9a25
Merge pull request #335 from hjafarpour/KSQL-379-Better-Test-Tools
hjafarpour Oct 3, 2017
973c7dc
No need to set the streams application.id for the rest server.
hjafarpour Oct 4, 2017
17fb73f
Merge pull request #336 from hjafarpour/NoNeedForStreamAppIdForRESTSe…
hjafarpour Oct 4, 2017
04094a9
First pass on refactoring KSQL confing.
hjafarpour Oct 4, 2017
fe2296f
extracting new integration test using KSQLContext
Oct 5, 2017
839cb84
Refined admin client config.
hjafarpour Oct 5, 2017
794da0f
remove dead code. make field and method access more restricted
dguy Oct 5, 2017
117d760
Merge branch '0.1.x'
hjafarpour Oct 5, 2017
46f349e
Merge branch 'master' of https://github.com/confluentinc/ksql
hjafarpour Oct 5, 2017
c63aee2
bump common dependency to 3.3.1-SNAPSHOT
Oct 5, 2017
243c00f
Merge remote-tracking branch 'origin/0.1.x'
Oct 5, 2017
392c4d1
using 0.11.0.1 of client to use fixed AdminClient
Oct 5, 2017
443fc5e
kafka version setting to 11.0.1
Oct 5, 2017
c12ca7a
Merge pull request #337 from confluentinc/remove-some-dead-code
hjafarpour Oct 5, 2017
f527704
Merge remote-tracking branch 'upstream/0.1.x' into 0.1.x
hjafarpour Oct 5, 2017
e166ede
Merge branch '0.1.x-CODE-379-started-int-test-refactor' of https://gi…
hjafarpour Oct 6, 2017
520d779
remove unused imports
dguy Oct 6, 2017
5ba12a0
Merge pull request #342 from dguy/unused-imports
dguy Oct 6, 2017
9a17925
remove dead code.
dguy Oct 6, 2017
63228fd
Minor change to use the same Application Id for embedded mode.
hjafarpour Oct 6, 2017
c0836ae
Merge pull request #344 from hjafarpour/bluemonk3y-0.1.x-CODE-379-sta…
hjafarpour Oct 6, 2017
813c850
merge 0.1.x
dguy Oct 6, 2017
2196ea6
Merge pull request #343 from dguy/ksql-engine-tidy
hjafarpour Oct 6, 2017
7c0a11d
Merge remote-tracking branch 'origin/0.1.x'
Oct 7, 2017
e6aaf4d
Merge remote-tracking branch 'upstream/0.1.x' into KSQLConfigRefactor
hjafarpour Oct 9, 2017
71f2d78
Some changes based on the new KSQLConfig.
hjafarpour Oct 9, 2017
8ab2828
Merge pull request #338 from hjafarpour/KSQLConfigRefactor
hjafarpour Oct 9, 2017
6f27e7e
replace window-unit with TimeUnit (#346)
dguy Oct 10, 2017
ff36f7f
skip bad data in json and delimited deserializer (#347)
dguy Oct 10, 2017
f97e11b
Clean up the poms after the refactor that created new packages.
hjafarpour Oct 10, 2017
74beb47
More pom clean up.
hjafarpour Oct 10, 2017
b38cc4b
Merge pull request #350 from hjafarpour/pom-cleanup-after-refactor
hjafarpour Oct 10, 2017
901b3b6
Reverting adminclient singleton temporarily.
hjafarpour Oct 10, 2017
8a6b6a7
Merge pull request #351 from hjafarpour/Revert-AdminClient-Singleton
hjafarpour Oct 10, 2017
5d0978a
Merge branch '0.1.x'
hjafarpour Oct 10, 2017
b9a99fe
Merge pull request #352 from confluentinc/master
hjafarpour Oct 10, 2017
0af3fb8
Bump to common 4.0.0-SNAPSHOT
Oct 11, 2017
0aca9a7
Bump to common 4.1.0-SNAPSHOT
Oct 11, 2017
2e7eb68
update slack channel for build notifications
Oct 11, 2017
c086ca5
Merge remote-tracking branch 'origin/0.1.x' into 4.0.x
Oct 11, 2017
2c82044
Merge remote-tracking branch 'origin/4.0.x'
Oct 11, 2017
aba0f7c
fix artifact version in compose file
Oct 11, 2017
32e41e8
Merge remote-tracking branch 'origin/0.1.x' into 4.0.x
Oct 11, 2017
ed1ee1e
Merge remote-tracking branch 'origin/4.0.x'
Oct 11, 2017
ce9e2da
After bumping the dependency to 4.0.0 we encountered some issues that…
hjafarpour Oct 11, 2017
22e481d
Merge pull request #363 from hjafarpour/KSQL-388-fix-400-build-issues
hjafarpour Oct 11, 2017
27a0e1f
Merge remote-tracking branch 'upstream/master'
hjafarpour Oct 11, 2017
06b59a9
Better clean up for queries. Internal topics will be deletted after q…
hjafarpour Oct 12, 2017
181fa50
Added singleton AdminClient back.
hjafarpour Oct 12, 2017
cc57a3e
extract methods, remove duplication, etc from PhysicalPlanBuilder (#356)
dguy Oct 12, 2017
c480c42
Use streams 1.0 api methods (#365)
dguy Oct 12, 2017
f9e4416
new PR with migrated tests
Oct 12, 2017
dbffadd
tidying stuff up
Oct 12, 2017
cc2177c
trigger build
Oct 12, 2017
2c8ac7b
remove more crud
Oct 12, 2017
32c20ae
Merge pull request #367 from bluemonk3y/4.0.x-TEST-migrating-integrat…
logscape Oct 12, 2017
8b299f3
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-383-clean-up-…
hjafarpour Oct 12, 2017
bcec9eb
Fixed the pom version.
hjafarpour Oct 12, 2017
93634bf
Pom version fix.
hjafarpour Oct 12, 2017
a4c863c
Refactored Function registry.
hjafarpour Oct 13, 2017
8b5b883
adding windowing integration tests, updated streams int test util to …
Oct 13, 2017
5a165c6
trigger
Oct 13, 2017
9c1cc3c
moving packages
Oct 13, 2017
ac3b527
Applied review feedback.
hjafarpour Oct 13, 2017
d18a471
Remove SourceNode as it is redundant (#368)
dguy Oct 16, 2017
858a71b
Merge pull request #371 from bluemonk3y/4.0.x-TEST-windowing-integrat…
logscape Oct 16, 2017
196d4db
adding stream-table join
Oct 16, 2017
a148075
exposing time controls when publishing test data
Oct 16, 2017
73ec9f2
exposing time controls for int tests
Oct 16, 2017
91dd2d5
removed old comments
Oct 16, 2017
e03d4d1
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-386-KsqlFunct…
hjafarpour Oct 16, 2017
87671d0
trigger build
Oct 17, 2017
7055aca
remove jackson dependency in ksql-rest-app. (#369)
dguy Oct 17, 2017
36829c3
Merge pull request #375 from bluemonk3y/4.0.x-TEST-stream-table-join-…
logscape Oct 17, 2017
eb12963
move the createsink method out of SchemaKStream (#376)
dguy Oct 17, 2017
1aecb91
make join test more reliable (#381)
dguy Oct 17, 2017
587b233
rework threading on QueryStreamWriter
Oct 17, 2017
ccd9e13
import
Oct 17, 2017
1fa24e3
import
Oct 17, 2017
1676efc
import
Oct 17, 2017
66c7f13
Use streams deserializtion exception handler (#385)
dguy Oct 17, 2017
28a832d
extract SelectValueMapper from SchemaKXxxx (#379)
dguy Oct 17, 2017
0af6c5b
minor tidyups
Oct 17, 2017
c84ef9d
Merge branch '4.0.x' of https://github.com/confluentinc/ksql into 4.0…
Oct 17, 2017
52ef4ef
Put window aggregate logic into the various KsqlWindowExpression clas…
dguy Oct 17, 2017
863fa31
Merge pull request #383 from bluemonk3y/4.0.x-CODE-QueryStreamWriterT…
logscape Oct 17, 2017
984e51a
Minor modification in KSQLContext.
hjafarpour Oct 17, 2017
c9b3f75
Merge remote-tracking branch 'upstream/4.0.x' into KSQL-386-KsqlFunct…
hjafarpour Oct 17, 2017
de6772e
Merge pull request #370 from hjafarpour/KSQL-386-KsqlFunctions-non-st…
hjafarpour Oct 17, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,13 @@
import java.util.List;
import java.util.Map;

import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;

public class KsqlContext {

private static final Logger log = LoggerFactory.getLogger(KsqlContext.class);
final KsqlEngine ksqlEngine;
private final KsqlEngine ksqlEngine;
private static final String APPLICATION_ID_OPTION_DEFAULT = "ksql_standalone_cli";
private static final String KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT = "localhost:9092";
private final AdminClient adminClient;
Expand All @@ -53,7 +52,7 @@ public KsqlContext() {
*
* @param streamsProperties
*/
public KsqlContext(Map<String, Object> streamsProperties) {
KsqlContext(Map<String, Object> streamsProperties) {
if (streamsProperties == null) {
streamsProperties = new HashMap<>();
}
Expand Down Expand Up @@ -104,24 +103,4 @@ public void close() throws IOException {
topicClient.close();
adminClient.close();
}

/**
* Terminate a query with the given id.
*
* @param queryId
*/
public void terminateQuery(long queryId) {
if (!ksqlEngine.getPersistentQueries().containsKey(queryId)) {
throw new KsqlException(String.format("Invalid query id. Query id, %d, does not exist.",
queryId));
}
PersistentQueryMetadata persistentQueryMetadata = ksqlEngine
.getPersistentQueries().get(queryId);
persistentQueryMetadata.getKafkaStreams().close();
ksqlEngine.getPersistentQueries().remove(queryId);
}

public Map<Long, PersistentQueryMetadata> getRunningQueries() {
return ksqlEngine.getPersistentQueries();
}
}
4 changes: 0 additions & 4 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,6 @@ public Map<Long, PersistentQueryMetadata> getPersistentQueries() {
return new HashMap<>(persistentQueries);
}

public Set<QueryMetadata> getLiveQueries() {
return new HashSet<>(liveQueries);
}

public static List<String> getImmutableProperties() {
return new ArrayList<>(IMMUTABLE_PROPERTIES);
}
Expand Down
14 changes: 7 additions & 7 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ public class QueryEngine {
private final KsqlEngine ksqlEngine;


public QueryEngine(final KsqlEngine ksqlEngine) {
QueryEngine(final KsqlEngine ksqlEngine) {
this.queryIdCounter = new AtomicLong(1);
this.ksqlEngine = ksqlEngine;
}


public List<Pair<String, PlanNode>> buildLogicalPlans(
List<Pair<String, PlanNode>> buildLogicalPlans(
final MetaStore metaStore,
final List<Pair<String, Statement>> statementList) {

Expand All @@ -116,20 +116,20 @@ public List<Pair<String, PlanNode>> buildLogicalPlans(
return logicalPlansList;
}

public PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMetaStore) {
private PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMetaStore) {

// Analyze the query to resolve the references and extract operations
Analysis analysis = new Analysis();
Analyzer analyzer = new Analyzer(analysis, tempMetaStore);
analyzer.process(query, new AnalysisContext(null, null));
analyzer.process(query, new AnalysisContext(null));

AggregateAnalysis aggregateAnalysis = new AggregateAnalysis();
AggregateAnalyzer aggregateAnalyzer = new
AggregateAnalyzer(aggregateAnalysis, tempMetaStore, analysis);
AggregateAnalyzer(aggregateAnalysis, analysis);
AggregateExpressionRewriter aggregateExpressionRewriter = new AggregateExpressionRewriter();
for (Expression expression: analysis.getSelectExpressions()) {
aggregateAnalyzer
.process(expression, new AnalysisContext(null, null));
.process(expression, new AnalysisContext(null));
if (!aggregateAnalyzer.isHasAggregateFunction()) {
aggregateAnalysis.getNonAggResultColumns().add(expression);
}
Expand All @@ -145,7 +145,7 @@ public PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMet
// TODO: make sure only aggregates are in the expression. For now we assume this is the case.
if (analysis.getHavingExpression() != null) {
aggregateAnalyzer.process(analysis.getHavingExpression(),
new AnalysisContext(null, null));
new AnalysisContext(null));
if (!aggregateAnalyzer.isHasAggregateFunction()) {
aggregateAnalysis.getNonAggResultColumns().add(analysis.getHavingExpression());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,14 @@

public class AggregateAnalysis {

private List<Expression> requiredColumnsList = new ArrayList<>();
private Expression havingExpression = null;
private Map<String, Expression> requiredColumnsMap = new HashMap<>();
private List<Expression> nonAggResultColumns = new ArrayList<>();
private List<Expression> finalSelectExpressions = new ArrayList<>();
List<Expression> aggregateFunctionArguments = new ArrayList<>();
List<Expression> requiredColumnsList = new ArrayList<>();

Expression havingExpression = null;

Map<String, Expression> requiredColumnsMap = new HashMap<>();

List<FunctionCall> functionList = new ArrayList<>();

List<Expression> nonAggResultColumns = new ArrayList<>();

List<Expression> finalSelectExpressions = new ArrayList<>();

public List<Expression> getAggregateFunctionArguments() {
return aggregateFunctionArguments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.confluent.ksql.analyzer;

import io.confluent.ksql.function.KsqlFunctions;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.tree.DereferenceExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
Expand All @@ -30,7 +29,6 @@
public class AggregateAnalyzer extends DefaultTraversalVisitor<Node, AnalysisContext> {

private AggregateAnalysis aggregateAnalysis;
private MetaStore metaStore;
private Analysis analysis;

private boolean hasAggregateFunction = false;
Expand All @@ -43,10 +41,9 @@ public void setHasAggregateFunction(boolean hasAggregateFunction) {
this.hasAggregateFunction = hasAggregateFunction;
}

public AggregateAnalyzer(AggregateAnalysis aggregateAnalysis, MetaStore metaStore,
public AggregateAnalyzer(AggregateAnalysis aggregateAnalysis,
Analysis analysis) {
this.aggregateAnalysis = aggregateAnalysis;
this.metaStore = metaStore;
this.analysis = analysis;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public class Analysis {
private Map<String, Object> intoProperties = new HashMap<>();
private String intoFormat = null;
// TODO: Maybe have all as properties. At the moment this will only be set if format is avro.
private String intoAvroSchemaFilePath = null;
private String intoKafkaTopicName = null;
private List<Pair<StructuredDataSource, String>> fromDataSources = new ArrayList<>();
private JoinNode join;
Expand All @@ -50,7 +49,7 @@ public class Analysis {
private Optional<Integer> limitClause = Optional.empty();


public void addSelectItem(final Expression expression, final String alias) {
void addSelectItem(final Expression expression, final String alias) {
selectExpressions.add(expression);
selectExpressionAlias.add(alias);
}
Expand All @@ -68,10 +67,6 @@ public List<Pair<StructuredDataSource, String>> getFromDataSources() {
return fromDataSources;
}

public void setFromDataSources(List<Pair<StructuredDataSource, String>> fromDataSources) {
this.fromDataSources = fromDataSources;
}

public Expression getWhereExpression() {
return whereExpression;
}
Expand All @@ -84,18 +79,10 @@ public List<Expression> getSelectExpressions() {
return selectExpressions;
}

public void setSelectExpressions(List<Expression> selectExpressions) {
this.selectExpressions = selectExpressions;
}

public List<String> getSelectExpressionAlias() {
return selectExpressionAlias;
}

public void setSelectExpressionAlias(List<String> selectExpressionAlias) {
this.selectExpressionAlias = selectExpressionAlias;
}

public JoinNode getJoin() {
return join;
}
Expand All @@ -120,22 +107,10 @@ public String getIntoKafkaTopicName() {
return intoKafkaTopicName;
}

public String getIntoAvroSchemaFilePath() {
return intoAvroSchemaFilePath;
}

public void setIntoAvroSchemaFilePath(String intoAvroSchemaFilePath) {
this.intoAvroSchemaFilePath = intoAvroSchemaFilePath;
}

public List<Expression> getGroupByExpressions() {
return groupByExpressions;
}

public void setGroupByExpressions(List<Expression> groupByExpressions) {
this.groupByExpressions = groupByExpressions;
}

public WindowExpression getWindowExpression() {
return windowExpression;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.parser.tree.Node;

public class AnalysisContext {

public enum ParentType {
Expand All @@ -38,18 +36,12 @@ public String getValue() {
}
}

final Node parentNode;
final ParentType parentType;
private final ParentType parentType;

public AnalysisContext(final Node parentNode, final ParentType parentType) {
this.parentNode = parentNode;
public AnalysisContext(final ParentType parentType) {
this.parentType = parentType;
}

public Node getParentNode() {
return parentNode;
}

public ParentType getParentType() {
return parentType;
}
Expand Down
35 changes: 14 additions & 21 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,29 @@ protected Node visitQuerySpecification(final QuerySpecification node,
final AnalysisContext context) {

process(node.getFrom().get(),
new AnalysisContext(null, AnalysisContext.ParentType.FROM));
new AnalysisContext(AnalysisContext.ParentType.FROM));

process(node.getInto().get(), new AnalysisContext(null,
AnalysisContext.ParentType.INTO));
process(node.getInto().get(), new AnalysisContext(
AnalysisContext.ParentType.INTO));
if (!(analysis.getInto() instanceof KsqlStdOut)) {
analyzeNonStdOutSink();
}

process(node.getSelect(), new AnalysisContext(null,
AnalysisContext.ParentType.SELECT));
process(node.getSelect(), new AnalysisContext(
AnalysisContext.ParentType.SELECT));
if (node.getWhere().isPresent()) {
analyzeWhere(node.getWhere().get(), context);
analyzeWhere(node.getWhere().get());
}
if (node.getGroupBy().isPresent()) {
analyzeGroupBy(node.getGroupBy().get(), context);
analyzeGroupBy(node.getGroupBy().get());
}

if (node.getWindowExpression().isPresent()) {
analyzeWindowExpression(node.getWindowExpression().get(), context);
analyzeWindowExpression(node.getWindowExpression().get());
}

if (node.getHaving().isPresent()) {
analyzeHaving(node.getHaving().get(), context);
analyzeHaving(node.getHaving().get());
}

if (node.getLimit().isPresent()) {
Expand All @@ -116,7 +116,7 @@ protected Node visitQuerySpecification(final QuerySpecification node,
private void analyzeNonStdOutSink() {
List<Pair<StructuredDataSource, String>> fromDataSources = analysis.getFromDataSources();

StructuredDataSource intoStructuredDataSource = (StructuredDataSource) analysis.getInto();
StructuredDataSource intoStructuredDataSource = analysis.getInto();
String intoKafkaTopicName = analysis.getIntoKafkaTopicName();
if (intoKafkaTopicName == null) {
intoKafkaTopicName = intoStructuredDataSource.getName();
Expand Down Expand Up @@ -408,28 +408,22 @@ protected Node visitGroupBy(final GroupBy node, final AnalysisContext context) {
return null;
}

private StructuredDataSource analyzeFrom(final QuerySpecification node,
final AnalysisContext context) {
return null;
}

private void analyzeWhere(final Node node, final AnalysisContext context) {
private void analyzeWhere(final Node node) {
analysis.setWhereExpression((Expression) node);
}

private void analyzeGroupBy(final GroupBy groupBy, final AnalysisContext context) {
private void analyzeGroupBy(final GroupBy groupBy) {
for (GroupingElement groupingElement : groupBy.getGroupingElements()) {
Set<Expression> groupingSet = groupingElement.enumerateGroupingSets().get(0);
analysis.getGroupByExpressions().addAll(groupingSet);
}
}

private void analyzeWindowExpression(final WindowExpression windowExpression,
final AnalysisContext context) {
private void analyzeWindowExpression(final WindowExpression windowExpression) {
analysis.setWindowExpression(windowExpression);
}

private void analyzeHaving(final Node node, final AnalysisContext context) {
private void analyzeHaving(final Node node) {
analysis.setHavingExpression((Expression) node);
}

Expand Down Expand Up @@ -526,7 +520,6 @@ private void setIntoTopicFormat(final StructuredDataSource into, final Table nod
}
avroSchemaFilePath = avroSchemaFilePath.substring(1, avroSchemaFilePath.length() - 1);
}
analysis.setIntoAvroSchemaFilePath(avroSchemaFilePath);
analysis.getIntoProperties().put(DdlConfig.AVRO_SCHEMA_FILE, avroSchemaFilePath);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.function.KsqlFunction;
import io.confluent.ksql.function.KsqlFunctions;
import io.confluent.ksql.parser.tree.ArithmeticBinaryExpression;
import io.confluent.ksql.parser.tree.AstVisitor;
import io.confluent.ksql.parser.tree.Cast;
Expand All @@ -39,15 +37,15 @@


public class ExpressionAnalyzer {
final Schema schema;
final boolean isJoinSchema;
private final Schema schema;
private final boolean isJoinSchema;

public ExpressionAnalyzer(Schema schema, boolean isJoinSchema) {
ExpressionAnalyzer(Schema schema, boolean isJoinSchema) {
this.schema = schema;
this.isJoinSchema = isJoinSchema;
}

public void analyzeExpression(Expression expression) {
void analyzeExpression(Expression expression) {
Visitor visitor = new Visitor(schema);
visitor.process(expression, null);
}
Expand All @@ -67,8 +65,6 @@ protected Object visitLikePredicate(LikePredicate node, Object context) {
}

protected Object visitFunctionCall(FunctionCall node, Object context) {
String functionName = node.getName().getSuffix();
KsqlFunction ksqlFunction = KsqlFunctions.getFunction(functionName);
for (Expression argExpr : node.getArguments()) {
process(argExpr, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public String process(final Expression expression, final Schema schema) {
return formatExpression(expression, true);
}

public String formatExpression(final Expression expression, final boolean unmangleNames) {
private String formatExpression(final Expression expression, final boolean unmangleNames) {
Pair<String, Schema>
expressionFormatterResult =
new SqlToJavaVisitor.Formatter().process(expression, unmangleNames);
Expand Down
Loading