Skip to content

Commit

Permalink
support for Mono
Browse files Browse the repository at this point in the history
  • Loading branch information
yuankui committed Jun 14, 2019
1 parent 15f980a commit b7ee327
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -75,13 +76,21 @@ public void init(Method method) {
// print the plan of the provider
ResourceProvider provider = optional.get();
provider.setSelected(true);
log.info("execution plan for method: {}", method);
System.out.println("execution plan for method: " + method);
log.info("execution plan for method: {}", methodName(method));
System.out.println("execution plan for method: " + methodName(method));
DependencyPrinter.print(provider, log::info);
DependencyPrinter.print(provider, System.out::println);
this.caller = provider.getCaller();
}

private String methodName(Method method) {
String className = method.getDeclaringClass().getSimpleName();
String methodName = method.getName();
String params = Arrays.stream(method.getParameterTypes())
.map(Class::getSimpleName)
.collect(Collectors.joining(", "));
return String.format("%s#%s(%s)", className, methodName, params);
}

@Override
public Object execute(IOContext IOContext) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.github.yuankui.easyio.runner.es.providers.async;

import io.github.yuankui.easyio.generic.Caller;
import io.github.yuankui.easyio.generic.MethodAdapter;
import io.github.yuankui.easyio.generic.provider.Depend;
import io.github.yuankui.easyio.generic.provider.Provide;
import io.github.yuankui.easyio.generic.provider.Provider;
import io.github.yuankui.easyio.runner.es.EsProvider;
import io.github.yuankui.easyio.runner.es.providers.parser.Parser;
import org.elasticsearch.action.search.SearchResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import reactor.core.publisher.Mono;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@EsProvider
public class AsyncSearchResultProvider implements Provider {
@Autowired
private ApplicationContext context;
private Parser parser = null;

@Override
public void init(MethodAdapter methodAdapter) {
Type type = methodAdapter.getMethod().getGenericReturnType();
if (!(type instanceof ParameterizedType)) {
throw new RuntimeException("return type not generic");
}

if (((ParameterizedType) type).getRawType() != Mono.class) {
throw new RuntimeException("return type not Mono<T>");
}

Type innerType = ((ParameterizedType) type).getActualTypeArguments()[0];
Map<String, Parser> beansOfType = context.getBeansOfType(Parser.class);
List<String> messages = new ArrayList<>();

beansOfType.forEach((s, parser) -> {
try {
parser.init(SearchResponse.class, innerType);
this.parser = parser;
} catch (Exception e) {
messages.add(e.getMessage());
}
});

if (this.parser == null) {
throw new RuntimeException("no proper parser found:" + messages);
}
}

@Provide("result")
public Caller<Mono<Object>> provide(@Depend("asyncResponse") Caller<Mono<SearchResponse>> caller) {
return ioContext -> {
Mono<SearchResponse> monoResponse = caller.call(ioContext);
return monoResponse.map(response -> parser.parse(response));
};
}

@Override
public int race(Provider other) {
// 比默认的SearchResultProvider优先级要高一些
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.github.yuankui.easyio.runner.es.resource.Result;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import reactor.core.publisher.Mono;

@RunWith(EsRunner.class)
@Host(host = "localhost", port = 9200)
Expand All @@ -16,4 +17,7 @@ public interface EsDemoService {
Result<Person> findByName(@Term("name") String name);
SearchResponse findByName2(@Term("name") String name, Page page);
SearchResponse findByName3(SearchRequest request, Page page);

// async
Mono<Result<Person>> findByName4(@Term("name") String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.TimeUnit;

@Configuration
@Import(EsConfiguration.class)
Expand Down Expand Up @@ -43,4 +45,16 @@ public void findByName2() {

System.out.println("response = " + response);
}

@Test
public void findByName4() throws InterruptedException {
Mono<Result<Person>> mono = esDemoService.findByName4("yuankui");

mono.subscribe(r -> {
List<Person> people = r.getData();
System.out.println("people = " + people);
});

TimeUnit.SECONDS.sleep(5);
}
}

0 comments on commit b7ee327

Please sign in to comment.