Skip to content

Commit

Permalink
[fix](datax)doris writer write error (apache#14276)
Browse files Browse the repository at this point in the history
* doris writer write error
  • Loading branch information
hf200012 authored Nov 18, 2022
1 parent 734525d commit bd5882d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
3 changes: 1 addition & 2 deletions extension/DataX/doriswriter/doc/doriswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# DorisWriter 插件文档

## 1 快速介绍
Expand Down Expand Up @@ -185,7 +184,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
```json
"loadProps": {
"column_separator": "\\x01",
"row_delimiter": "\\x02"
"line_delimiter": "\\x02"
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.fastjson.JSON;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
Expand Down Expand Up @@ -141,7 +142,7 @@ private void checkStreamLoadState(String host, String label) throws IOException
private byte[] addRows(List<byte[]> rows, int totalBytes) {
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
for (byte[] row : rows) {
bos.put(row);
Expand Down Expand Up @@ -178,6 +179,8 @@ protected boolean isRedirectable(String method) {
});
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
HttpPut httpPut = new HttpPut(loadUrl);
httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH);
httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
List<String> cols = options.getColumns();
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
Expand All @@ -189,9 +192,9 @@ protected boolean isRedirectable(String method) {
}
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label);
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
httpPut.setHeader("two_phase_commit", "false");
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
httpPut.setEntity(new ByteArrayEntity (data));
httpPut.setEntity(new ByteArrayEntity(data));
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
HttpEntity respEntity = getHttpEntity(resp);
Expand Down Expand Up @@ -247,4 +250,4 @@ private boolean checkConnection(String host) {
return false;
}
}
}
}

0 comments on commit bd5882d

Please sign in to comment.