Skip to content

Commit

Permalink
[improvement](flink-connector) flush data without multi httpclients (a…
Browse files Browse the repository at this point in the history
…pache#7329) (apache#7450)

reuse http client to flush data
  • Loading branch information
Heng Zhao authored Dec 24, 2021
1 parent c596b03 commit b4ce189
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public DorisDynamicOutputFormat(DorisOptions option,
this.readOptions = readOptions;
this.executionOptions = executionOptions;

Properties streamLoadProp=executionOptions.getStreamLoadProp();
Properties streamLoadProp = executionOptions.getStreamLoadProp();

boolean ifEscape = Boolean.parseBoolean(streamLoadProp.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
if (ifEscape) {
Expand All @@ -121,16 +121,16 @@ public DorisDynamicOutputFormat(DorisOptions option,
}
}

private String escapeString( String s) {
Pattern p = Pattern.compile("\\\\x(\\d{2})");
Matcher m = p.matcher(s);
private String escapeString(String s) {
Pattern p = Pattern.compile("\\\\x(\\d{2})");
Matcher m = p.matcher(s);

StringBuffer buf = new StringBuffer();
while (m.find()) {
m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1))));
}
m.appendTail(buf);
return buf.toString();
StringBuffer buf = new StringBuffer();
while (m.find()) {
m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1))));
}
m.appendTail(buf);
return buf.toString();
}

@Override
Expand Down Expand Up @@ -220,6 +220,8 @@ public synchronized void close() throws IOException {
} catch (Exception e) {
LOG.warn("Writing records to doris failed.", e);
throw new RuntimeException("Writing records to doris failed.", e);
} finally {
this.dorisStreamLoad.close();
}
}
checkFlushException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ public class DorisStreamLoad implements Serializable {
private String tbl;
private String authEncoding;
private Properties streamLoadProp;
private final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});
private CloseableHttpClient httpClient;

public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
this.hostPort = hostPort;
Expand All @@ -74,6 +83,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
this.authEncoding = basicAuthHeader(user, passwd);
this.streamLoadProp = streamLoadProp;
this.httpClient = httpClientBuilder.build();
}

public String getLoadUrlStr() {
Expand All @@ -94,7 +104,7 @@ public void load(String value) throws StreamLoadException {
try {
RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
String errMsg=String.format("stream load error: %s, see more in %s",respContent.getMessage(),respContent.getErrorURL());
String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
throw new StreamLoadException(errMsg);
}
} catch (IOException e) {
Expand All @@ -112,16 +122,7 @@ private LoadResponse loadBatch(String value) {
UUID.randomUUID().toString().replaceAll("-", ""));
}

final HttpClientBuilder httpClientBuilder = HttpClients
.custom()
.setRedirectStrategy(new DefaultRedirectStrategy() {
@Override
protected boolean isRedirectable(String method) {
return true;
}
});

try (CloseableHttpClient client = httpClientBuilder.build()) {
try {
HttpPut put = new HttpPut(loadUrlStr);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding);
Expand All @@ -132,7 +133,7 @@ protected boolean isRedirectable(String method) {
StringEntity entity = new StringEntity(value, "UTF-8");
put.setEntity(entity);

try (CloseableHttpResponse response = client.execute(put)) {
try (CloseableHttpResponse response = httpClient.execute(put)) {
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase = response.getStatusLine().getReasonPhrase();
String loadResult = "";
Expand All @@ -154,6 +155,17 @@ private String basicAuthHeader(String username, String password) {
return "Basic " + new String(encoded);
}

public void close() throws IOException {
if (null != httpClient) {
try {
httpClient.close();
} catch (IOException e) {
LOG.error("Closing httpClient failed.", e);
throw new RuntimeException("Closing httpClient failed.", e);
}
}
}

public static class LoadResponse {
public int status;
public String respMsg;
Expand Down

0 comments on commit b4ce189

Please sign in to comment.