Skip to content

Commit

Permalink
1.13版本Doris连接,作为来源隐藏会收集到隐藏列“__DORIS_DELETE_SIGN__”,读取数据会报错 (DataLinkDC#717)
Browse files Browse the repository at this point in the history
* 1.13版本Doris连接,作为来源隐藏会收集到隐藏位置“__DORIS_DELETE_”,读取数据会报错

* build 修改

* doris 连接器版本修改为0.6.6-SNAPSHOT

Co-authored-by: zhaoxiaojiang <zhaoxiaojiang@aliyun.com>
  • Loading branch information
DeepMakerAi and zhaoxiaojiang authored Jul 15, 2022
1 parent 5950cd6 commit f3b8396
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 0 deletions.
54 changes: 54 additions & 0 deletions dlink-connectors/dlink-connector-doris-1.13/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven module for the Doris connector targeting Flink 1.13.

  This module ships a patched copy of the Doris Arrow deserializer
  (org.apache.doris.flink.serialization.RowBatch) so that reading a Doris
  source table with the hidden __DORIS_DELETE_SIGN__ column enabled does
  not fail. The patched class shadows the one inside
  flink-doris-connector via the shade plugin below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Inherit groupId/version and dependency management from the connectors aggregator. -->
<parent>
<artifactId>dlink-connectors</artifactId>
<groupId>com.dlink</groupId>
<version>0.6.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dlink-connector-doris-1.13</artifactId>

<properties>
<java.version>1.8</java.version>
<flink.version>1.13.6</flink.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<!-- Provided: the Flink runtime supplies the Table API bridge at deployment time. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<!-- Upstream Doris connector whose RowBatch class this module overrides. -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.13_2.12</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Shade at package time so the patched RowBatch wins on the classpath. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.serialization;

import org.apache.doris.shaded.org.apache.arrow.memory.RootAllocator;

import org.apache.doris.shaded.org.apache.arrow.vector.BigIntVector;
import org.apache.doris.shaded.org.apache.arrow.vector.BitVector;
import org.apache.doris.shaded.org.apache.arrow.vector.DecimalVector;
import org.apache.doris.shaded.org.apache.arrow.vector.FieldVector;
import org.apache.doris.shaded.org.apache.arrow.vector.Float4Vector;
import org.apache.doris.shaded.org.apache.arrow.vector.Float8Vector;
import org.apache.doris.shaded.org.apache.arrow.vector.IntVector;
import org.apache.doris.shaded.org.apache.arrow.vector.SmallIntVector;
import org.apache.doris.shaded.org.apache.arrow.vector.TinyIntVector;
import org.apache.doris.shaded.org.apache.arrow.vector.VarBinaryVector;
import org.apache.doris.shaded.org.apache.arrow.vector.VarCharVector;
import org.apache.doris.shaded.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.doris.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.doris.shaded.org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.thrift.TScanBatchResult;

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

/**
* row batch data container.
*/
/**
 * Container for one Doris scan batch: deserializes the Arrow-encoded payload of a
 * {@link TScanBatchResult} into lists of Flink-compatible column values.
 *
 * <p>The hidden batch-delete bookkeeping column {@code __DORIS_DELETE_SIGN__} is
 * skipped during conversion, so reading a Doris source table that has the hidden
 * column enabled no longer fails with an unsupported-type error.
 *
 * <p>This class is not thread-safe; it is intended for a single reader thread.
 */
public class RowBatch {
    private static final Logger logger = LoggerFactory.getLogger(RowBatch.class);

    /** One result row: an ordered list of column values ({@code null} entries allowed). */
    public static class Row {
        private final List<Object> cols;

        Row(int colCount) {
            // Pre-size to the column count to avoid re-allocation while filling.
            this.cols = new ArrayList<>(colCount);
        }

        public List<Object> getCols() {
            return cols;
        }

        public void put(Object o) {
            cols.add(o);
        }
    }

    // Offset of the next row handed out by next().
    private int offsetInRowBatch = 0;
    // Row count of the Arrow batch currently being converted.
    private int rowCountInOneBatch = 0;
    // Total number of rows accumulated across all Arrow batches.
    private int readRowCount = 0;
    private final List<Row> rowBatch = new ArrayList<>();
    private final ArrowStreamReader arrowStreamReader;
    private VectorSchemaRoot root;
    private List<FieldVector> fieldVectors;
    private RootAllocator rootAllocator;
    private final Schema schema;

    public List<Row> getRowBatch() {
        return rowBatch;
    }

    /**
     * @param nextResult raw scan result whose {@code rows} field carries the Arrow stream bytes
     * @param schema     Doris schema of the fetched columns, including hidden ones
     */
    public RowBatch(TScanBatchResult nextResult, Schema schema) {
        this.schema = schema;
        this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
        this.arrowStreamReader = new ArrowStreamReader(
                new ByteArrayInputStream(nextResult.getRows()),
                rootAllocator
        );
        this.offsetInRowBatch = 0;
    }

    /**
     * Reads every Arrow batch from the stream and converts it into {@link Row}s.
     *
     * <p>Always closes the Arrow reader and allocator before returning, even on failure.
     *
     * @return this instance, ready for iteration via {@link #hasNext()} / {@link #next()}
     * @throws DorisException if the payload is malformed or the schema does not match
     *                        the Arrow field vectors
     */
    public RowBatch readArrow() throws DorisException {
        try {
            this.root = arrowStreamReader.getVectorSchemaRoot();
            while (arrowStreamReader.loadNextBatch()) {
                fieldVectors = root.getFieldVectors();
                if (fieldVectors.size() != schema.size()) {
                    // Fixed: the two log arguments were swapped in the original, so the
                    // message reported the arrow field count as the schema size.
                    logger.error("Schema size '{}' is not equal to arrow field size '{}'.",
                            schema.size(), fieldVectors.size());
                    throw new DorisException("Load Doris data failed, schema size of fetch data is wrong.");
                }
                if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
                    logger.debug("One batch in arrow has no data.");
                    continue;
                }
                rowCountInOneBatch = root.getRowCount();
                // Pre-create one Row per record of this batch before filling column by column.
                for (int i = 0; i < rowCountInOneBatch; ++i) {
                    rowBatch.add(new Row(fieldVectors.size()));
                }
                convertArrowToRowBatch();
                readRowCount += root.getRowCount();
            }
            return this;
        } catch (Exception e) {
            logger.error("Read Doris Data failed because: ", e);
            // NOTE(review): the cause is dropped here; if this connector version's
            // DorisException offers a (String, Throwable) constructor, prefer it.
            throw new DorisException(e.getMessage());
        } finally {
            close();
        }
    }

    /** @return whether {@link #next()} can hand out at least one more row */
    public boolean hasNext() {
        return offsetInRowBatch < readRowCount;
    }

    /**
     * Appends {@code obj} as the next column value of row {@code rowIndex}
     * (relative to the Arrow batch currently being converted).
     */
    private void addValueToRow(int rowIndex, Object obj) {
        // Fixed off-by-one: rowIndex == rowCountInOneBatch is already out of range
        // (the original used '>' and would have thrown IndexOutOfBoundsException below).
        if (rowIndex >= rowCountInOneBatch) {
            String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
                    rowCountInOneBatch;
            logger.error(errMsg);
            throw new NoSuchElementException(errMsg);
        }
        rowBatch.get(readRowCount + rowIndex).put(obj);
    }

    /**
     * Converts the currently loaded Arrow batch column-by-column into the
     * pre-created {@link Row}s, mapping Doris types onto Flink data types.
     *
     * <p>The hidden {@code __DORIS_DELETE_SIGN__} column is skipped entirely.
     *
     * @throws DorisException on an unsupported Doris column type
     */
    public void convertArrowToRowBatch() throws DorisException {
        try {
            for (int col = 0; col < fieldVectors.size(); col++) {
                FieldVector curFieldVector = fieldVectors.get(col);
                Types.MinorType mt = curFieldVector.getMinorType();
                // Skip Doris's hidden batch-delete marker column; it is not part of
                // the user-visible row and previously caused an "Unsupported type" error.
                if ("__DORIS_DELETE_SIGN__".equals(schema.get(col).getName())) {
                    continue;
                }
                final String currentType = schema.get(col).getType();
                switch (currentType) {
                    case "NULL_TYPE":
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            addValueToRow(rowIndex, null);
                        }
                        break;
                    case "BOOLEAN":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.BIT),
                                typeMismatchMessage(currentType, mt));
                        BitVector bitVector = (BitVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "TINYINT":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT),
                                typeMismatchMessage(currentType, mt));
                        TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "SMALLINT":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT),
                                typeMismatchMessage(currentType, mt));
                        SmallIntVector smallIntVector = (SmallIntVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "INT":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.INT),
                                typeMismatchMessage(currentType, mt));
                        IntVector intVector = (IntVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "BIGINT":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT),
                                typeMismatchMessage(currentType, mt));
                        BigIntVector bigIntVector = (BigIntVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "FLOAT":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
                                typeMismatchMessage(currentType, mt));
                        Float4Vector float4Vector = (Float4Vector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "TIME":
                    case "DOUBLE":
                        // Doris serializes both TIME and DOUBLE as 8-byte floats.
                        Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8),
                                typeMismatchMessage(currentType, mt));
                        Float8Vector float8Vector = (Float8Vector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "BINARY":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY),
                                typeMismatchMessage(currentType, mt));
                        VarBinaryVector varBinaryVector = (VarBinaryVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            Object fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
                            addValueToRow(rowIndex, fieldValue);
                        }
                        break;
                    case "DECIMAL":
                    case "DECIMALV2":
                        Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
                                typeMismatchMessage(currentType, mt));
                        DecimalVector decimalVector = (DecimalVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            if (decimalVector.isNull(rowIndex)) {
                                addValueToRow(rowIndex, null);
                                continue;
                            }
                            // Strip trailing zeros so precision/scale reflect the actual value.
                            BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
                            addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale()));
                        }
                        break;
                    case "DATE":
                    case "LARGEINT":
                    case "DATETIME":
                    case "CHAR":
                    case "VARCHAR":
                    case "STRING":
                        // Doris ships all of these as UTF-8 strings over Arrow VARCHAR.
                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
                                typeMismatchMessage(currentType, mt));
                        VarCharVector varCharVector = (VarCharVector) curFieldVector;
                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
                            if (varCharVector.isNull(rowIndex)) {
                                addValueToRow(rowIndex, null);
                                continue;
                            }
                            // Fixed: decode explicitly as UTF-8; the original relied on the
                            // platform default charset and could mangle non-ASCII data.
                            String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
                            addValueToRow(rowIndex, StringData.fromString(value));
                        }
                        break;
                    default:
                        String errMsg = "Unsupported type " + schema.get(col).getType();
                        logger.error(errMsg);
                        throw new DorisException(errMsg);
                }
            }
        } catch (Exception e) {
            close();
            throw e;
        }
    }

    /**
     * @return the column values of the next row
     * @throws NoSuchElementException if iteration is already exhausted
     */
    public List<Object> next() throws DorisException {
        if (!hasNext()) {
            String errMsg = "Get row offset:" + offsetInRowBatch + " larger than row size: " + readRowCount;
            logger.error(errMsg);
            throw new NoSuchElementException(errMsg);
        }
        return rowBatch.get(offsetInRowBatch++).getCols();
    }

    // Renamed parameter from 'sparkType' (leftover from the Spark connector this was ported from).
    private String typeMismatchMessage(final String flinkType, final Types.MinorType arrowType) {
        final String messageTemplate = "FLINK type is %1$s, but arrow type is %2$s.";
        return String.format(messageTemplate, flinkType, arrowType.name());
    }

    public int getReadRowCount() {
        return readRowCount;
    }

    /** Releases the Arrow reader and its allocator; safe to call more than once. */
    public void close() {
        try {
            if (arrowStreamReader != null) {
                arrowStreamReader.close();
            }
            if (rootAllocator != null) {
                rootAllocator.close();
            }
        } catch (IOException ioe) {
            // Best-effort cleanup: surface the failure in logs instead of swallowing it silently.
            logger.debug("Close Arrow resources failed.", ioe);
        }
    }
}
1 change: 1 addition & 0 deletions dlink-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<module>dlink-connector-jdbc-1.14</module>
<module>dlink-connector-phoenix-1.13</module>
<module>dlink-connector-phoenix-1.14</module>
<module>dlink-connector-doris-1.13</module>
</modules>
<artifactId>dlink-connectors</artifactId>
</project>

0 comments on commit f3b8396

Please sign in to comment.