Skip to content

Commit

Permalink
Pulling in the KSQL Experimental UI (#693)
Browse files Browse the repository at this point in the history
* adding show topics

* adding information to show topics - consumerGroupCount and consumerCount

* updated show topics to include consumerGroupCount and consumerCount

* display the output format on the stream

* PR updates to include documentation and feedback

* fixed grammar

* PR changes from feedback

* fixed test

* cut 1

* cut 2

* removed broken files

* reverted pom dependency

* adding ksqlserver.properties.ui.disabled=false

* checkstyle

* change ui.disabled to ui.enabled

* started adding docs

* final set of changes to improve UX

* final bits

* fixing final tests due to version checker location change

* last changes to docs and clickstream.sql file

* missed some uppercasing
  • Loading branch information
Neil Avery authored Feb 7, 2018
1 parent e43aa2a commit 677e52f
Show file tree
Hide file tree
Showing 18 changed files with 237 additions and 555 deletions.
2 changes: 1 addition & 1 deletion config/ksqlserver.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands

listeners=http://localhost:8080
ui.enabled=true
10 changes: 10 additions & 0 deletions docs/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,20 @@ Its interface should be familiar to users of MySQL, Postgres, Oracle, Hive, Pres

The KSQL CLI acts as a client to the KSQL server (see next section).

>An alternative to the KSQL CLI when doing local development and testing is to use the KSQL Experimental UI; see below.

#### KSQL Server
The KSQL server runs the engine that executes KSQL queries, which includes the data processing as well as reading
data from and writing data to the target Kafka cluster.

#### KSQL Experimental UI
KSQL currently ships with an [experimental web UI](https://github.com/confluentinc/ksql-experimental-ui), whose goal is to provide a simple visual wrapper for the interactive KSQL CLI. The experimental UI is meant to be used for local development, testing, and demoing KSQL. If you want to use KSQL interactively in production environments, we recommend the use of the KSQL CLI instead.

* By default, the KSQL Experimental UI runs on every KSQL Server and is accessible at ``http://ksqlserver:8080``
* To disable the experimental UI you must add ``ui.enabled=false`` to the server's configuration file ``ksqlserver.properties``.
* The UI shares the port with KSQL's REST API, which you can configure via the ``listeners`` configuration property.

# Terminology
When using KSQL, the following terminology is used.

Expand Down
45 changes: 22 additions & 23 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,27 @@

package io.confluent.ksql.cli;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.cli.console.CliSpecificCommand;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ErrorMessageEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Version;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand Down Expand Up @@ -48,28 +69,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.cli.console.CliSpecificCommand;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ErrorMessageEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Version;

public class Cli implements Closeable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);
Expand Down Expand Up @@ -132,7 +131,7 @@ public void runInteractively() {
private void displayWelcomeMessage() {
String serverVersion;
try {
serverVersion = restClient.makeRootRequest().getResponse().getVersion();
serverVersion = restClient.getServerInfo().getResponse().getVersion();
} catch (Exception exception) {
serverVersion = "<unknown>";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ public void printHelp() {

@Override
public void execute(String commandStrippedLine) {
ServerInfo serverInfo = restClient.makeRootRequest().getResponse();
ServerInfo serverInfo = restClient.getServerInfo().getResponse();
writer().printf("Version: %s%n", serverInfo.getVersion());
flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void shouldPrintErrorOnErrorResponseFromRestClient() {
final RemoteCli.RemoteCliSpecificCommand command = new RemoteCli.RemoteCliSpecificCommand(
new KsqlRestClient("xxxx", Collections.emptyMap()) {
@Override
public RestResponse<ServerInfo> makeRootRequest() {
public RestResponse<ServerInfo> getServerInfo() {
return RestResponse.erroneous(new ErrorMessage("it is broken", Collections.emptyList()));
}
}, new PrintWriter(out));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void shouldThrowConfigExceptionIfOnlyPasswordIsProvided() throws Exceptio
}

@Test
public void shouldReturnRemoteCliWhenBothUsernameAndPasswordAreProvided() throws Exception {
public void shouldReturnRemoteCliWhenBothUsCliTesternameAndPasswordAreProvided() throws Exception {
Remote remoteCommand = new Remote();
remoteCommand.server = "http://foobar";
remoteCommand.versionCheckerAgent = EasyMock.mock(VersionCheckerAgent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void shouldRaiseAuthenticationExceptionOn401Response() {
String serverAddress = "http://foobar";
Client client = mockClientExpectingGetRequestAndReturningStatus(serverAddress, Response.Status.UNAUTHORIZED);
KsqlRestClient restClient = new KsqlRestClient(client, serverAddress);
RestResponse restResponse = restClient.makeRootRequest();
RestResponse restResponse = restClient.getServerInfo();
assertTrue(restResponse.isErroneous());
}

Expand All @@ -43,7 +43,7 @@ public void shouldReturnSuccessfulResponseWhenAuthenticationSucceeds() {
String serverAddress = "http://foobar";
Client client = mockClientExpectingGetRequestAndReturningStatus(serverAddress, Response.Status.OK);
KsqlRestClient restClient = new KsqlRestClient(client, serverAddress);
RestResponse restResponse = restClient.makeRootRequest();
RestResponse restResponse = restClient.getServerInfo();
assertTrue(restResponse.isSuccessful());
}

Expand All @@ -52,7 +52,7 @@ private Client mockClientExpectingGetRequestAndReturningStatus(String server, Re

WebTarget target = EasyMock.createNiceMock(WebTarget.class);
EasyMock.expect(client.target(server)).andReturn(target);
EasyMock.expect(target.path("/")).andReturn(target);
EasyMock.expect(target.path("/info")).andReturn(target);
Invocation.Builder builder = EasyMock.createNiceMock(Invocation.Builder.class);
EasyMock.expect(target.request(MediaType.APPLICATION_JSON_TYPE)).andReturn(builder);
Response response = EasyMock.createNiceMock(Response.class);
Expand Down
46 changes: 23 additions & 23 deletions ksql-clickstream-demo/demo/clickstream-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varcha

-- number of events per minute - think about key-for-distribution-purpose - shuffling etc - shouldnt use 'userid'
DROP TABLE events_per_min;
create table events_per_min as select userid, count(*) as events from clickstream window TUMBLING (size 10 second) group by userid;
CREATE table events_per_min AS SELECT userid, count(*) AS events FROM clickstream window TUMBLING (size 10 second) GROUP BY userid;

-- VIEW - Enrich with rowTime
DROP TABLE events_per_min_ts;
CREATE TABLE events_per_min_ts as select rowTime as event_ts, * from events_per_min;
CREATE TABLE events_per_min_ts AS SELECT rowTime AS event_ts, * FROM events_per_min;

-- VIEW
DROP TABLE events_per_min_max_avg;
DROP TABLE events_per_min_max_avg_ts;
create table events_per_min_max_avg as select userid, min(events) as min, max(events) as max, sum(events)/count(events) as avg from events_per_min WINDOW TUMBLING (size 10 second) group by userid;
create table events_per_min_max_avg_ts as select rowTime as event_ts, * from events_per_min_max_avg;
CREATE TABLE events_per_min_max_avg AS SELECT userid, min(events) AS min, max(events) AS max, sum(events)/count(events) AS avg from events_per_min WINDOW TUMBLING (size 10 second) GROUP BY userid;
CREATE TABLE events_per_min_max_avg_ts AS SELECT rowTime AS event_ts, * FROM events_per_min_max_avg;


-- 3. BUILD STATUS_CODES
Expand All @@ -37,16 +37,16 @@ CREATE TABLE clickstream_codes (code int, definition varchar) with (key='code',

-- Add _TS for Timeseries storage
DROP TABLE clickstream_codes_ts;
create table clickstream_codes_ts as select rowTime as event_ts, * from clickstream_codes;
CREATE table clickstream_codes_ts AS SELECT rowTime AS event_ts, * FROM clickstream_codes;


-- 4. BUILD PAGE_VIEWS
DROP TABLE pages_per_min;
create table pages_per_min as select userid, count(*) as pages from clickstream WINDOW HOPPING (size 10 second, advance by 5 second) WHERE request like '%html%' group by userid ;
CREATE TABLE pages_per_min AS SELECT userid, count(*) AS pages FROM clickstream WINDOW HOPPING (size 10 second, advance by 5 second) WHERE request like '%html%' GROUP BY userid ;

-- Add _TS for Timeseries storage
DROP TABLE pages_per_min_ts;
CREATE TABLE pages_per_min_ts as select rowTime as event_ts, * from pages_per_min;
CREATE TABLE pages_per_min_ts AS SELECT rowTime AS event_ts, * FROM pages_per_min;


----------------------------------------------------------------------------------------------------------------------------
Expand All @@ -57,15 +57,15 @@ CREATE TABLE pages_per_min_ts as select rowTime as event_ts, * from pages_per_mi

-- Use 'HAVING' Filter to show ERROR codes > 400 where count > 5
DROP TABLE ERRORS_PER_MIN_ALERT;
create TABLE ERRORS_PER_MIN_ALERT as select status, count(*) as errors from clickstream window HOPPING ( size 30 second, advance by 20 second) WHERE status > 400 group by status HAVING count(*) > 5 AND count(*) is not NULL;
CREATE TABLE ERRORS_PER_MIN_ALERT AS SELECT status, count(*) AS errors FROM clickstream window HOPPING ( size 30 second, advance by 20 second) WHERE status > 400 GROUP BY status HAVING count(*) > 5 AND count(*) is not NULL;
DROP TABLE ERRORS_PER_MIN_ALERT_TS;
CREATE TABLE ERRORS_PER_MIN_ALERT_TS as select rowTime as event_ts, * from ERRORS_PER_MIN_ALERT;
CREATE TABLE ERRORS_PER_MIN_ALERT_TS AS SELECT rowTime AS event_ts, * FROM ERRORS_PER_MIN_ALERT;


DROP TABLE ERRORS_PER_MIN;
create table ERRORS_PER_MIN as select status, count(*) as errors from clickstream window HOPPING ( size 10 second, advance by 5 second) WHERE status > 400 group by status;
CREATE table ERRORS_PER_MIN AS SELECT status, count(*) AS errors FROM clickstream window HOPPING ( size 10 second, advance by 5 second) WHERE status > 400 GROUP BY status;
DROP TABLE ERRORS_PER_MIN_TS;
CREATE TABLE ERRORS_PER_MIN_TS as select rowTime as event_ts, * from ERRORS_PER_MIN;
CREATE TABLE ERRORS_PER_MIN_TS AS SELECT rowTime AS event_ts, * FROM ERRORS_PER_MIN;


-- VIEW - Enrich Codes with errors with Join to Status-Code definition
Expand All @@ -78,7 +78,7 @@ CREATE STREAM ENRICHED_ERROR_CODES AS SELECT code, definition FROM clickstream L
-- Aggregate (count&groupBy) using a TABLE-Window
CREATE TABLE ENRICHED_ERROR_CODES_COUNT AS SELECT code, definition, COUNT(*) AS count FROM ENRICHED_ERROR_CODES WINDOW TUMBLING (size 30 second) GROUP BY code, definition HAVING COUNT(*) > 1;
-- Enrich w rowTime timestamp to support timeseries search
CREATE TABLE ENRICHED_ERROR_CODES_TS AS SELECT rowTime as EVENT_TS, * FROM ENRICHED_ERROR_CODES_COUNT;
CREATE TABLE ENRICHED_ERROR_CODES_TS AS SELECT rowTime AS EVENT_TS, * FROM ENRICHED_ERROR_CODES_COUNT;


----------------------------------------------------------------------------------------------------------------------------
Expand All @@ -93,19 +93,19 @@ CREATE TABLE WEB_USERS (user_id int, registered_At long, username varchar, first

-- Clickstream enriched with user account data
DROP STREAM customer_clickstream;
CREATE STREAM customer_clickstream WITH (PARTITIONS=2) as SELECT userid, u.first_name, u.last_name, u.level, time, ip, request, status, agent FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id;
CREATE STREAM customer_clickstream WITH (PARTITIONS=2) AS SELECT userid, u.first_name, u.last_name, u.level, time, ip, request, status, agent FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id;

-- Find error views by important users
--DROP STREAM platinum_customers_with_errors
--create stream platinum_customers_with_errors WITH (PARTITIONS=2) as seLECT * FROM customer_clickstream WHERE status > 400 AND level = 'Platinum';
--CREATE stream platinum_customers_with_errors WITH (PARTITIONS=2) AS seLECT * FROM customer_clickstream WHERE status > 400 AND level = 'Platinum';

-- Find error views by important users in one shot
--DROP STREAM platinum_errors;
--CREATE STREAM platinum_errors WITH (PARTITIONS=2) as SELECT userid, u.first_name, u.last_name, u.city, u.level, time, ip, request, status, agent FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id WHERE status > 400 AND level = 'Platinum';
--CREATE STREAM platinum_errors WITH (PARTITIONS=2) AS SELECT userid, u.first_name, u.last_name, u.city, u.level, time, ip, request, status, agent FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id WHERE status > 400 AND level = 'Platinum';
--
---- Trend of errors from important users
--DROP TABLE platinum_page_errors_per_5_min;
--CREATE TABLE platinum_errors_per_5_min AS SELECT userid, first_name, last_name, city, count(*) as running_count FROM platinum_errors WINDOW TUMBLING (SIZE 5 MINUTE) WHERE request LIKE '%html%' GROUP BY userid, first_name, last_name, city;
--CREATE TABLE platinum_errors_per_5_min AS SELECT userid, first_name, last_name, city, count(*) AS running_count FROM platinum_errors WINDOW TUMBLING (SIZE 5 MINUTE) WHERE request LIKE '%html%' GROUP BY userid, first_name, last_name, city;


----------------------------------------------------------------------------------------------------------------------------
Expand All @@ -122,7 +122,7 @@ CREATE TABLE USER_IP_ACTIVITY AS SELECT username, ip, city, COUNT(*) AS count FR

-- Enrich w rowTime timestamp to support timeseries search
DROP TABLE USER_IP_ACTIVITY_TS;
CREATE TABLE USER_IP_ACTIVITY_TS AS SELECT rowTime as EVENT_TS, * FROM USER_IP_ACTIVITY;
CREATE TABLE USER_IP_ACTIVITY_TS AS SELECT rowTime AS EVENT_TS, * FROM USER_IP_ACTIVITY;


----------------------------------------------------------------------------------------------------------------------------
Expand All @@ -134,8 +134,8 @@ CREATE TABLE USER_IP_ACTIVITY_TS AS SELECT rowTime as EVENT_TS, * FROM USER_IP_A

DROP TABLE CLICK_USER_SESSIONS;
DROP TABLE CLICK_USER_SESSIONS_TS;
CREATE TABLE CLICK_USER_SESSIONS as SELECT username, count(*) as events FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;
CREATE TABLE CLICK_USER_SESSIONS_TS as SELECT rowTime as event_ts, * from CLICK_USER_SESSIONS;
CREATE TABLE CLICK_USER_SESSIONS AS SELECT username, count(*) AS events FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;
CREATE TABLE CLICK_USER_SESSIONS_TS AS SELECT rowTime AS event_ts, * FROM CLICK_USER_SESSIONS;


----------------------------------------------------------------------------------------------------------------------------
Expand All @@ -146,13 +146,13 @@ CREATE TABLE CLICK_USER_SESSIONS_TS as SELECT rowTime as event_ts, * from CLICK_
----------------------------------------------------------------------------------------------------------------------------

--DROP TABLE PER_USER_KBYTES;
--create TABLE PER_USER_KBYTES as SELECT username, sum(bytes)/1024 as kbytes FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;
--CREATE TABLE PER_USER_KBYTES AS SELECT username, sum(bytes)/1024 AS kbytes FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;

--DROP TABLE PER_USER_KBYTES_TS;
--CREATE TABLE PER_USER_KBYTES_TS as select rowTime as event_ts, kbytes, username from PER_USER_KBYTES WHERE ip IS NOT NULL;
--CREATE TABLE PER_USER_KBYTES_TS AS select rowTime AS event_ts, kbytes, username FROM PER_USER_KBYTES WHERE ip IS NOT NULL;

--DROP TABLE MALICIOUS_USER_SESSIONS;
--CREATE TABLE MALICIOUS_USER_SESSIONS as SELECT username, ip, sum(bytes)/1024 as kbytes FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username, ip HAVING sum(bytes)/1024 > 50;
--CREATE TABLE MALICIOUS_USER_SESSIONS_TS as select rowTime as event_ts, ip, username, kbytes from MALICIOUS_USER_SESSIONS;
--CREATE TABLE MALICIOUS_USER_SESSIONS AS SELECT username, ip, sum(bytes)/1024 AS kbytes FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username, ip HAVING sum(bytes)/1024 > 50;
--CREATE TABLE MALICIOUS_USER_SESSIONS_TS AS select rowTime AS event_ts, ip, username, kbytes FROM MALICIOUS_USER_SESSIONS;


44 changes: 44 additions & 0 deletions ksql-rest-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,50 @@

<build>
<plugins>

<!-- Grab the web interface -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<executions>
<execution>
<id>download-ksql-experimental-ui</id>
<phase>process-resources</phase>
<goals>
<goal>download-single</goal>
</goals>
<configuration>
<url>https://github.com/confluentinc</url>
<fromFile>ksql-experimental-ui/releases/download/0.1/ksql-experimental-ui-0.1.war</fromFile>
<toDir>./target/classes/io/confluent/ksql/rest/</toDir>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<id>prepare</id>
<phase>process-resources</phase>
<configuration>
<tasks>
<echo message="unpacking the ui" />
<unzip src="./target/classes/io/confluent/ksql/rest/ksql-experimental-ui-0.1.war"
dest="./target/classes/io/confluent/ksql/rest/" />
</tasks>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down
Loading

0 comments on commit 677e52f

Please sign in to comment.