Skip to content

Commit

Permalink
fixed issues[smartloli#418] [smartloli#420] and add connect monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
smartloli committed Sep 27, 2020
1 parent d8a77f9 commit 1fe6e64
Show file tree
Hide file tree
Showing 9 changed files with 560 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ kafka.eagle.metrics.retain=15
# master quartz time
######################################
kafka.eagle.quartz.master.time=* */1 * * * ?
kafka.eagle.distributed.enable=true

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.worknode.port=8787
# kafka.eagle.sql.distributed.enable=false
kafka.eagle.sql.worknode.rpc.timeout=300000
kafka.eagle.sql.worknode.fetch.threshold=20
kafka.eagle.sql.worknode.fetch.timeout=20000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,4 @@ private static List<WorkNodeStrategy> getWorkNodes() {
return nodes;
}

public static void main(String[] args) {
String sql = "select * from kjson where `partition` in (0) and JSON(msg,'id')=1 limit 10";
System.out.println(getTaskStrategy(sql, "cluster1"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public List<MetadataInfo> metadata(String clusterAlias, String topicName, Map<St
*/
public String execute(String clusterAlias, String sql, String jobId, String type) {
String result = "";
if (SystemConfigUtils.getBooleanProperty("kafka.eagle.distributed.enable")) {
if (SystemConfigUtils.getBooleanProperty("kafka.eagle.sql.distributed.enable")) {
if (KConstants.Protocol.KSQL_LOGICAL.equals(type)) {
result = JobClient.logicalSubmit(clusterAlias, sql, jobId);
} else if (KConstants.Protocol.KSQL_PHYSICS.equals(type)) {
Expand Down
3 changes: 1 addition & 2 deletions kafka-eagle-web/src/main/resources/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<directory>src/main/resources/bash/</directory>
<outputDirectory>/bin</outputDirectory>
<includes>
<include>*.sh</include>
<include>ke.sh</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
Expand All @@ -28,7 +28,6 @@
<includes>
<include>*.properties</include>
<include>*.conf</include>
<include>works</include>
</includes>
</fileSet>
<fileSet>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.distributed.enable=true
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.worknode.port=8787
kafka.eagle.sql.worknode.rpc.timeout=120000
kafka.eagle.sql.worknode.fetch.threshold=5000
kafka.eagle.sql.worknode.fetch.timeout=20000

######################################
# delete kafka topic token
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@
class="fab fa-maxcdn fa-sm fa-fw mr-1"></i>Multi-Clusters</a>
<a id="ke_navbar_cluster_zkcli" class="nav-link" href="/cluster/zkcli"><i
class="fas fa-terminal fa-code fa-sm fa-fw mr-1"></i>ZkCli</a>
<!--
<a id="ke_navbar_cluster_worknodes" class="nav-link" href="/cluster/worknodes"><i
class="fas fa-bezier-curve fa-sm fa-fw mr-1"></i>WorkNodes</a>
-->
</nav>
</div>
<a class="nav-link collapsed" href="#" data-toggle="collapse" data-target="#collapseMetrics"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
$(document).ready(function () {
var mime = 'text/x-mariadb';
// get mime type
if (window.location.href.indexOf('mime=') > -1) {
mime = window.location.href.substr(window.location.href.indexOf('mime=') + 5);
}
var sqlEditor = CodeMirror.fromTextArea(document.getElementById('code'), {
mode: mime,
indentWithTabs: true,
smartIndent: true,
lineNumbers: true,
matchBrackets: true,
autofocus: true,
extraKeys: {
"Alt-/": "autocomplete"
}
});
var logEditor = CodeMirror.fromTextArea(document.getElementById('job_info'), {
mode: mime,
indentWithTabs: true,
smartIndent: true,
lineNumbers: true,
matchBrackets: true,
autofocus: true,
readOnly: true
});
$('#result_tab li:eq(0) a').tab('show');
var triggerTask;
function getProgress(jobId) {
$.ajax({
type: 'get',
dataType: 'json',
url: '/shard/sub/scan/log/?jobId=' + jobId,
success: function (datas) {
console.log(datas)
if (datas != null) {
logEditor.setValue(datas.logs);
logEditor.execCommand("goDocEnd");
}
}
});
if (logEditor.getValue().indexOf("Time taken:") > -1) {
clearInterval(triggerTask);
}
}
var offset = 0;
function viewerTopics(sql, dataSets, jobId) {
// var ret = JSON.parse(dataSets);
var tabHeader = "<div class='panel-body table-responsive' id='div_children" + offset + "'><table id='result_children" + offset + "' class='table table-bordered table-hover' width='100%'><thead><tr>"
var mData = [];
var i = 0;
for (var key in dataSets) {
tabHeader += "<th>" + dataSets[key] + "</th>";
var obj = {
mData: dataSets[key]
};
mData.push(obj);
}
tabHeader += "</tr></thead></table></div>";
$("#result_textarea").append(tabHeader);
if (offset > 0) {
$("#div_children" + (offset - 1)).remove();
}

$("#result_children" + offset).dataTable({
"searching": false,
"bSort": false,
"retrieve": true,
"bLengthChange": false,
"bProcessing": true,
"bServerSide": true,
"fnServerData": retrieveData,
"sAjaxSource": '/topic/physics/commit/?sql=' + sql + '&jobId=' + jobId,
"aoColumns": mData
});

function retrieveData(sSource, aoData, fnCallback) {
$.ajax({
"type": "get",
"contentType": "application/json",
"url": sSource,
"dataType": "json",
"data": {
aoData: JSON.stringify(aoData)
},
"success": function (data) {
fnCallback(data)
}
});
}

offset++;
}

$(document).on('click', 'button[id=ke_ksql_query]', function () {
var sql = encodeURI(sqlEditor.getValue());
logEditor.setValue("");
var jobId = "job_id_" + new Date().getTime();
$.ajax({
type: 'get',
dataType: 'json',
url: '/topic/logical/commit/?sql=' + sql + '&jobId=' + jobId,
success: function (datas) {
if (datas != null) {
if (datas.error) {
logEditor.setValue(datas.msg);
viewerTopicSqlHistory();
} else {
triggerTask = setInterval(getProgress, 1000, jobId);
viewerTopics(sql, datas.columns, jobId);
}
}
}
});
});

var historyOffset = 0;

function viewerTopicSqlHistory() {
var thList = [{
th: "ID",
column: "id"
}, {
th: "User",
column: "username"
}, {
th: "Host",
column: "host"
}, {
th: "KSQL",
column: "ksql"
}, {
th: "Status",
column: "status"
}, {
th: "Spent",
column: "spendTime"
}, {
th: "Created",
column: "created"
}];
var ksqlTabHeader = "<div class='panel-body table-responsive' id='div_ksql_children" + historyOffset + "'><table id='result_ksql_children" + historyOffset + "' class='table table-bordered table-hover' width='100%'><thead><tr>"
var ksqlMData = [];
var i = 0;
for (var i = 0; i < thList.length; i++) {
ksqlTabHeader += "<th>" + thList[i].th + "</th>";
var obj = {
mData: thList[i].column
};
ksqlMData.push(obj);
}

ksqlTabHeader += "</tr></thead></table></div>";
$("#ksql_history_result_div").append(ksqlTabHeader);
if (historyOffset > 0) {
$("#div_ksql_children" + (historyOffset - 1)).remove();
} else {
$("#ksql_history_result0").remove("");
}

$("#result_ksql_children" + historyOffset).dataTable({
"bSort": false,
"retrieve": true,
"bLengthChange": false,
"bProcessing": true,
"bServerSide": true,
"fnServerData": retrieveData,
"sAjaxSource": '/topic/sql/history/ajax',
"aoColumns": ksqlMData
});

function retrieveData(sSource, aoData, fnCallback) {
$.ajax({
"type": "get",
"contentType": "application/json",
"url": sSource,
"dataType": "json",
"data": {
aoData: JSON.stringify(aoData)
},
"success": function (data) {
fnCallback(data)
}
});
}

historyOffset++;
}

$('a[data-toggle="tab"]').on('show.bs.tab', function (e) {
if (e.target.id.indexOf("ke_ksql_history_textarea") > -1) {
viewerTopicSqlHistory();
}
})

});
41 changes: 7 additions & 34 deletions kafka-eagle-web/src/main/webapp/media/js/main/topic/ksql.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,17 @@ $(document).ready(function () {
$('#result_tab li:eq(0) a').tab('show');
var triggerTask;
function getProgress(jobId) {
$.ajax({
type: 'get',
dataType: 'json',
url: '/shard/sub/scan/log/?jobId=' + jobId,
success: function (datas) {
console.log(datas)
if (datas != null) {
logEditor.setValue(datas.logs);
logEditor.execCommand("goDocEnd");
}
}
});
if (logEditor.getValue().indexOf("Time taken:") > -1) {
clearInterval(triggerTask);
}
}
var offset = 0;
function viewerTopics(sql, dataSets, jobId) {
// var ret = JSON.parse(dataSets);
var ret = JSON.parse(dataSets);
var tabHeader = "<div class='panel-body table-responsive' id='div_children" + offset + "'><table id='result_children" + offset + "' class='table table-bordered table-hover' width='100%'><thead><tr>"
var mData = [];
var i = 0;
for (var key in dataSets) {
tabHeader += "<th>" + dataSets[key] + "</th>";
for (var key in ret[0]) {
tabHeader += "<th>" + key + "</th>";
var obj = {
mData: dataSets[key]
mData: key
};
mData.push(obj);
}
Expand Down Expand Up @@ -111,11 +91,11 @@ $(document).ready(function () {
if (datas != null) {
if (datas.error) {
logEditor.setValue(datas.msg);
viewerTopicSqlHistory();
} else {
triggerTask = setInterval(getProgress, 1000, jobId);
viewerTopics(sql, datas.columns, jobId);
logEditor.setValue(datas.status);
viewerTopics(sql, datas.msg, jobId);
}
viewerTopicSqlHistory();
}
}
});
Expand Down Expand Up @@ -193,11 +173,4 @@ $(document).ready(function () {

historyOffset++;
}

$('a[data-toggle="tab"]').on('show.bs.tab', function (e) {
if (e.target.id.indexOf("ke_ksql_history_textarea") > -1) {
viewerTopicSqlHistory();
}
})

});
Loading

0 comments on commit 1fe6e64

Please sign in to comment.