Skip to content

Commit 45b3bb1

Browse files
authored
Merge pull request #23 from reckless11/fix_getconfig_bug
Fix getconfig bug+add config ut
2 parents 8efd0af + 331a975 commit 45b3bb1

File tree

10 files changed

+417
-46
lines changed

10 files changed

+417
-46
lines changed

capa-spi-aws-config/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,18 @@
5151
<artifactId>junit-jupiter-engine</artifactId>
5252
<scope>test</scope>
5353
</dependency>
54+
55+
<!-- power mock -->
56+
<dependency>
57+
<groupId>org.powermock</groupId>
58+
<artifactId>powermock-module-junit4</artifactId>
59+
<scope>test</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>org.powermock</groupId>
63+
<artifactId>powermock-api-mockito2</artifactId>
64+
<scope>test</scope>
65+
</dependency>
5466
</dependencies>
5567

5668
<build>

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfigStore.java

Lines changed: 71 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,23 @@ protected <T> Mono<List<ConfigurationItem<T>>> doGet(String appId, String group,
111111
// todo:need to get the specific env from system properties
112112
String applicationName = appId + "_FAT";
113113
String configurationName = keys.get(0);
114+
/*
115+
check whether the config file has been initialized before
116+
the function of if here is to first check init status to rough filter
117+
*/
118+
if (!isInitialized(applicationName, configurationName)) {
119+
/*
120+
return Configuration.EMPTY if has been initialized before by others,
121+
or return configuration which initialized just now.
122+
*/
123+
Configuration<T> initConfiguration = initConfig(applicationName, configurationName, group, label, metadata, type);
124+
if (!Objects.equals(initConfiguration, Configuration.EMPTY)) {
125+
//init just now and the config value is the latest value,return immediately
126+
return Mono.just(Lists.newArrayList(initConfiguration.getConfigurationItem()));
127+
}
128+
}
129+
130+
//has been initialized before, and need to get the latest value
114131
String clientConfigurationVersion = getCurVersion(applicationName, configurationName);
115132

116133
GetConfigurationRequest request = GetConfigurationRequest.builder()
@@ -148,10 +165,21 @@ protected <T> Flux<SubscribeResp<T>> doSubscribe(String appId, String group, Str
148165
return doSub(applicationName, configurationName, group, label, metadata, type, appId);
149166
}
150167

151-
private synchronized <T> Mono<Boolean> initConfig(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
168+
/**
169+
* @param applicationName applicationName
170+
* @param configurationName configurationName
171+
* @param group group
172+
* @param label label
173+
* @param metadata metadata
174+
* @param type type
175+
* @param <T> T
176+
* @return return Configuration.EMPTY if has been initialized before by others and not been done this time;
177+
* or return configuration value which initialized just now.
178+
*/
179+
private synchronized <T> Configuration<T> initConfig(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
152180
// double check whether has been initialized
153181
if (isInitialized(applicationName, configurationName)) {
154-
return Mono.just(true);
182+
return Configuration.EMPTY;
155183
}
156184
return Mono.create(monoSink -> {
157185
AwsCapaConfigurationScheduler.INSTANCE.configInitScheduler
@@ -173,16 +201,18 @@ private synchronized <T> Mono<Boolean> initConfig(String applicationName, String
173201
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
174202
}
175203
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
176-
initConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
177-
monoSink.success(true);
204+
Configuration<T> tConfiguration = initConfigurationItem(applicationName, configurationName, type, resp.content(), resp.configurationVersion());
205+
monoSink.success(tConfiguration);
178206
}
179-
}, 0, TimeUnit.SECONDS);
180-
});
207+
});
208+
})
209+
.map(resp -> (Configuration<T>) resp)
210+
.block();
181211
}
182212

