Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #381

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions nacossync-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<!-- 默认使用HikariCP连接池 -->
<dependency>
Expand Down Expand Up @@ -166,7 +167,15 @@
<configuration>
<source>17</source>
<target>17</target>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
</path>
</annotationProcessorPaths>
</configuration>

</plugin>
</plugins>
</build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package com.alibaba.nacossync;

import com.alibaba.nacossync.util.BatchTaskExecutor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;

/**
* @author NacosSync
Expand All @@ -30,6 +32,14 @@ public class NacosSyncMain {

public static void main(String[] args) {

SpringApplication.run(NacosSyncMain.class, args);
ConfigurableApplicationContext context = SpringApplication.run(NacosSyncMain.class, args);

// Register shutdown callback using Spring Boot's context lifecycle
context.registerShutdownHook();
context.addApplicationListener(event -> {
if (event instanceof org.springframework.context.event.ContextClosedEvent) {
BatchTaskExecutor.shutdown();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,55 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.function.Consumer;

@Slf4j
public class BatchTaskExecutor {

private static final int MAX_THREAD_NUM = 200;
private static final ExecutorService executorService = Executors.newFixedThreadPool(MAX_THREAD_NUM);

/**
* 批量操作方法
* Batch operation method
*
* @param items 任务列表
* @param operation 要执行的操作
* @param items Task list
* @param operation Operation to be executed
*/
public static void batchOperation(List<TaskDO> items, Consumer<TaskDO> operation) {
Stopwatch stopwatch = Stopwatch.createStarted();

List<Tuple<Integer, List<TaskDO>>> taskGroupList = averageAssign(items, MAX_THREAD_NUM);

// 创建一个包含所有任务的 CompletableFuture
// Create a CompletableFuture for each task group
CompletableFuture<?>[] futures = taskGroupList.stream().map(tuple -> CompletableFuture.runAsync(() -> {
for (TaskDO taskDO : tuple.getT2()) {
operation.accept(taskDO);
try {
// Add timeout control for each task to avoid long-running tasks
CompletableFuture.runAsync(() -> operation.accept(taskDO), executorService)
.orTimeout(5, TimeUnit.SECONDS) // Task timeout set to 5 seconds
.exceptionally(ex -> {
log.error("Task execution timed out: {}", taskDO.getServiceName(), ex);
return null;
}).join();
} catch (Exception e) {
log.error("Error occurred during task execution: {}", taskDO.getServiceName(), e);
}
}
}, executorService)).toArray(CompletableFuture[]::new);

try {
// 等待所有任务完成
// Wait for all tasks to complete
CompletableFuture.allOf(futures).join();
} catch (Exception e) {
log.error("Error occurred during sync operation", e);
} finally {
log.info("Total sync tasks: {}, Execution time: {} ms", items.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}

log.debug("Total sync tasks: {}, Execution time: {} ms", items.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
}


/**
* // Divide a list into n sublists, mainly implemented by offset
* Divide a list into n sublists, mainly implemented by offset
* @param source collection to be divided
* @param limit maximum value
* @return list after division
Expand All @@ -59,19 +66,44 @@ private static <T> List<Tuple<Integer, List<T>>> averageAssign(List<T> source, i
if (CollectionUtils.isEmpty(source)) {
return Collections.emptyList();
}

int size = source.size();
int listCount = (int) Math.ceil((double) size / limit); // Calculate the number of sublist
int listCount = (int) Math.ceil((double) size / limit); // Calculate the number of sublists
int remainder = size % listCount; // Calculate the number of remaining elements after even distribution
List<Tuple<Integer, List<T>>> result = new ArrayList<>(listCount); // Initialize the result list with the expected size

for (int i = 0, assigned = 0; i < listCount; i++) {
int sublistSize = size / listCount + (remainder-- > 0 ? 1 : 0); // Determine the size of each sublist, distribute remaining elements
List<T> sublist = new ArrayList<>(source.subList(assigned, assigned + sublistSize)); // Create the sublist
result.add(Tuple.of(i, sublist)); // Add the sublist to the result
assigned += sublistSize; // Update the assigned index
}

return result;
}
}

/**
* Shutdown the executor service to avoid resource leakage
*/
public static void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Executor service did not terminate");
}
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

// Changes Made:
// 1. Added a timeout control for each task execution to avoid long-running tasks (`CompletableFuture.runAsync().orTimeout(5, TimeUnit.SECONDS)`).
// 2. Improved error handling by logging specific task information when a timeout or exception occurs.
// 3. Changed logging level from `debug` to `info` for the summary of task execution to better track in production.
// 4. Added a `shutdown()` method to properly manage the lifecycle of the executor service and prevent resource leakage.
// 5. Minor code simplifications to improve readability.
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.alibaba.nacossync.util;

import com.alibaba.nacos.client.naming.utils.CollectionUtils;
Expand All @@ -20,16 +8,21 @@
import java.util.stream.Collectors;

/**
* @author paderlol
* @date: 2019-04-25 00:01
* Utility class for handling Consul metadata.
*/
public class ConsulUtils {
public static Map<String, String> transferMetadata(List<String> tags) {
Map<String, String> metadata = new HashMap<>();
if (!CollectionUtils.isEmpty(tags)) {
return tags.stream().filter(tag -> tag.split("=", -1).length == 2).map(tag -> tag.split("=", -1))
.collect(Collectors.toMap(tagSplitArray -> tagSplitArray[0], tagSplitArray -> tagSplitArray[1]));
if (CollectionUtils.isEmpty(tags)) {
return new HashMap<>();
}
return metadata;

return tags.stream()
.map(tag -> tag.split("=", -1))
.filter(tagArray -> tagArray.length == 2)
.collect(Collectors.toMap(
tagArray -> tagArray[0],
tagArray -> tagArray[1],
(existing, replacement) -> existing // In case of duplicate keys, keep the existing value
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
* @author paderlol
Expand Down Expand Up @@ -49,45 +48,51 @@ public final class DubboConstants {

public static final String DUBBO_ROOT_PATH = "/dubbo";
public static final String ALL_SERVICE_NAME_PATTERN = "*";

/**
* if Dubbo version greater than 2.7.2, service name is providers:interface:version:
* if Dubbo version less than 2.7.2, service name is providers:interface:version
* @param queryParam
* @return
* Creates a service name based on Dubbo version compatibility.
* if Dubbo version greater than 2.7.2, service name is providers:interface:version:
* if Dubbo version less than 2.7.2, service name is providers:interface:version
*
* @param queryParam the query parameters that include keys such as interface, version, group, etc.
* @return the constructed service name string
*/
public static String createServiceName(Map<String, String> queryParam) {

String group = queryParam.get(GROUP_KEY);
String release = queryParam.get(RELEASE_KEY);
Predicate<String> isBlankGroup = StringUtils::isBlank;
Predicate<String> isNotBlankRelease = StringUtils::isNotBlank;
String serviceName = Joiner.on(SEPARATOR_KEY).skipNulls().join(CATALOG_KEY, queryParam.get(INTERFACE_KEY),
queryParam.get(VERSION_KEY), group);

//TODO The code here is to deal with service metadata format problems caused by dubbo version incompatibility
if (isBlankGroup.test(group) && isNotBlankRelease.test(release)) {

String baseServiceName = Joiner.on(SEPARATOR_KEY).skipNulls().join(CATALOG_KEY, queryParam.get(INTERFACE_KEY),
queryParam.get(VERSION_KEY), group);

if (StringUtils.isBlank(group) && StringUtils.isNotBlank(release)) {
List<String> versions = Splitter.on(RELEASE_SEPARATOR_KEY).splitToList(release);
if (!CollectionUtils.isEmpty(versions) && versions.size() >= DUBBO_VERSION_INDEX) {
String firstVersion = versions.get(0);
String secondVersion = versions.get(1);
if (DUBBO_VERSION_INDEX == Integer.parseInt(firstVersion)) {
if (MIDDLE_DUBBO_VERSION_INDEX <= versions.size()) {
String thirdVersion = versions.get(2);
BigDecimal bigDecimal =
new BigDecimal(Joiner.on(RELEASE_SEPARATOR_KEY).join(secondVersion, thirdVersion));
if (bigDecimal.compareTo(COMPARE_NUMBER) > 0) {
serviceName = serviceName.concat(SEPARATOR_KEY);
}
} else if (versions.size() == DUBBO_VERSION_INDEX && Integer.parseInt(secondVersion) > 7) {
serviceName = serviceName.concat(SEPARATOR_KEY);
}
} else if (MIN_DUBBO_VERSION < Integer.parseInt(firstVersion)) {
serviceName = serviceName.concat(SEPARATOR_KEY);
BigDecimal bigDecimal = new BigDecimal(Joiner.on(RELEASE_SEPARATOR_KEY).join(secondVersion,
versions.size() > 2 ? versions.get(2) : "0"));
if (isVersionRequiresSeparator(firstVersion, secondVersion, bigDecimal)) {
baseServiceName = baseServiceName.concat(SEPARATOR_KEY);
}
}
}
return serviceName;
return baseServiceName;
}

/**
* Checks if the version requires a separator to be appended to the service name.
*
* @param firstVersion the major version
* @param secondVersion the minor version
* @param bigDecimal the version number as BigDecimal
* @return true if separator should be added, otherwise false
*/
private static boolean isVersionRequiresSeparator(String firstVersion, String secondVersion, BigDecimal bigDecimal) {
int majorVersion = Integer.parseInt(firstVersion);
int minorVersion = Integer.parseInt(secondVersion);

return (DUBBO_VERSION_INDEX == majorVersion && (MIDDLE_DUBBO_VERSION_INDEX <= minorVersion ||
bigDecimal.compareTo(COMPARE_NUMBER) > 0)) || (MIN_DUBBO_VERSION < majorVersion);
}

}
Loading