Skip to content

Commit

Permalink
[Feature-2981][admin] Preview data in execution history (DataLinkDC#3572)
Browse files Browse the repository at this point in the history

Co-authored-by: suxinshuo <suxinshuo@itiger.com>
Co-authored-by: suxinshuo <suxinshuo@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 14, 2024
1 parent 72c765d commit 2b9f627
Show file tree
Hide file tree
Showing 27 changed files with 567 additions and 50 deletions.
57 changes: 57 additions & 0 deletions dinky-admin/src/main/java/org/dinky/job/JobShutdownConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job;

import org.dinky.context.TenantContextHolder;
import org.dinky.data.result.ResultPool;

import java.util.List;

import javax.annotation.PreDestroy;

import org.springframework.stereotype.Component;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * JobShutdownConfig — flushes cached job results to persistent storage when the
 * application shuts down.
 *
 * @since 2024/6/3 18:09
 */
@Slf4j
@Component
public class JobShutdownConfig {

    /**
     * Invoked by the container before the bean is destroyed: marks every cached
     * job result as destroyed, then persists the result data so it survives the
     * JVM exit.
     */
    @PreDestroy
    public void destroy() {
        log.info("Job shutdown.");
        List<String> jobIds = ResultPool.getJobIds();
        if (CollectionUtil.isEmpty(jobIds)) {
            log.info("Result pool is empty.");
            return;
        }
        // Flag each cached result as destroyed before it is written out.
        for (String jobId : jobIds) {
            ResultPool.get(jobId).setDestroyed(Boolean.TRUE);
        }
        JobHandler jobHandler = JobHandler.build();
        // Persistence runs outside any request, so tenant filtering must be disabled.
        TenantContextHolder.ignoreTenant();
        jobHandler.persistResultData(jobIds);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.dinky.daemon.pool.FlinkJobThreadPool;
import org.dinky.daemon.task.DaemonTask;
import org.dinky.daemon.task.DaemonTaskConfig;
import org.dinky.data.constant.MysqlConstant;
import org.dinky.data.dto.ClusterInstanceDTO;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.enums.JobStatus;
Expand All @@ -34,8 +35,11 @@
import org.dinky.data.model.job.JobInstance;
import org.dinky.data.model.mapping.ClusterConfigurationMapping;
import org.dinky.data.model.mapping.ClusterInstanceMapping;
import org.dinky.data.result.ResultPool;
import org.dinky.data.result.SelectResult;
import org.dinky.job.FlinkJobTask;
import org.dinky.job.Job;
import org.dinky.job.JobReadHandler;
import org.dinky.service.ClusterConfigurationService;
import org.dinky.service.ClusterInstanceService;
import org.dinky.service.HistoryService;
Expand All @@ -44,14 +48,21 @@
import org.dinky.service.TaskService;

import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.springframework.context.annotation.DependsOn;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;

/**
* Job2MysqlHandler
*
* @since 2021/6/27 0:04
*/
@Slf4j
@DependsOn("springContextUtils")
public class Job2MysqlHandler extends AbsJobHandler {

Expand Down Expand Up @@ -230,4 +241,43 @@ public boolean callback() {
/**
 * Close this handler. Currently a no-op that reports success — no resources
 * are held that need releasing.
 *
 * @return always {@code true}
 */
public boolean close() {
    return true;
}

/**
 * Persistent storage of result data into mysql.
 *
 * <p>For each job id the cached {@link SelectResult} is looked up in the
 * {@link ResultPool}; jobs with no cached result (or a malformed id) are
 * skipped. The result JSON is truncated to fit a MEDIUMTEXT column before
 * being written to the matching {@link History} row.
 *
 * @param jobIds ids of the jobs whose results should be persisted; may be null or empty
 */
@Override
public void persistResultData(List<String> jobIds) {
    if (CollectionUtil.isEmpty(jobIds)) {
        return;
    }
    List<History> historyList = jobIds.stream()
            .map(this::buildHistoryWithResult)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    // Do not issue an empty batch update when no cached results were found.
    if (CollectionUtil.isEmpty(historyList)) {
        log.info("No cached result data found for jobs: {}", jobIds);
        return;
    }
    historyService.updateBatchById(historyList);
    log.info("The result data persistence to MySQL was successful. Job ids: {}", jobIds);
}

/**
 * Build a {@link History} row carrying the truncated result JSON for one job,
 * or return null when the id is not numeric or no cached result exists.
 */
private History buildHistoryWithResult(String jobIdStr) {
    Integer jobId;
    try {
        jobId = Integer.parseInt(jobIdStr);
    } catch (NumberFormatException e) {
        // A single bad id must not abort persistence of the remaining jobs.
        log.warn("Invalid job id, skipping result persistence: {}", jobIdStr);
        return null;
    }
    SelectResult selectResult = ResultPool.get(jobIdStr);
    if (Objects.isNull(selectResult)) {
        log.info("The result data does not exist. Job id: {}", jobId);
        return null;
    }
    String resultJsonStr = selectResult.toTruncateJson(MysqlConstant.MEDIUMTEXT_MAX_LENGTH);
    History history = new History();
    history.setId(jobId);
    history.setResult(resultJsonStr);
    return history;
}

/**
 * Get the read handler.
 * Each handler that executes a job should have a corresponding read handler;
 * this MySQL-backed handler pairs with {@link JobReadMysqlHandler}, which reads
 * persisted result data back out of MySQL.
 *
 * @return a new {@code JobReadMysqlHandler} instance
 */
@Override
public JobReadHandler getReadHandler() {
    return new JobReadMysqlHandler();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.job.handler;

import org.dinky.context.SpringContextUtils;
import org.dinky.data.model.job.History;
import org.dinky.data.result.SelectResult;
import org.dinky.job.JobReadHandler;
import org.dinky.service.HistoryService;

import org.apache.commons.lang3.StringUtils;

import java.util.Objects;

import org.springframework.context.annotation.DependsOn;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * JobReadMysqlHandler — reads persisted job result data back from MySQL via the
 * history table.
 *
 * @since 2024/5/31 11:41
 */
@Slf4j
@DependsOn("springContextUtils")
public class JobReadMysqlHandler implements JobReadHandler {

    // Resolved once from the Spring context; this class is instantiated with `new`,
    // so constructor injection is not available here.
    private static final HistoryService historyService =
            SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);

    /**
     * Read result data from mysql.
     *
     * @param jobId job id
     * @return the deserialized result, or a failed result when no history row
     *         exists or its stored result is blank
     */
    @Override
    public SelectResult readResultDataFromStorage(Integer jobId) {
        History history = historyService.getById(jobId);
        if (Objects.isNull(history) || StringUtils.isBlank(history.getResult())) {
            return SelectResult.buildFailed();
        }
        return JSONUtil.toBean(history.getResult(), SelectResult.class);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public boolean cancelTaskJob(TaskDTO task, boolean withSavePoint, boolean forceC
}
isSuccess = true;
} catch (Exception e) {
log.warn("Stop with savcePoint failed: {}, will try normal rest api stop", e.getMessage());
log.warn("Stop with savePoint failed: {}, will try normal rest api stop", e.getMessage());
isSuccess = jobManager.cancelNormal(jobInstance.getJid());
}
jobInstanceService.refreshJobInfoDetail(jobInstance.getId(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ update dinky_task set first_level_owner = creator;

ALTER TABLE dinky_history ALTER COLUMN statement SET DATA TYPE LONGVARCHAR ;

ALTER TABLE dinky_history ALTER COLUMN result SET DATA TYPE LONGVARCHAR ;

ALTER TABLE dinky_task ALTER COLUMN statement SET DATA TYPE LONGVARCHAR ;

ALTER TABLE dinky_task_version ALTER COLUMN statement SET DATA TYPE LONGVARCHAR ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ ALTER TABLE dinky_task DROP COLUMN `first_level_owner`;
ALTER TABLE dinky_task DROP COLUMN `second_level_owners`;

ALTER TABLE dinky_history CHANGE COLUMN `statement` `statement` longtext DEFAULT NULL COMMENT 'statement set';
ALTER TABLE dinky_history CHANGE COLUMN `result` `result` text DEFAULT NULL COMMENT 'result set';
ALTER TABLE dinky_task CHANGE COLUMN `statement` `statement` longtext DEFAULT NULL COMMENT 'sql statement';
ALTER TABLE dinky_task_version CHANGE COLUMN `statement` `statement` longtext DEFAULT NULL COMMENT 'flink sql statement';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ ALTER TABLE dinky_alert_template MODIFY COLUMN `name` varchar(20) CHARACTER SET

ALTER TABLE dinky_history CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'statement set';

ALTER TABLE dinky_history CHANGE COLUMN `result` `result` mediumtext DEFAULT NULL COMMENT 'result set';

ALTER TABLE dinky_task CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'sql statement';

ALTER TABLE dinky_task_version CHANGE COLUMN `statement` `statement` mediumtext DEFAULT NULL COMMENT 'flink sql statement';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.data.constant;

/**
 * MysqlConstant — MySQL column-size constants shared across the admin module.
 *
 * @since 2024/5/30 15:36
 */
public final class MysqlConstant {

    /** Maximum number of bytes a MySQL MEDIUMTEXT column can hold (2^24 - 1). */
    public static final Long MEDIUMTEXT_MAX_LENGTH = 16777215L;

    /** Constants holder; not meant to be instantiated. */
    private MysqlConstant() {}
}
12 changes: 12 additions & 0 deletions dinky-core/src/main/java/org/dinky/data/result/ResultBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.dinky.data.result;

import org.dinky.job.JobHandler;
import org.dinky.parser.SqlType;

import org.apache.flink.table.api.TableResult;
Expand Down Expand Up @@ -53,4 +54,15 @@ static ResultBuilder build(
}

IResult getResult(TableResult tableResult);

/**
 * Get the results and store them persistently.
 *
 * <p>This default implementation simply delegates to {@link #getResult} and
 * ignores {@code jobHandler} — it performs no persistence. Implementations
 * that support persistence override this to write results via the handler.
 *
 * @param tableResult table result
 * @param jobHandler job handler (unused by the default implementation)
 * @return IResult
 */
default IResult getResultWithPersistence(TableResult tableResult, JobHandler jobHandler) {
    return getResult(tableResult);
}
}
35 changes: 24 additions & 11 deletions dinky-core/src/main/java/org/dinky/data/result/ResultPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,59 @@

package org.dinky.data.result;

import java.util.concurrent.TimeUnit;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import cn.hutool.cache.Cache;
import cn.hutool.cache.impl.TimedCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

/**
* ResultPool
*
* @since 2021/7/1 22:20
*/
@Slf4j
public final class ResultPool {

private ResultPool() {}

private static final Cache<String, SelectResult> results = new TimedCache<>(TimeUnit.MINUTES.toMillis(10));
private static final Map<String, SelectResult> RESULTS = Maps.newConcurrentMap();

public static boolean containsKey(String key) {
return results.containsKey(key);
return RESULTS.containsKey(key);
}

public static void put(SelectResult result) {
results.put(result.getJobId(), result);
RESULTS.put(result.getJobId(), result);
log.info("Put job result into cache. Job id: {}", result.getJobId());
log.info("Number of results in the running: {}", RESULTS.size());
}

public static SelectResult get(String key) {
if (containsKey(key)) {
return results.get(key);
SelectResult selectResult = RESULTS.get(key);
if (Objects.nonNull(selectResult)) {
return selectResult;
}
return SelectResult.buildDestruction(key);
}

public static boolean remove(String key) {
if (results.containsKey(key)) {
results.remove(key);
log.info("Remove job result from cache. Job id: {}", key);
if (RESULTS.containsKey(key)) {
RESULTS.remove(key);
return true;
}
return false;
}

public static void clear() {
results.clear();
RESULTS.clear();
}

public static List<String> getJobIds() {
return Lists.newArrayList(RESULTS.keySet());
}
}
Loading

0 comments on commit 2b9f627

Please sign in to comment.