Skip to content

Commit

Permalink
[improvement](spark-connector) Stream load http exception handling (a…
Browse files Browse the repository at this point in the history
…pache#7514)

Stream load http exception handling
  • Loading branch information
hf200012 authored Jan 9, 2022
1 parent 3a8a85b commit 9aaa3f6
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ public void load(List<List<Object>> rows) throws StreamLoadException {
public void load(String value) throws StreamLoadException {
LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
LOG.info("Streamload Response:{}",loadResponse);
if(loadResponse.status != 200){
throw new StreamLoadException("stream load error: " + loadResponse.respContent);
}else{
LOG.info("Streamload Response:{}",loadResponse);
ObjectMapper obj = new ObjectMapper();
try {
RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
Expand All @@ -182,6 +182,7 @@ private LoadResponse loadBatch(String value) {

HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
int status = -1;
try {
// build request and send to new be location
beConn = getConnection(loadUrlStr, label);
Expand All @@ -191,7 +192,7 @@ private LoadResponse loadBatch(String value) {
bos.close();

// get respond
int status = beConn.getResponseCode();
status = beConn.getResponseCode();
String respMsg = beConn.getResponseMessage();
InputStream stream = (InputStream) beConn.getContent();
BufferedReader br = new BufferedReader(new InputStreamReader(stream));
Expand All @@ -204,9 +205,9 @@ private LoadResponse loadBatch(String value) {

} catch (Exception e) {
e.printStackTrace();
String err = "failed to execute spark streamload with label: " + label;
String err = "http request exception,load url : "+loadUrlStr+",failed to execute spark streamload with label: " + label;
LOG.warn(err, e);
return new LoadResponse(-1, e.getMessage(), err);
return new LoadResponse(status, e.getMessage(), err);
} finally {
if (feConn != null) {
feConn.disconnect();
Expand Down

0 comments on commit 9aaa3f6

Please sign in to comment.