Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5727 service-1] improve the code quality of nacos-config #5744

Merged
merged 5 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private void initAllCapacity(boolean isTenant) {
*
* @param counterMode increase or decrease mode.
* @param ignoreQuotaLimit ignoreQuotaLimit flag.
* @return
* @return the result of update cluster usage.
*/
public boolean insertAndUpdateClusterUsage(CounterMode counterMode, boolean ignoreQuotaLimit) {
Capacity capacity = groupCapacityPersistService.getClusterCapacity();
Expand Down Expand Up @@ -295,13 +295,12 @@ private void autoExpansion(String group, String tenant) {
int finalQuota = (int) (usage + defaultQuota * (1.0 * initialExpansionPercent / 100));
if (tenant != null) {
tenantCapacityPersistService.updateQuota(tenant, finalQuota);
LogUtil.DEFAULT_LOG
.warn("[capacityManagement] 初始化的时候该租户({})使用量({})就已经到达限额{},自动扩容到{}", tenant, usage, defaultQuota,
finalQuota);
LogUtil.DEFAULT_LOG.warn("[capacityManagement] The usage({}) already reach the upper limit({}) when init the tenant({}), "
+ "automatic upgrade to ({})", usage, defaultQuota, tenant, finalQuota);
} else {
groupCapacityPersistService.updateQuota(group, finalQuota);
LogUtil.DEFAULT_LOG.warn("[capacityManagement] 初始化的时候该Group({})使用量({})就已经到达限额{},自动扩容到{}", group, usage,
defaultQuota, finalQuota);
LogUtil.DEFAULT_LOG.warn("[capacityManagement] The usage({}) already reach the upper limit({}) when init the group({}), "
+ "automatic upgrade to ({})", usage, defaultQuota, group, finalQuota);
}
}
}
Expand Down Expand Up @@ -400,7 +399,7 @@ private boolean insertGroupCapacity(String group, Integer quota, Integer maxSize
groupCapacity.setGmtModified(now);
return groupCapacityPersistService.insertGroupCapacity(groupCapacity);
} catch (DuplicateKeyException e) {
// 并发情况下同时insert会出现,ignore
// this exception will meet when concurrent insert,ignore it
LogUtil.DEFAULT_LOG.warn("group: {}, message: {}", group, e.getMessage());
}
return false;
Expand Down Expand Up @@ -502,7 +501,7 @@ private boolean insertTenantCapacity(String tenant, Integer quota, Integer maxSi
tenantCapacity.setGmtModified(now);
return tenantCapacityPersistService.insertTenantCapacity(tenantCapacity);
} catch (DuplicateKeyException e) {
// 并发情况下同时insert会出现,ignore
// this exception will meet when concurrent insert,ignore it
LogUtil.DEFAULT_LOG.warn("tenant: {}, message: {}", tenant, e.getMessage());
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void init() {
try {
reload();
} catch (IOException e) {
e.printStackTrace();
FATAL_LOG.error("[ExternalDataSourceService] dats source reload error", e);
throw new RuntimeException(DB_LOAD_ERROR_MSG);
}

Expand Down Expand Up @@ -219,7 +219,7 @@ public void run() {
masterIndex = index;
break;
} catch (DataAccessException e) { // read only
e.printStackTrace(); // TODO remove
FATAL_LOG.warn("[master-db] master db access error", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@ public class LocalDataSourceServiceImpl implements DataSourceService {
@PostConstruct
@Override
public synchronized void init() throws Exception {
if (!PropertyUtil.isUseExternalDB()) {
if (!initialize) {
LogUtil.DEFAULT_LOG.info("use local db service for init");
final String jdbcUrl =
"jdbc:derby:" + Paths.get(EnvUtil.getNacosHome(), derbyBaseDir).toString()
+ ";create=true";
initialize(jdbcUrl);
initialize = true;
}
if (PropertyUtil.isUseExternalDB()) {
return;
}
if (!initialize) {
LogUtil.DEFAULT_LOG.info("use local db service for init");
final String jdbcUrl =
"jdbc:derby:" + Paths.get(EnvUtil.getNacosHome(), derbyBaseDir).toString()
+ ";create=true";
initialize(jdbcUrl);
initialize = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class DumpService {

private static final Logger LOGGER = LoggerFactory.getLogger(DumpService.class);

protected DumpProcessor processor;

protected DumpAllProcessor dumpAllProcessor;
Expand All @@ -90,6 +92,36 @@ public abstract class DumpService {

protected final ServerMemberManager memberManager;

/**
* full dump interval.
*/
static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;

/**
* full dump delay.
*/
static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;

private TaskManager dumpTaskMgr;

private TaskManager dumpAllTaskMgr;

static final AtomicInteger FINISHED = new AtomicInteger();

static final int INIT_THREAD_COUNT = 10;

int total = 0;

private static final String TRUE_STR = "true";

private static final String BETA_TABLE_NAME = "config_info_beta";

private static final String TAG_TABLE_NAME = "config_info_tag";

Boolean isQuickStart = false;

private int retentionDays = 30;

/**
* Here you inject the dependent objects constructively, ensuring that some of the dependent functionality is
* initialized ahead of time.
Expand Down Expand Up @@ -238,7 +270,7 @@ protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProc

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
Expand Down Expand Up @@ -455,36 +487,4 @@ public void run() {
* @return {@link Boolean}
*/
protected abstract boolean canExecute();

/**
* full dump interval.
*/
static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;

/**
* full dump delay.
*/
static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;

private TaskManager dumpTaskMgr;

private TaskManager dumpAllTaskMgr;

private static final Logger LOGGER = LoggerFactory.getLogger(DumpService.class);

static final AtomicInteger FINISHED = new AtomicInteger();

static final int INIT_THREAD_COUNT = 10;

int total = 0;

private static final String TRUE_STR = "true";

private static final String BETA_TABLE_NAME = "config_info_beta";

private static final String TAG_TABLE_NAME = "config_info_tag";

Boolean isQuickStart = false;

private int retentionDays = 30;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean process(NacosTask task) {
if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
for (ConfigInfoWrapper cf : page.getPageItems()) {
long id = cf.getId();
lastMaxId = id > lastMaxId ? id : lastMaxId;
lastMaxId = Math.max(id, lastMaxId);
if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(cf.getContent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
* @date 2020/7/5 12:19 PM
*/
public class DumpChangeProcessor implements NacosTaskProcessor {

final DumpService dumpService;

final PersistService persistService;

final Timestamp startTime;

final Timestamp endTime;

public DumpChangeProcessor(DumpService dumpService, Timestamp startTime, Timestamp endTime) {
this.dumpService = dumpService;
Expand Down Expand Up @@ -82,23 +90,12 @@ public boolean process(NacosTask task) {
.dumpChange(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified());
final String content = cf.getContent();
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
LogUtil.DEFAULT_LOG.info("[dump-change-ok] {}, {}, length={}, md5={}",
new Object[] {GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(),
content.length(), md5});
LogUtil.DEFAULT_LOG.info("[dump-change-ok] {}, {}, length={}, md5={}", GroupKey2.getKey(cf.getDataId(), cf.getGroup()),
cf.getLastModified(), content.length(), md5);
}
ConfigCacheService.reloadConfig();
long endChangeConfigTime = System.currentTimeMillis();
LogUtil.DEFAULT_LOG.warn("changeConfig done,cost:{}", endChangeConfigTime - startChangeConfigTime);
return true;
}

// =====================

final DumpService dumpService;

final PersistService persistService;

final Timestamp startTime;

final Timestamp endTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
*/
public class DumpProcessor implements NacosTaskProcessor {

final DumpService dumpService;

public DumpProcessor(DumpService dumpService) {
this.dumpService = dumpService;
}
Expand All @@ -60,34 +62,28 @@ public boolean process(NacosTask task) {
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);

if (isBeta) {
// beta发布,则dump数据,更新beta缓存
// if publish beta, then dump config, update beta cache
ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);

build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());

return DumpConfigHandler.configDump(build.build());
}
if (StringUtils.isBlank(tag)) {
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
} else {
if (StringUtils.isBlank(tag)) {
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());

return DumpConfigHandler.configDump(build.build());
} else {

ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);

build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());

return DumpConfigHandler.configDump(build.build());
}
ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);

build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());

}
return DumpConfigHandler.configDump(build.build());
}

final DumpService dumpService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
@Service
public class MergeDatumService {

private static final Logger LOGGER = LoggerFactory.getLogger(MergeDatumService.class);

final TaskManager mergeTasks;

private PersistService persistService;

static final int INIT_THREAD_COUNT = 40;
Expand Down Expand Up @@ -180,9 +184,4 @@ public void run() {
LOGGER.info("[all-merge-dump] {} / {}", FINISHED.get(), total);
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(MergeDatumService.class);

final TaskManager mergeTasks;

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@
* @author Nacos
*/
public class MergeTaskProcessor implements NacosTaskProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(MergeTaskProcessor.class);

private static final int PAGE_SIZE = 10000;

private PersistService persistService;

private MergeDatumService mergeService;

MergeTaskProcessor(PersistService persistService, MergeDatumService mergeService) {
this.persistService = persistService;
Expand Down Expand Up @@ -133,10 +139,4 @@ public static ConfigInfo merge(String dataId, String group, String tenant, List<
String content = sb.substring(0, sb.lastIndexOf(Constants.NACOS_LINE_SEPARATOR));
return new ConfigInfo(dataId, group, tenant, appName, content);
}

private static final Logger LOGGER = LoggerFactory.getLogger(MergeTaskProcessor.class);

private PersistService persistService;

private MergeDatumService mergeService;
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,24 @@
@Service
public class AsyncNotifyService {

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);

private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();

private static final int MIN_RETRY_INTERVAL = 500;

private static final int INCREASE_STEPS = 1000;

private static final int MAX_COUNT = 6;

@Autowired
private DumpService dumpService;

@Autowired
private ConfigClusterRpcClientProxy configClusterRpcClientProxy;

private ServerMemberManager memberManager;

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
Expand Down Expand Up @@ -120,15 +135,6 @@ public Class<? extends Event> subscribeType() {
});
}

private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate();

private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);

@Autowired
private ConfigClusterRpcClientProxy configClusterRpcClientProxy;

private ServerMemberManager memberManager;

class AsyncTask implements Runnable {

private Queue<NotifySingleTask> queue;
Expand Down Expand Up @@ -486,11 +492,4 @@ private static int getDelayTime(NotifyTask task) {
}
return delay;
}

private static final int MIN_RETRY_INTERVAL = 500;

private static final int INCREASE_STEPS = 1000;

private static final int MAX_COUNT = 6;

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public void run() {
try {
((ScheduledThreadPoolExecutor) executor).schedule(this, 500L, TimeUnit.MILLISECONDS);
} catch (Exception e) { // The notification failed, but at the same time, the node was offline
LOGGER.warn("[notify-thread-pool] cluster remove node {}, current thread was tear down.", target,
e);
LOGGER.warn("[notify-thread-pool] cluster remove node {}, current thread was tear down.", target, e);
}
}
}
Expand Down
Loading