Merged
2 changes: 1 addition & 1 deletion Makefile
@@ -181,7 +181,7 @@ build-%: %-docker-build
 	@:
 
 .PHONY: build
-build: database-docker-build backend-docker-build frontend-docker-build runtime-docker-build backend-python-docker-build
+build: database-docker-build gateway-docker-build backend-docker-build frontend-docker-build runtime-docker-build backend-python-docker-build
 
 # ========== Utility Targets ==========
@@ -20,12 +20,19 @@
 import com.datamate.datamanagement.domain.model.dataset.Dataset;
 import com.datamate.datamanagement.domain.model.dataset.DatasetFile;
 import com.datamate.datamanagement.interfaces.dto.CreateDatasetRequest;
+import com.datamate.operator.domain.repository.OperatorRepository;
+import com.datamate.operator.infrastructure.exception.OperatorErrorCode;
+import com.datamate.operator.interfaces.dto.OperatorDto;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.PropertyNamingStrategies;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.yaml.snakeyaml.DumperOptions;
@@ -39,6 +46,8 @@
 import java.nio.file.Paths;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -52,6 +61,8 @@
 
     private final OperatorInstanceRepository operatorInstanceRepo;
 
+    private final OperatorRepository operatorRepo;
+
     private final CleaningResultRepository cleaningResultRepo;
 
     private final CleaningTaskScheduler taskScheduler;
@@ -66,11 +77,16 @@
 
     private final String FLOW_PATH = "/flow";
 
-    private final Pattern LEVEL_PATTERN = Pattern.compile(
-            "\\b(TRACE|DEBUG|INFO|WARN|WARNING|ERROR|FATAL)\\b",
-            Pattern.CASE_INSENSITIVE
+    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
+            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b"
     );
 
+    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
+            "\\b\\w+(Warning|Error|Exception)\\b"
+    );
+
+    private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public List<CleaningTaskDto> getTasks(String status, String keywords, Integer page, Integer size) {
         List<CleaningTaskDto> tasks = cleaningTaskRepo.findTasks(status, keywords, page, size);
         tasks.forEach(this::setProcess);
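Review note on the pattern change: the old case-insensitive LEVEL_PATTERN tagged any occurrence of "info" or "error" in ordinary message text as a log level, while the new pair only accepts canonical casings plus exception-style class names. A minimal standalone check of that behavior, not part of the diff (class name invented):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LevelPatternCheck {
    private static final Pattern STANDARD_LEVEL_PATTERN = Pattern.compile(
            "\\b(DEBUG|Debug|INFO|Info|WARN|Warn|WARNING|Warning|ERROR|Error|FATAL|Fatal)\\b");

    private static final Pattern EXCEPTION_SUFFIX_PATTERN = Pattern.compile(
            "\\b\\w+(Warning|Error|Exception)\\b");

    public static void main(String[] args) {
        // Canonical casing still matches:
        System.out.println(STANDARD_LEVEL_PATTERN.matcher("2024-05-01 INFO task started").find()); // true
        // Lower-case prose no longer matches (the old CASE_INSENSITIVE pattern matched this):
        System.out.println(STANDARD_LEVEL_PATTERN.matcher("see the info page for details").find()); // false
        // Exception-style class names are picked up by the suffix pattern, group(1) is the suffix:
        Matcher m = EXCEPTION_SUFFIX_PATTERN.matcher("DeprecationWarning: step 3 is deprecated");
        System.out.println(m.find() ? m.group(1) : "none"); // Warning
    }
}
```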
@@ -133,6 +149,7 @@
     }
 
     public List<CleaningTaskLog> getTaskLog(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         String logPath = FLOW_PATH + "/" + taskId + "/output.log";
         try (Stream<String> lines = Files.lines(Paths.get(logPath))) {
             List<CleaningTaskLog> logs = new ArrayList<>();
@@ -156,18 +173,31 @@
             return defaultLevel;
         }
 
-        Matcher matcher = LEVEL_PATTERN.matcher(logLine);
-        if (matcher.find()) {
-            return matcher.group(1).toUpperCase();
+        Matcher stdMatcher = STANDARD_LEVEL_PATTERN.matcher(logLine);
+        if (stdMatcher.find()) {
+            return stdMatcher.group(1).toUpperCase();
         }
+
+        Matcher exMatcher = EXCEPTION_SUFFIX_PATTERN.matcher(logLine);
+        if (exMatcher.find()) {
+            String match = exMatcher.group(1).toUpperCase();
+            if ("WARNING".equals(match)) return "WARN";
+            if ("ERROR".equals(match) || "EXCEPTION".equals(match)) return "ERROR";
+        }
         return defaultLevel;
     }
 
     @Transactional
     public void deleteTask(String taskId) {
+        cleanTaskValidator.checkTaskId(taskId);
         cleaningTaskRepo.deleteTaskById(taskId);
         operatorInstanceRepo.deleteByInstanceId(taskId);
         cleaningResultRepo.deleteByInstanceId(taskId);
+        try {
+            FileUtils.deleteDirectory(new File(FLOW_PATH + "/" + taskId));
+        } catch (IOException e) {
+            log.warn("Can't delete flow path with task id: {}.", taskId, e);
+        }
     }
 
     public void executeTask(String taskId) {
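The new checkTaskId guard matters here because deleteTask (and getTaskLog above) concatenates taskId straight into a filesystem path before deleteDirectory runs. A hypothetical illustration of what an unvalidated id could reach, not part of the diff (values invented):

```java
import java.nio.file.Paths;

public class FlowPathSketch {
    public static void main(String[] args) {
        String flowPath = "/flow";
        String maliciousId = "../etc"; // rejected by UUID_PATTERN in checkTaskId
        // Without validation the deletion target escapes /flow entirely:
        System.out.println(Paths.get(flowPath + "/" + maliciousId).normalize()); // /etc
    }
}
```

Also worth noting: the deleteDirectory failure is logged rather than rethrown, so the database deletions in the same transaction still commit even if the flow directory cannot be removed.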
@@ -180,14 +210,26 @@
     }
 
     private void prepareTask(CleaningTaskDto task, List<OperatorInstanceDto> instances) {
+        List<OperatorDto> allOperators = operatorRepo.findAllOperators();
+        Map<String, OperatorDto> defaultSettings = allOperators.stream()
+                .filter(operatorDto -> StringUtils.isNotBlank(operatorDto.getSettings()))
+                .collect(Collectors.toMap(OperatorDto::getId, Function.identity()));
+
         TaskProcess process = new TaskProcess();
         process.setInstanceId(task.getId());
         process.setDatasetId(task.getDestDatasetId());
         process.setDatasetPath(FLOW_PATH + "/" + task.getId() + "/dataset.jsonl");
         process.setExportPath(DATASET_PATH + "/" + task.getDestDatasetId());
         process.setExecutorType(ExecutorType.DATAMATE.getValue());
         process.setProcess(instances.stream()
-                .map(instance -> Map.of(instance.getId(), instance.getOverrides()))
+                .map(instance -> {
+                    OperatorDto operatorDto = defaultSettings.get(instance.getId());
+                    Map<String, Object> stringObjectMap = getDefaultValue(operatorDto);
+                    stringObjectMap.putAll(instance.getOverrides());
+                    Map<String, Object> runtime = getRuntime(operatorDto);
+                    stringObjectMap.putAll(runtime);
+                    return Map.of(instance.getId(), stringObjectMap);
+                })
                 .toList());
 
         ObjectMapper jsonMapper = new ObjectMapper(new YAMLFactory());
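The reworked prepareTask layers three sources of operator parameters. Reading the stream lambda: defaults parsed from the operator's settings JSON come first, per-instance overrides replace them, and runtime values are applied last, so a runtime key silently wins over an override with the same name. A small sketch of that precedence, not part of the diff (keys and values invented):

```java
import java.util.HashMap;
import java.util.Map;

public class MergeOrderSketch {
    public static void main(String[] args) {
        Map<String, Object> merged = new HashMap<>();
        merged.putAll(Map.of("threshold", 0.5, "lang", "en")); // getDefaultValue(operatorDto)
        merged.putAll(Map.of("threshold", 0.8));               // instance.getOverrides() wins over defaults
        merged.putAll(Map.of("lang", "zh"));                   // getRuntime(operatorDto) is applied last
        System.out.println(merged); // threshold=0.8, lang=zh (iteration order may vary)
    }
}
```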
@@ -210,67 +252,113 @@
         }
     }
 
-    private void scanDataset(String taskId, String srcDatasetId) {
-        int pageNumber = 0;
-        int pageSize = 500;
-        PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
-        PagedResponse<DatasetFile> datasetFiles;
-        do {
-            datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest);
-            if (datasetFiles.getContent().isEmpty()) {
-                break;
-            }
-            List<Map<String, Object>> files = datasetFiles.getContent().stream()
-                    .map(content -> Map.of("fileName", (Object) content.getFileName(),
-                            "fileSize", content.getFileSize(),
-                            "filePath", content.getFilePath(),
-                            "fileType", content.getFileType(),
-                            "fileId", content.getId()))
-                    .toList();
-            writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
-            pageNumber += 1;
-        } while (pageNumber < datasetFiles.getTotalPages());
-    }
-
-    private void scanDataset(String taskId, String srcDatasetId, Set<String> succeedFiles) {
-        int pageNumber = 0;
-        int pageSize = 500;
-        PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
-        PagedResponse<DatasetFile> datasetFiles;
-        do {
-            datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null,null, pageRequest);
-            if (datasetFiles.getContent().isEmpty()) {
-                break;
-            }
-            List<Map<String, Object>> files = datasetFiles.getContent().stream()
-                    .filter(content -> !succeedFiles.contains(content.getId()))
-                    .map(content -> Map.of("fileName", (Object) content.getFileName(),
-                            "fileSize", content.getFileSize(),
-                            "filePath", content.getFilePath(),
-                            "fileType", content.getFileType(),
-                            "fileId", content.getId()))
-                    .toList();
-            writeListMapToJsonlFile(files, FLOW_PATH + "/" + taskId + "/dataset.jsonl");
-            pageNumber += 1;
-        } while (pageNumber < datasetFiles.getTotalPages());
-    }
-
-    private void writeListMapToJsonlFile(List<Map<String, Object>> mapList, String fileName) {
-        ObjectMapper objectMapper = new ObjectMapper();
-
-        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName))) {
-            if (!mapList.isEmpty()) { // check that the list is non-empty to avoid an exception
-                String jsonString = objectMapper.writeValueAsString(mapList.getFirst());
-                writer.write(jsonString);
-
-                for (int i = 1; i < mapList.size(); i++) {
-                    writer.newLine();
-                    jsonString = objectMapper.writeValueAsString(mapList.get(i));
-                    writer.write(jsonString);
-                }
-            }
-        } catch (IOException e) {
-            log.error("Failed to prepare dataset.jsonl.", e);
-            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
-        }
-    }
+    private Map<String, Object> getDefaultValue(OperatorDto operatorDto) {
+        if (StringUtils.isBlank(operatorDto.getSettings())) {
+            return new HashMap<>();
+        }
+
+        Map<String, Object> defaultSettings = new HashMap<>();
+        try {
+            Map<String, Map<String, Object>> settings = OBJECT_MAPPER.readValue(operatorDto.getSettings(), Map.class);
+            for (Map.Entry<String, Map<String, Object>> entry : settings.entrySet()) {
+                String key = entry.getKey();
+                Map<String, Object> setting = entry.getValue();
+                String type = setting.get("type").toString();
+                switch (type) {
+                    case "slider":
+                    case "switch":
+                    case "select":
+                    case "input":
+                    case "radio":
+                    case "checkbox":
+                        if (setting.containsKey("defaultVal")) {
+                            defaultSettings.put(key, setting.get("defaultVal"));
+                        }
+                        break;
+                    case "range":
+                        List<Object> rangeDefault = getRangeDefault(setting);
+                        if (CollectionUtils.isNotEmpty(rangeDefault)) {
+                            defaultSettings.put(key, rangeDefault);
+                        }
+                        break;
+                    default:
+                }
+            }
+            return defaultSettings;
+        } catch (JsonProcessingException e) {
+            throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
+        }
+    }
+
+    private List<Object> getRangeDefault(Map<String, Object> setting) {
+        List<Object> defaultValue = new ArrayList<>();
+        Object properties = setting.get("properties");
+        if (properties instanceof List<?> list) {
+            for (Object o : list) {
+                Map<String, Object> map = OBJECT_MAPPER.convertValue(o, Map.class);
+                if (map.containsKey("defaultVal")) {
+                    defaultValue.add(map.get("defaultVal"));
+                }
+            }
+        }
+        return defaultValue;
+    }
+
+    private Map<String, Object> getRuntime(OperatorDto operatorDto) {
+        if (StringUtils.isBlank(operatorDto.getRuntime())) {
+            return new HashMap<>();
+        }
+        try {
+            return OBJECT_MAPPER.readValue(operatorDto.getRuntime(), Map.class);
+        } catch (JsonProcessingException e) {
+            throw BusinessException.of(OperatorErrorCode.SETTINGS_PARSE_FAILED, e.getMessage());
+        }
+    }
+
+    private void scanDataset(String taskId, String srcDatasetId) {
+        doScan(taskId, srcDatasetId, file -> true);
+    }
+
+    private void scanDataset(String taskId, String srcDatasetId, Set<String> succeedFiles) {
+        doScan(taskId, srcDatasetId, file -> !succeedFiles.contains(file.getId()));
+    }
+
+    private void doScan(String taskId, String srcDatasetId, Predicate<DatasetFile> filterCondition) {
+        cleanTaskValidator.checkTaskId(taskId);
+        String targetFilePath = FLOW_PATH + "/" + taskId + "/dataset.jsonl";
+        File targetFile = new File(targetFilePath);
+        if (targetFile.getParentFile() != null && !targetFile.getParentFile().exists()) {
+            targetFile.getParentFile().mkdirs();
+        }
+
+        int pageNumber = 0;
+        int pageSize = 500;
+
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(targetFile))) {
+            PagedResponse<DatasetFile> datasetFiles;
+            do {
+                PagingQuery pageRequest = new PagingQuery(pageNumber, pageSize);
+                datasetFiles = datasetFileService.getDatasetFiles(srcDatasetId, null, null, null, pageRequest);
+                if (datasetFiles.getContent().isEmpty()) {
+                    break;
+                }
+                for (DatasetFile content : datasetFiles.getContent()) {
+                    if (!filterCondition.test(content)) {
+                        continue;
+                    }
+                    Map<String, Object> fileMap = Map.of(
+                            "fileName", content.getFileName(),
+                            "fileSize", content.getFileSize(),
+                            "filePath", content.getFilePath(),
+                            "fileType", content.getFileType(),
+                            "fileId", content.getId()
+                    );
+                    writer.write(OBJECT_MAPPER.writeValueAsString(fileMap));
+                    writer.newLine();
+                }
+                pageNumber++;
+            } while (pageNumber < datasetFiles.getTotalPages());
+        } catch (IOException e) {
+            log.error("Failed to write dataset.jsonl for taskId: {}", taskId, e);
+            throw BusinessException.of(SystemErrorCode.FILE_SYSTEM_ERROR);
+        }
+    }
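Worth highlighting in the doScan rewrite: the removed writeListMapToJsonlFile opened a fresh new FileWriter(fileName) on every iteration of the paging loop, and FileWriter truncates by default, so earlier pages of a multi-page dataset appear to have been overwritten by later ones. Holding a single writer across all pages, as the new code does, avoids that. A reduced sketch of the fixed shape, not the PR's code (the pager is invented):

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;

public class SinglePassJsonlSketch {
    // Hypothetical stand-in for the paged datasetFileService calls.
    private static List<String> fetchPage(int page) {
        return List.of("{\"fileId\":\"file-" + page + "\"}");
    }

    public static void main(String[] args) throws IOException {
        // One writer for the whole scan: every page's lines survive in the output file.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter("dataset.jsonl"))) {
            for (int page = 0; page < 3; page++) {
                for (String line : fetchPage(page)) {
                    writer.write(line);
                    writer.newLine();
                }
            }
        }
    }
}
```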
CleanTaskValidator.java
@@ -4,19 +4,25 @@
 import com.datamate.cleaning.domain.repository.CleaningTaskRepository;
 import com.datamate.cleaning.interfaces.dto.OperatorInstanceDto;
 import com.datamate.common.infrastructure.exception.BusinessException;
+import com.datamate.common.infrastructure.exception.SystemErrorCode;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.stereotype.Component;
 
 import java.util.List;
 import java.util.Locale;
+import java.util.regex.Pattern;
 
 
 @Component
 @RequiredArgsConstructor
 public class CleanTaskValidator {
     private final CleaningTaskRepository cleaningTaskRepo;
 
+    private final Pattern UUID_PATTERN = Pattern.compile(
+            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
+    );
+
     public void checkNameDuplication(String name) {
         if (cleaningTaskRepo.isNameExist(name)) {
             throw BusinessException.of(CleanErrorCode.DUPLICATE_TASK_NAME);
@@ -39,4 +45,10 @@ public void checkInputAndOutput(List<OperatorInstanceDto> operators) {
                     front.getName(), back.getName()));
         }
     }
+
+    public void checkTaskId(String id) {
+        if (id == null || !UUID_PATTERN.matcher(id).matches()) {
+            throw BusinessException.of(SystemErrorCode.INVALID_PARAMETER);
+        }
+    }
 }
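A quick check of what UUID_PATTERN accepts and rejects, since checkTaskId is now the gate for every filesystem path built from a task id. Not part of the diff (sample ids invented):

```java
import java.util.regex.Pattern;

public class TaskIdPatternCheck {
    private static final Pattern UUID_PATTERN = Pattern.compile(
            "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

    public static void main(String[] args) {
        // Canonical UUID passes:
        System.out.println(UUID_PATTERN.matcher("123e4567-e89b-12d3-a456-426614174000").matches()); // true
        // Traversal-shaped input is rejected before any path is built:
        System.out.println(UUID_PATTERN.matcher("../../etc/passwd").matches());                      // false
        // So is an undashed hex string, since the anchors demand the exact 8-4-4-4-12 layout:
        System.out.println(UUID_PATTERN.matcher("123e4567e89b12d3a456426614174000").matches());      // false
    }
}
```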
2 changes: 1 addition & 1 deletion deployment/helm/datamate/charts/ray-cluster/values.yaml
@@ -126,7 +126,7 @@ worker:
   groupName: workergroup
   replicas: 1
   minReplicas: 1
-  maxReplicas: 3
+  maxReplicas: 1
   labels: {}
   serviceAccountName: ""
   restartPolicy: ""