25
25
import group .rxcloud .capa .spi .aws .mesh .http .serializer .AwsCapaSerializerProvider ;
26
26
import group .rxcloud .capa .spi .http .CapaSerializeHttpSpi ;
27
27
import group .rxcloud .capa .spi .http .config .RpcServiceOptions ;
28
+ import group .rxcloud .cloudruntimes .domain .core .invocation .HttpExtension ;
28
29
import group .rxcloud .cloudruntimes .utils .TypeRef ;
29
30
import okhttp3 .Headers ;
30
31
import okhttp3 .OkHttpClient ;
34
35
import org .slf4j .LoggerFactory ;
35
36
import software .amazon .awssdk .utils .StringUtils ;
36
37
38
+ import java .util .List ;
37
39
import java .util .Map ;
38
40
import java .util .Objects ;
39
41
import java .util .concurrent .CompletableFuture ;
@@ -54,15 +56,25 @@ public AwsCapaHttp(OkHttpClient httpClient, CapaObjectSerializer objectSerialize
54
56
}
55
57
56
58
@ Override
57
- protected <T > CompletableFuture <HttpResponse <T >> invokeSpiApi (String appId ,
58
- String method ,
59
- Object requestData ,
60
- Map <String , String > headers ,
61
- TypeRef <T > type ,
62
- RpcServiceOptions rpcServiceOptions ) {
59
+ protected <T > CompletableFuture <HttpResponse <T >> invokeSpiApi (
60
+ String appId ,
61
+ String method ,
62
+ Object requestData ,
63
+ String httpMethod ,
64
+ Map <String , String > headers ,
65
+ Map <String , List <String >> urlParameters ,
66
+ TypeRef <T > type ,
67
+ RpcServiceOptions rpcServiceOptions ) {
63
68
Objects .requireNonNull (rpcServiceOptions , "rpcServiceOptions" );
64
69
AwsToAwsHttpServiceMeshInvoker awsToAwsHttpServiceMeshInvoker = new AwsToAwsHttpServiceMeshInvoker ();
65
- return awsToAwsHttpServiceMeshInvoker .doInvokeSpiApi (appId , method , requestData , headers , type , (AwsRpcServiceOptions ) rpcServiceOptions );
70
+ return awsToAwsHttpServiceMeshInvoker .doInvokeSpiApi (
71
+ appId ,
72
+ method ,
73
+ requestData ,
74
+ httpMethod ,
75
+ headers , urlParameters ,
76
+ type ,
77
+ (AwsRpcServiceOptions ) rpcServiceOptions );
66
78
}
67
79
68
80
private interface AwsHttpInvoker {
@@ -79,12 +91,15 @@ private interface AwsHttpInvoker {
79
91
* @param rpcServiceOptions the rpc service options
80
92
* @return the async completable future
81
93
*/
82
- <T > CompletableFuture <HttpResponse <T >> doInvokeSpiApi (String appId ,
83
- String method ,
84
- Object requestData ,
85
- Map <String , String > headers ,
86
- TypeRef <T > type ,
87
- AwsRpcServiceOptions rpcServiceOptions );
94
+ <T > CompletableFuture <HttpResponse <T >> doInvokeSpiApi (
95
+ String appId ,
96
+ String method ,
97
+ Object requestData ,
98
+ String httpMethod ,
99
+ Map <String , String > headers ,
100
+ Map <String , List <String >> urlParameters ,
101
+ TypeRef <T > type ,
102
+ AwsRpcServiceOptions rpcServiceOptions );
88
103
}
89
104
90
105
/**
@@ -98,12 +113,15 @@ private class AwsToAwsHttpServiceMeshInvoker implements AwsHttpInvoker {
98
113
private static final String POST = "POST" ;
99
114
100
115
@ Override
101
- public <T > CompletableFuture <HttpResponse <T >> doInvokeSpiApi (String appId ,
102
- String method ,
103
- Object requestData ,
104
- Map <String , String > headers ,
105
- TypeRef <T > type ,
106
- AwsRpcServiceOptions rpcServiceOptions ) {
116
+ public <T > CompletableFuture <HttpResponse <T >> doInvokeSpiApi (
117
+ String appId ,
118
+ String method ,
119
+ Object requestData ,
120
+ String httpMethod ,
121
+ Map <String , String > headers ,
122
+ Map <String , List <String >> urlParameters ,
123
+ TypeRef <T > type ,
124
+ AwsRpcServiceOptions rpcServiceOptions ) {
107
125
AwsRpcServiceOptions .AwsToAwsServiceOptions awsToAwsServiceOptions = rpcServiceOptions .getAwsToAwsServiceOptions ();
108
126
final String serviceId = awsToAwsServiceOptions .getServiceId ();
109
127
if (StringUtils .isBlank (serviceId )) {
@@ -114,25 +132,50 @@ public <T> CompletableFuture<HttpResponse<T>> doInvokeSpiApi(String appId,
114
132
final int servicePort = awsToAwsServiceOptions .getServicePort ();
115
133
final String namespace = awsToAwsServiceOptions .getNamespace ();
116
134
117
- return doAsyncInvoke (method , requestData , headers , type , serviceId , namespace , servicePort );
135
+ if (httpMethod == null ) {
136
+ httpMethod = POST ;
137
+ }
138
+
139
+ return doAsyncInvoke (
140
+ method ,
141
+ requestData ,
142
+ httpMethod ,
143
+ headers ,
144
+ urlParameters ,
145
+ type ,
146
+ serviceId ,
147
+ namespace ,
148
+ servicePort );
118
149
}
119
150
120
- private <T > CompletableFuture <HttpResponse <T >> doAsyncInvoke (String method ,
121
- Object requestData ,
122
- Map <String , String > headers ,
123
- TypeRef <T > type ,
124
- String serviceId ,
125
- String namespace ,
126
- int servicePort ) {
151
+ private <T > CompletableFuture <HttpResponse <T >> doAsyncInvoke (
152
+ String method ,
153
+ Object requestData ,
154
+ String httpMethod ,
155
+ Map <String , String > headers ,
156
+ Map <String , List <String >> urlParameters ,
157
+ TypeRef <T > type ,
158
+ String serviceId ,
159
+ String namespace ,
160
+ int servicePort ) {
127
161
// generate app mesh http url
128
162
final String appMeshHttpUrl = AwsCapaRpcProperties .AppMeshProperties .Settings .getRpcAwsAppMeshTemplate ()
129
163
.replace ("{serviceId}" , serviceId )
130
164
.replace ("{namespace}" , namespace )
131
165
.replace ("{servicePort}" , String .valueOf (servicePort ))
132
166
.replace ("{operation}" , method );
133
167
168
+ if (HttpExtension .GET .getMethod ().toString ().equals (httpMethod )) {
169
+ // TODO: 2021/11/30 append urlParameters to url
170
+ }
171
+
134
172
// async invoke
135
- CompletableFuture <HttpResponse <T >> asyncInvoke0 = post (appMeshHttpUrl , requestData , headers , type );
173
+ CompletableFuture <HttpResponse <T >> asyncInvoke0 = invokeHttp (
174
+ appMeshHttpUrl ,
175
+ requestData ,
176
+ httpMethod ,
177
+ headers ,
178
+ type );
136
179
asyncInvoke0 .exceptionally (throwable -> {
137
180
if (logger .isWarnEnabled ()) {
138
181
logger .warn ("[AwsCapaHttp] async invoke error" , throwable );
@@ -142,10 +185,12 @@ private <T> CompletableFuture<HttpResponse<T>> doAsyncInvoke(String method,
142
185
return asyncInvoke0 ;
143
186
}
144
187
145
- private <T > CompletableFuture <HttpResponse <T >> post (String url ,
146
- Object requestData ,
147
- Map <String , String > headers ,
148
- TypeRef <T > type ) {
188
+ private <T > CompletableFuture <HttpResponse <T >> invokeHttp (
189
+ String url ,
190
+ Object requestData ,
191
+ String httpMethod ,
192
+ Map <String , String > headers ,
193
+ TypeRef <T > type ) {
149
194
// generate http request body
150
195
RequestBody body = getRequestBodyWithSerialize (requestData , headers );
151
196
Headers header = getRequestHeaderWithParams (headers );
@@ -154,7 +199,7 @@ private <T> CompletableFuture<HttpResponse<T>> post(String url,
154
199
Request request = new Request .Builder ()
155
200
.url (url )
156
201
.headers (header )
157
- .method (POST , body )
202
+ .method (httpMethod , body )
158
203
.build ();
159
204
160
205
return doAsyncInvoke0 (request , type );
0 commit comments