Skip to content

Commit 35ab76e

Browse files
authored
add support for tencent polaris by sidecar (#152)
1 parent ed25727 commit 35ab76e

File tree

5 files changed

+395
-0
lines changed

5 files changed

+395
-0
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@
170170
<excludes>
171171
<exclude>org/apache/hadoop/fs/cosn/TencentCloudL5EndpointResolverImpl.java</exclude>
172172
<exclude>org/apache/hadoop/fs/cosn/TencentPolarisEndpointResolverImpl.java</exclude>
173+
<exclude>org/apache/hadoop/fs/cosn/TencentPolarisSidecarEndpointResolverImpl.java</exclude>
173174
</excludes>
174175
</configuration>
175176
</plugin>

src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
7474
public static final String COSN_L5_UPDATE_MAX_RETRIES_KEY = "fs.cosn.l5.update.maxRetries";
7575
public static final int DEFAULT_COSN_L5_UPDATE_MAX_RETRIES = 5;
7676

77+
// 如果进程不能内嵌运行北极星,使用sidecar方式运行
78+
public static final String COSN_USE_POLARIS_SIDECAR_ENABLED = "fs.cosn.polaris.sidecar.enabled";
79+
public static final boolean DEFAULT_COSN_USE_POLARIS_SIDECAR_ENABLED = false;
80+
public static final String COSN_POLARIS_SIDECAR_ADDRESS = "fs.cosn.polaris.sidecar.address";
81+
7782
public static final String COSN_TMP_DIR = "fs.cosn.tmp.dir";
7883
public static final String DEFAULT_TMP_DIR = "/tmp/hadoop_cos";
7984

src/main/java/org/apache/hadoop/fs/CosNativeFileSystemStore.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.hadoop.fs.cosn.TencentCloudL5EndpointResolver;
6666
import org.apache.hadoop.fs.cosn.CosNPartListing;
6767
import org.apache.hadoop.fs.cosn.TencentPolarisEndpointResolver;
68+
import org.apache.hadoop.fs.cosn.TencentPolarisSidecarClient;
6869
import org.slf4j.Logger;
6970
import org.slf4j.LoggerFactory;
7071

@@ -226,6 +227,25 @@ private void initCOSClient(URI uri, Configuration conf) throws IOException {
226227
config.setHandlerAfterProcess(this.tencentPolarisEndpointResolver);
227228
}
228229

230+
boolean usePolarisSidecar = conf.getBoolean(CosNConfigKeys.COSN_USE_POLARIS_SIDECAR_ENABLED,
231+
CosNConfigKeys.DEFAULT_COSN_USE_POLARIS_SIDECAR_ENABLED);
232+
if( usePolarisSidecar ){
233+
String namespace = conf.get(CosNConfigKeys.COSN_POLARIS_NAMESPACE);
234+
String service = conf.get(CosNConfigKeys.COSN_POLARIS_SERVICE);
235+
String address = conf.get(CosNConfigKeys.COSN_POLARIS_SIDECAR_ADDRESS);
236+
TencentPolarisSidecarClient polarisSideCarClient = new TencentPolarisSidecarClient(address);
237+
try {
238+
Class<?> polarisEndpointResolverClass = Class.forName("org.apache.hadoop.fs.cosn.TencentPolarisSidecarEndpointResolverImpl");
239+
Constructor<?> constructor = polarisEndpointResolverClass.getConstructor(TencentPolarisSidecarClient.class, String.class, String.class);
240+
this.tencentPolarisEndpointResolver = (TencentPolarisEndpointResolver) constructor.newInstance(polarisSideCarClient, namespace, service);
241+
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException |
242+
InstantiationException | IllegalAccessException e) {
243+
throw new IOException("The current version does not support Polaris sidecar resolver.", e);
244+
}
245+
config.setEndpointResolver(this.tencentPolarisEndpointResolver);
246+
config.turnOnRefreshEndpointAddrSwitch();
247+
config.setHandlerAfterProcess(this.tencentPolarisEndpointResolver);
248+
}
229249
} else {
230250
config = new ClientConfig(new Region(""));
231251
LOG.info("Use Customer Domain is {}", customerDomain);
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package org.apache.hadoop.fs.cosn;
2+
3+
import com.qcloud.cos.http.IdleConnectionMonitorThread;
4+
import com.qcloud.cos.utils.ExceptionUtils;
5+
import com.qcloud.cos.utils.Jackson;
6+
import com.qcloud.cos.thirdparty.org.apache.http.HttpResponse;
7+
import com.qcloud.cos.thirdparty.org.apache.http.HttpStatus;
8+
import com.qcloud.cos.thirdparty.org.apache.http.StatusLine;
9+
import com.qcloud.cos.thirdparty.org.apache.http.client.HttpClient;
10+
import com.qcloud.cos.thirdparty.org.apache.http.client.config.RequestConfig;
11+
import com.qcloud.cos.thirdparty.org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
12+
import com.qcloud.cos.thirdparty.org.apache.http.client.methods.HttpPost;
13+
import com.qcloud.cos.thirdparty.org.apache.http.client.methods.HttpRequestBase;
14+
import com.qcloud.cos.thirdparty.org.apache.http.client.protocol.HttpClientContext;
15+
import com.qcloud.cos.thirdparty.org.apache.http.entity.InputStreamEntity;
16+
import com.qcloud.cos.thirdparty.org.apache.http.impl.client.HttpClientBuilder;
17+
import com.qcloud.cos.thirdparty.org.apache.http.impl.client.HttpClients;
18+
import com.qcloud.cos.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
19+
import com.qcloud.cos.thirdparty.org.apache.http.protocol.HttpContext;
20+
import com.qcloud.cos.thirdparty.org.apache.http.util.EntityUtils;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
import java.io.ByteArrayInputStream;
25+
import java.io.IOException;
26+
import java.net.URI;
27+
import java.net.URISyntaxException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import java.util.UUID;
31+
32+
public class TencentPolarisSidecarClient {
33+
private static final Logger LOG = LoggerFactory.getLogger(TencentPolarisSidecarClient.class);
34+
35+
private static final int MAX_TOTAL_CONN = 100;
36+
private static final int DEFAULT_MAX_PER_ROUTE = 10;
37+
private static final int IDLE_CONN_TIME_MS = 60000;
38+
private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = -1;
39+
private static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
40+
private static final int DEFAULT_SOCKET_TIMEOUT = 1000;
41+
42+
private HttpClient httpClient;
43+
private RequestConfig requestConfig;
44+
private final String polarisSideSarAddress;
45+
46+
private static final PoolingHttpClientConnectionManager connectionManager;
47+
private static final IdleConnectionMonitorThread idleConnectionMonitor;
48+
49+
static {
50+
connectionManager = new PoolingHttpClientConnectionManager();
51+
connectionManager.setMaxTotal(MAX_TOTAL_CONN);
52+
connectionManager.setDefaultMaxPerRoute(DEFAULT_MAX_PER_ROUTE);
53+
connectionManager.setValidateAfterInactivity(1);
54+
idleConnectionMonitor = new IdleConnectionMonitorThread(connectionManager);
55+
idleConnectionMonitor.setIdleAliveMS(IDLE_CONN_TIME_MS);
56+
idleConnectionMonitor.setDaemon(true);
57+
idleConnectionMonitor.start();
58+
}
59+
60+
public TencentPolarisSidecarClient(String address) {
61+
this.polarisSideSarAddress = address;
62+
initHttpClient();
63+
}
64+
65+
private void initHttpClient() {
66+
HttpClientBuilder httpClientBuilder = HttpClients.custom().setConnectionManager(connectionManager);
67+
this.httpClient = httpClientBuilder.build();
68+
this.requestConfig =
69+
RequestConfig.custom()
70+
.setContentCompressionEnabled(false)
71+
.setConnectionRequestTimeout(DEFAULT_CONNECTION_REQUEST_TIMEOUT)
72+
.setConnectTimeout(DEFAULT_CONNECTION_TIMEOUT)
73+
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT).build();
74+
}
75+
76+
private boolean isRequestSuccessful(HttpResponse httpResponse) {
77+
StatusLine statusLine = httpResponse.getStatusLine();
78+
int statusCode = -1;
79+
if (statusLine != null) {
80+
statusCode = statusLine.getStatusCode();
81+
}
82+
return statusCode / 100 == HttpStatus.SC_OK / 100;
83+
}
84+
85+
public Instance getOneInstance(String namespace, String service) {
86+
LOG.debug("get one instance for {} and {} by polarisSidecar", namespace, service);
87+
String uuid = UUID.randomUUID().toString();
88+
HttpResponse httpResponse = null;
89+
HttpRequestBase httpRequest = null;
90+
try {
91+
httpRequest = buildGetOnInstanceHttpRequest(namespace, service, uuid);
92+
if (httpRequest != null) {
93+
HttpContext context = HttpClientContext.create();
94+
httpResponse = executeOneRequest(context, httpRequest);
95+
if (httpResponse != null) {
96+
if (!isRequestSuccessful(httpResponse)) {
97+
ErrorResponse errorResponse = ErrorResponse.FromString(responseBody(httpResponse));
98+
if (errorResponse != null) {
99+
LOG.error("get one instance for namespace:{}, service:{}," +
100+
" errorCode:{}, errorMsg:{}, requestId:{}",
101+
namespace, service, errorResponse.code, errorResponse.info, uuid);
102+
}
103+
} else {
104+
GetOnInstanceResponse response = GetOnInstanceResponse.FromString(responseBody(httpResponse));
105+
if (response != null && response.instances != null && !response.instances.isEmpty()) {
106+
return response.instances.get(0);
107+
} else {
108+
LOG.error("get empty result for namespace:{}, service:{}, requestId:{}", namespace, service, uuid);
109+
}
110+
}
111+
}
112+
}
113+
} catch (Exception ex) {
114+
LOG.error("get one instance for namespace failed:{}, service:{}, error:{}, requestId:{}", namespace, service, ex.getMessage(), uuid);
115+
} finally {
116+
if (httpRequest != null) {
117+
httpRequest.abort();
118+
}
119+
closeHttpResponseStream(httpResponse);
120+
}
121+
return null;
122+
}
123+
124+
private void closeHttpResponseStream(HttpResponse httpResponse) {
125+
try {
126+
if (httpResponse != null && httpResponse.getEntity() != null
127+
&& httpResponse.getEntity().getContent() != null) {
128+
httpResponse.getEntity().getContent().close();
129+
}
130+
} catch (IOException e) {
131+
LOG.error("exception occur when close http response:", e);
132+
}
133+
}
134+
135+
private String responseBody(HttpResponse httpResponse)
136+
throws Exception {
137+
return EntityUtils.toString(httpResponse.getEntity(), "utf-8");
138+
}
139+
140+
private HttpRequestBase buildGetOnInstanceHttpRequest(String namespace, String service, String uuid) {
141+
String urlString = "http://" + this.polarisSideSarAddress + "/v1/GetOneInstance";
142+
try {
143+
HttpEntityEnclosingRequestBase httpRequestBase = new HttpPost();
144+
httpRequestBase.setURI(new URI(urlString));
145+
httpRequestBase.addHeader("Request-Id", uuid);
146+
byte[] requestContent = new GetOnInstanceRequest(namespace, service).JsonString().getBytes();
147+
InputStreamEntity reqEntity =
148+
new InputStreamEntity(new ByteArrayInputStream(requestContent),
149+
requestContent.length);
150+
httpRequestBase.setEntity(reqEntity);
151+
httpRequestBase.setConfig(this.requestConfig);
152+
return httpRequestBase;
153+
} catch (URISyntaxException e) {
154+
LOG.error("build uri failed url:{}, uuid:{}", urlString, uuid);
155+
return null;
156+
}
157+
}
158+
159+
public void updateResult(CallResultStat result) {
160+
LOG.debug("updateServiceCallResult {} and {} by polarisSidecar", result.namespace, result.service);
161+
String uuid = UUID.randomUUID().toString();
162+
HttpResponse httpResponse = null;
163+
HttpRequestBase httpRequest = null;
164+
try {
165+
httpRequest = buildUpdateResultHttpRequest(result, uuid);
166+
if (httpRequest != null) {
167+
HttpContext context = HttpClientContext.create();
168+
httpResponse = executeOneRequest(context, httpRequest);
169+
if (httpResponse != null) {
170+
ErrorResponse errorResponse = ErrorResponse.FromString(responseBody(httpResponse));
171+
if (!isRequestSuccessful(httpResponse)) {
172+
if (errorResponse != null) {
173+
LOG.error("update result failed for namespace:{}, service:{}, errorCode:{}, errorMsg:{}, requestId:{}",
174+
result.namespace, result.service, errorResponse.code, errorResponse.info, uuid);
175+
}
176+
} else {
177+
LOG.debug("update result success for namespace:{}, service:{}, errorCode:{}, errorMsg:{}, requestId:{}",
178+
result.namespace, result.service, errorResponse.code, errorResponse.info, uuid);
179+
}
180+
}
181+
}
182+
} catch (Exception ex) {
183+
LOG.error("update result for namespace:{}, service:{}, error:{}, requestId:{}",
184+
result.namespace, result.service, ex.getMessage(), uuid);
185+
} finally {
186+
if (httpRequest != null) {
187+
httpRequest.abort();
188+
}
189+
closeHttpResponseStream(httpResponse);
190+
}
191+
}
192+
193+
private HttpRequestBase buildUpdateResultHttpRequest(CallResultStat result, String uuid) {
194+
String urlString = "http://" + this.polarisSideSarAddress + "/v1/UpdateCallResult";
195+
try {
196+
HttpEntityEnclosingRequestBase httpRequestBase = new HttpPost();
197+
httpRequestBase.setURI(new URI(urlString));
198+
httpRequestBase.addHeader("Request-Id", uuid);
199+
byte[] requestContent = result.JsonString().getBytes();
200+
InputStreamEntity reqEntity =
201+
new InputStreamEntity(new ByteArrayInputStream(requestContent),
202+
requestContent.length);
203+
httpRequestBase.setEntity(reqEntity);
204+
httpRequestBase.setConfig(this.requestConfig);
205+
return httpRequestBase;
206+
} catch (URISyntaxException e) {
207+
LOG.error("build uri failed, url:{}, uuid:{}", urlString, uuid);
208+
return null;
209+
}
210+
}
211+
212+
private HttpResponse executeOneRequest(HttpContext context, HttpRequestBase httpRequest) {
213+
HttpResponse httpResponse;
214+
try {
215+
httpResponse = httpClient.execute(httpRequest, context);
216+
} catch (IOException e) {
217+
httpRequest.abort();
218+
throw ExceptionUtils.createClientException(e);
219+
}
220+
return httpResponse;
221+
}
222+
223+
public static class GetOnInstanceRequest {
224+
public String service;
225+
public String namespace;
226+
227+
public GetOnInstanceRequest(String namespace, String service) {
228+
this.service = service;
229+
this.namespace = namespace;
230+
}
231+
232+
public String JsonString() {
233+
return Jackson.toJsonString(this);
234+
}
235+
}
236+
237+
public static class CallResultStat {
238+
public String namespace;
239+
public String service;
240+
public List<CallResult> callResults;
241+
242+
public String JsonString() {
243+
return Jackson.toJsonString(this);
244+
}
245+
246+
public CallResultStat(String namespace, String service) {
247+
this.namespace = namespace;
248+
this.service = service;
249+
callResults = new ArrayList<>();
250+
}
251+
252+
public void updateCallResult(CallResult item) {
253+
callResults.clear();
254+
callResults.add(item);
255+
}
256+
}
257+
258+
public static class CallResult {
259+
public String id;
260+
public String host;
261+
public int port;
262+
public int retStatus;
263+
public int retCode;
264+
public long callDelay;
265+
}
266+
267+
public static class ErrorResponse {
268+
public int code;
269+
public String info;
270+
271+
public static ErrorResponse FromString(String jsonString) {
272+
return Jackson.fromJsonString(jsonString, ErrorResponse.class);
273+
}
274+
}
275+
276+
public static class Instance {
277+
public String id;
278+
public String host;
279+
public int port;
280+
}
281+
282+
public static class GetOnInstanceResponse {
283+
public List<Instance> instances;
284+
285+
public static GetOnInstanceResponse FromString(String jsonString) {
286+
return Jackson.fromJsonString(jsonString, GetOnInstanceResponse.class);
287+
}
288+
}
289+
}

0 commit comments

Comments
 (0)