183213
private <T> void initSubscribe(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type) {
184214
if (!isInitialized(applicationName, configurationName)) {
185-
initConfig(applicationName, configurationName, group, label, metadata, type).block();
215+
initConfig(applicationName, configurationName, group, label, metadata, type);
186216
}
187217
if (!isSubscribed(applicationName, configurationName)) {
188218
createSubscribe(applicationName, configurationName, type);
@@ -194,33 +224,33 @@ private synchronized <T> void createSubscribe(String applicationName, String con
194224
return;
195225
}
196226
Flux.create(fluxSink -> {
197-
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
198-
.schedulePeriodically(() -> {
199-
String version = getCurVersion(applicationName, configurationName);
200-
201-
GetConfigurationRequest request = GetConfigurationRequest.builder()
202-
.application(applicationName)
203-
.clientId(UUID.randomUUID().toString())
204-
.configuration(configurationName)
205-
.clientConfigurationVersion(version)
206-
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigEnv())
207-
.build();
208-
209-
GetConfigurationResponse resp = null;
210-
try {
211-
resp = appConfigAsyncClient.getConfiguration(request).get();
212-
} catch (InterruptedException | ExecutionException e) {
213-
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
214-
}
215-
// update subscribed status if needs
216-
getConfiguration(applicationName, configurationName).getSubscribed().compareAndSet(false, true);
217-
218-
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
219-
fluxSink.next(resp);
220-
}
221-
// todo: make the polling frequency configurable
222-
}, 0, 1, TimeUnit.SECONDS);
223-
})
227+
AwsCapaConfigurationScheduler.INSTANCE.configSubscribePollingScheduler
228+
.schedulePeriodically(() -> {
229+
String version = getCurVersion(applicationName, configurationName);
230+
231+
GetConfigurationRequest request = GetConfigurationRequest.builder()
232+
.application(applicationName)
233+
.clientId(UUID.randomUUID().toString())
234+
.configuration(configurationName)
235+
.clientConfigurationVersion(version)
236+
.environment(AwsCapaConfigurationProperties.AppConfigProperties.Settings.getAwsAppConfigEnv())
237+
.build();
238+
239+
GetConfigurationResponse resp = null;
240+
try {
241+
resp = appConfigAsyncClient.getConfiguration(request).get();
242+
} catch (InterruptedException | ExecutionException e) {
243+
LOGGER.error("error occurs when getConfiguration,configurationName:{},version:{}", request.configuration(), request.clientConfigurationVersion(), e);
244+
}
245+
// update subscribed status if needs
246+
getConfiguration(applicationName, configurationName).getSubscribed().compareAndSet(false, true);
247+
248+
if (resp != null && !Objects.equals(resp.configurationVersion(), version)) {
249+
fluxSink.next(resp);
250+
}
251+
// todo: make the polling frequency configurable
252+
}, 0, 1, TimeUnit.SECONDS);
253+
})
224254
.publishOn(AwsCapaConfigurationScheduler.INSTANCE.configPublisherScheduler)
225255
.map(origin -> {
226256
GetConfigurationResponse resp = (GetConfigurationResponse) origin;
@@ -235,11 +265,14 @@ private synchronized <T> void createSubscribe(String applicationName, String con
235265

236266
private <T> Flux<SubscribeResp<T>> doSub(String applicationName, String configurationName, String group, String label, Map<String, String> metadata, TypeRef<T> type, String appId) {
237267
Configuration<?> configuration = getConfiguration(applicationName, configurationName);
268+
if (Objects.equals(configuration, Configuration.EMPTY)) {
269+
return Flux.empty();
270+
}
238271
return Flux.create(fluxSink -> {
239-
configuration.addListener(configurationItem -> {
240-
fluxSink.next(configurationItem);
241-
});
242-
})
272+
configuration.addListener(configurationItem -> {
273+
fluxSink.next(configurationItem);
274+
});
275+
})
243276
.map(resp -> (ConfigurationItem<T>) resp)
244277
.map(resp -> convert(resp, appId));
245278
}

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/entity/Configuration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class Configuration<T> {
3030

3131
private static final Logger LOGGER = LoggerFactory.getLogger(Configuration.class);
3232

33-
public static final Configuration<Void> EMPTY = new Configuration<>();
33+
public static final Configuration EMPTY = new Configuration<>();
3434

3535
private String clientConfigurationVersion;
3636

capa-spi-aws-config/src/main/java/group/rxcloud/capa/spi/aws/config/serializer/PropertiesSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public <T> T deserialize(SdkBytes contentSdkBytes, TypeRef<T> type) {
4040
return (T) map;
4141
}
4242

43-
public Map<String, String> parsePropertiesToMap(InputStream inputStream) {
43+
private Map<String, String> parsePropertiesToMap(InputStream inputStream) {
4444
Properties properties = new Properties();
4545
try {
4646
properties.load(inputStream);

capa-spi-aws-config/src/test/java/group/rxcloud/capa/spi/aws/config/AwsCapaConfigStoreIgnoreTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import com.google.common.collect.Lists;
2020
import group.rxcloud.capa.component.configstore.ConfigurationItem;
21+
import group.rxcloud.capa.component.configstore.StoreConfig;
2122
import group.rxcloud.capa.component.configstore.SubscribeResp;
2223
import group.rxcloud.capa.infrastructure.serializer.DefaultObjectSerializer;
2324
import group.rxcloud.cloudruntimes.utils.TypeRef;
25+
import org.junit.jupiter.api.BeforeEach;
2426
import org.junit.jupiter.api.Disabled;
2527
import org.junit.jupiter.api.Test;
2628
import reactor.core.publisher.Flux;
@@ -35,19 +37,26 @@
3537
*/
3638

3739
class AwsCapaConfigStoreIgnoreTest {
40+
AwsCapaConfigStore ins = new AwsCapaConfigStore(new DefaultObjectSerializer());
41+
42+
@BeforeEach
43+
public void setUp(){
44+
ins.doInit(new StoreConfig());
45+
}
3846

3947
@Disabled
4048
@Test
4149
void testDoGet(){
42-
AwsCapaConfigStore ins = new AwsCapaConfigStore(new DefaultObjectSerializer());
4350

4451
Mono<List<ConfigurationItem<User>>> mono = ins.doGet("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
4552
mono.subscribe(resp->{
46-
System.out.println("hihi,age:"+resp.get(0).getContent().getAge());
47-
while(true){
48-
49-
}
53+
System.out.println(resp.get(0).getContent().getAge());
54+
});
55+
List<ConfigurationItem<User>> block = mono.block();
56+
mono.subscribe(resp->{
57+
System.out.println(resp.get(0).getContent().getAge());
5058
});
59+
List<ConfigurationItem<User>> block2 = mono.block();
5160
System.out.println("");
5261
while (true){
5362

@@ -58,7 +67,6 @@ void testDoGet(){
5867
@Disabled
5968
@Test
6069
void testDoSubscribe(){
61-
AwsCapaConfigStore ins = new AwsCapaConfigStore(new DefaultObjectSerializer());
6270

6371
Flux<SubscribeResp<User>> flux1 = ins.doSubscribe("100012345", "", "", Lists.newArrayList("test1.json"), new HashMap<>(), TypeRef.get(User.class));
6472
flux1.subscribe(resp -> {

0 commit comments

Comments
 (0)