Skip to content

Commit

Permalink
[INLONG-10763][Manager] Fix the mutual influence of version for group…
Browse files Browse the repository at this point in the history
… and schdule (apache#10765)

Co-authored-by: Aloys Zhang <aloyszhang@apche.org>
  • Loading branch information
aloyszhang and Aloys Zhang authored Aug 9, 2024
1 parent d8c2bf4 commit d56f47e
Showing 1 changed file with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,24 +309,18 @@ public InlongGroupInfo get(String groupId) {
groupInfo.setSortConf(sortConf);
if (DATASYNC_OFFLINE_MODE.equals(entity.getInlongGroupMode())) {
// get schedule info and set into group info
addScheduleInfo(entity, groupInfo);
fillInScheduleInfo(entity, groupInfo);
}
LOGGER.debug("success to get inlong group for groupId={}", groupId);
return groupInfo;
}

private void addScheduleInfo(InlongGroupEntity entity, InlongGroupInfo groupInfo) {
checkOfflineSyncScheduleExist(entity);
ScheduleInfo scheduleInfo = scheduleOperator.getScheduleInfo(entity.getInlongGroupId());
CommonBeanUtils.copyProperties(scheduleInfo, groupInfo);
}

private void checkOfflineSyncScheduleExist(InlongGroupEntity entity) {
// check schedule info for offline sync
if (!isScheduleInfoExist(entity)) {
String errorMsg = String.format("Schedule info not found for groupId=%s", entity.getInlongGroupId());
LOGGER.error(errorMsg);
throw new BusinessException(ErrorCodeEnum.SCHEDULE_NOT_FOUND, errorMsg);
private void fillInScheduleInfo(InlongGroupEntity entity, InlongGroupInfo groupInfo) {
if (isScheduleInfoExist(entity)) {
ScheduleInfo scheduleInfo = scheduleOperator.getScheduleInfo(entity.getInlongGroupId());
int groupVersion = groupInfo.getVersion();
CommonBeanUtils.copyProperties(scheduleInfo, groupInfo);
groupInfo.setVersion(groupVersion);
}
}

Expand Down Expand Up @@ -524,8 +518,11 @@ public String update(InlongGroupRequest request, String operator) {
// save schedule info for offline group
if (DATASYNC_OFFLINE_MODE.equals(request.getInlongGroupMode())) {
constrainStartAndEndTime(request);
scheduleOperator.updateAndRegister(CommonBeanUtils.copyProperties(request, ScheduleInfoRequest::new),
operator);
ScheduleInfoRequest scheduleRequest = CommonBeanUtils.copyProperties(request, ScheduleInfoRequest::new);
if (scheduleOperator.scheduleInfoExist(groupId)) {
scheduleRequest.setVersion(scheduleOperator.getScheduleInfo(groupId).getVersion());
}
scheduleOperator.updateAndRegister(scheduleRequest, operator);
}

LOGGER.info("success to update inlong group for groupId={} by user={}", groupId, operator);
Expand Down

0 comments on commit d56f47e

Please sign in to comment.