diff --git a/extension/DataX/doriswriter/doc/doriswriter.md b/extension/DataX/doriswriter/doc/doriswriter.md index ba6ed559079f20..4e058ff70045e3 100644 --- a/extension/DataX/doriswriter/doc/doriswriter.md +++ b/extension/DataX/doriswriter/doc/doriswriter.md @@ -16,7 +16,6 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> - # DorisWriter 插件文档 ## 1 快速介绍 @@ -185,7 +184,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter ```json "loadProps": { "column_separator": "\\x01", - "row_delimiter": "\\x02" + "line_delimiter": "\\x02" } ``` diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java index 0a644b1e20f399..3435b391d58af9 100644 --- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java +++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -141,7 +142,7 @@ private void checkStreamLoadState(String host, String label) throws IOException private byte[] addRows(List rows, int totalBytes) { if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps()); - byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); for (byte[] row : rows) { bos.put(row); @@ -178,6 +179,8 @@ protected boolean isRedirectable(String method) { }); try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { HttpPut httpPut = new HttpPut(loadUrl); + httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH); + httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING); List cols = options.getColumns(); if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); @@ -189,9 +192,9 @@ protected boolean isRedirectable(String method) { } httpPut.setHeader("Expect", "100-continue"); httpPut.setHeader("label", label); - httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); + httpPut.setHeader("two_phase_commit", "false"); httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); - httpPut.setEntity(new ByteArrayEntity (data)); + httpPut.setEntity(new ByteArrayEntity(data)); httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { HttpEntity respEntity = getHttpEntity(resp); @@ -247,4 +250,4 @@ private boolean checkConnection(String host) { return false; } } -} +} \ No newline at end of file