Skip to content

Commit

Permalink
http refresh.
Browse files Browse the repository at this point in the history
  • Loading branch information
yu199195 committed Jul 14, 2020
1 parent 482c83e commit c919b38
Show file tree
Hide file tree
Showing 12 changed files with 628 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,17 @@ public enum ConfigGroupEnum {
* App auth config group enum.
*/
APP_AUTH,



/**
* Plugin config group enum.
*/
PLUGIN,



/**
* Rule config group enum.
*/
RULE,



/**
* Selector config group enum.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.soul.common.concurrent.SoulThreadFactory;
import org.dromara.soul.common.constant.HttpConstants;
import org.dromara.soul.common.dto.AppAuthData;
import org.dromara.soul.common.dto.ConfigData;
import org.dromara.soul.common.dto.MetaData;
import org.dromara.soul.common.dto.PluginData;
import org.dromara.soul.common.dto.RuleData;
import org.dromara.soul.common.dto.SelectorData;
import org.dromara.soul.common.enums.ConfigGroupEnum;
import org.dromara.soul.common.exception.SoulException;
import org.dromara.soul.common.utils.ThreadUtils;
Expand All @@ -42,7 +44,7 @@
import org.dromara.soul.sync.data.api.PluginDataSubscriber;
import org.dromara.soul.sync.data.api.SyncDataService;
import org.dromara.soul.sync.data.http.config.HttpConfig;
import org.dromara.soul.sync.data.http.handler.HttpSyncDataHandler;
import org.dromara.soul.sync.data.http.refresh.DataRefreshFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
Expand All @@ -52,17 +54,6 @@
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* HTTP long polling implementation.
*
Expand All @@ -71,15 +62,10 @@
*/
@SuppressWarnings("all")
@Slf4j
public class HttpSyncDataService extends HttpSyncDataHandler implements SyncDataService, AutoCloseable {
public class HttpSyncDataService implements SyncDataService, AutoCloseable {

private static final AtomicBoolean RUNNING = new AtomicBoolean(false);

/**
* cache group config with md5 info.
*/
private static final ConcurrentMap<ConfigGroupEnum, ConfigData> GROUP_CACHE = new ConcurrentHashMap<>();

private static final Gson GSON = new Gson();

/**
Expand All @@ -98,22 +84,22 @@ public class HttpSyncDataService extends HttpSyncDataHandler implements SyncData

private List<String> serverList;

private DataRefreshFactory factory;

public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
super(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.httpConfig = httpConfig;
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
this.start(httpConfig);
}

private void start(final HttpConfig httpConfig) {

// init RestTemplate
OkHttp3ClientHttpRequestFactory factory = new OkHttp3ClientHttpRequestFactory();
factory.setConnectTimeout((int) this.connectionTimeout.toMillis());
factory.setReadTimeout((int) HttpConstants.CLIENT_POLLING_READ_TIMEOUT);
this.httpClient = new RestTemplate(factory);

// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
Expand Down Expand Up @@ -146,12 +132,10 @@ private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulExcept
}

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {

StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}

String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
log.info("request configs: [{}]", url);
String json = null;
Expand All @@ -162,131 +146,40 @@ private void doFetchGroupConfig(final String server, final ConfigGroupEnum... gr
log.warn(message);
throw new SoulException(message, e);
}

// update local cache
boolean updated = this.updateCacheWithJson(json);
if (updated) {
log.info("get latest configs: [{}]", json);
return;
}

// not updated. it is likely that the current config server has not been updated yet. wait a moment.
log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
ThreadUtils.sleep(TimeUnit.SECONDS, 30);

}

/**
* If the MD5 values are different and the last update time of the old data is less than
* the last update time of the new data, the configuration cache is considered to have been changed.
* @param newVal the lasted config
* @param groupEnum the group enum
* @return true: if need update
*/
private <T> boolean updateCacheIfNeed(final ConfigData<T> newVal, final ConfigGroupEnum groupEnum) {
// first init cache
if (GROUP_CACHE.putIfAbsent(groupEnum, newVal) == null) {
return true;
}
ResultHolder holder = new ResultHolder(false);
GROUP_CACHE.merge(groupEnum, newVal, (oldVal, value) -> {
// must compare the last update time
if (oldVal == null || (!StringUtils.equals(oldVal.getMd5(), newVal.getMd5())
&& oldVal.getLastModifyTime() < newVal.getLastModifyTime())) {
log.info("update {} config: {}", groupEnum, newVal);
holder.result = true;
return newVal;
}
log.info("Get the same config, the [{}] config cache will not be updated, md5:{}", groupEnum, oldVal.getMd5());
return oldVal;
});
return holder.result;
}


/**
* update local cache.
* @param json the response from config server.
* @return true: the local cache was updated. false: not updated.
*/
private boolean updateCacheWithJson(final String json) {

JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");

// if the config cache will be updated?
boolean updated = false;

// plugin
JsonObject pluginData = data.getAsJsonObject(ConfigGroupEnum.PLUGIN.name());
if (pluginData != null) {
ConfigData<PluginData> result = GSON.fromJson(pluginData, new TypeToken<ConfigData<PluginData>>() {
}.getType());
if (this.updateCacheIfNeed(result, ConfigGroupEnum.PLUGIN)) {
updated = true;
this.flushAllPlugin(result.getData());
}
}

// rule
JsonObject ruleData = data.getAsJsonObject(ConfigGroupEnum.RULE.name());
if (ruleData != null) {
ConfigData<RuleData> result = GSON.fromJson(ruleData, new TypeToken<ConfigData<RuleData>>() {
}.getType());
if (this.updateCacheIfNeed(result, ConfigGroupEnum.RULE)) {
updated = true;
this.flushAllRule(result.getData());
}
}

// selector
JsonObject selectorData = data.getAsJsonObject(ConfigGroupEnum.SELECTOR.name());
if (selectorData != null) {
ConfigData<SelectorData> result = GSON.fromJson(selectorData, new TypeToken<ConfigData<SelectorData>>() {
}.getType());
if (this.updateCacheIfNeed(result, ConfigGroupEnum.SELECTOR)) {
updated = true;
this.flushAllSelector(result.getData());
}
}

// appAuth
JsonObject appAuthData = data.getAsJsonObject(ConfigGroupEnum.APP_AUTH.name());
if (appAuthData != null) {
ConfigData<AppAuthData> result = GSON.fromJson(appAuthData, new TypeToken<ConfigData<AppAuthData>>() {
}.getType());
if (this.updateCacheIfNeed(result, ConfigGroupEnum.APP_AUTH)) {
updated = true;
this.flushAllAppAuth(result.getData());
}
}

// metaData
JsonObject metaData = data.getAsJsonObject(ConfigGroupEnum.META_DATA.name());
if (metaData != null) {
ConfigData<MetaData> result = GSON.fromJson(metaData, new TypeToken<ConfigData<MetaData>>() {
}.getType());
if (this.updateCacheIfNeed(result, ConfigGroupEnum.META_DATA)) {
updated = true;
this.flushMetaData(result.getData());
}
}

return updated;
return factory.executor(data);
}

@SuppressWarnings("unchecked")
private void doLongPolling(final String server) {

MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = GROUP_CACHE.get(group);
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity httpEntity = new HttpEntity(params, headers);

String listenerUrl = server + "/configs/listener";
log.debug("request listener configs: [{}]", listenerUrl);
JsonArray groupJson = null;
Expand All @@ -298,7 +191,6 @@ private void doLongPolling(final String server) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new SoulException(message, e);
}

if (groupJson != null) {
// fetch group configuration async.
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
Expand Down Expand Up @@ -352,14 +244,4 @@ public void run() {
log.warn("Stop http long polling.");
}
}

private static final class ResultHolder {

private boolean result;

ResultHolder(final boolean result) {
this.result = result;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import lombok.Data;

/**
* The type Http config.
*/
@Data
public class HttpConfig {

Expand Down
Loading

0 comments on commit c919b38

Please sign in to comment.