Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -551,6 +552,22 @@ public static Object deserializeBasicField(
String fieldText,
String nullLiteral,
FailureHandler failureHandler) throws Exception {
return deserializeBasicField(fieldName, fieldFormatInfo, fieldText, nullLiteral,
null, null, null, failureHandler);
}

/**
* Deserializes the basic field.
*/
public static Object deserializeBasicField(
String fieldName,
FormatInfo fieldFormatInfo,
String fieldText,
String nullLiteral,
InLongMsgHead head,
InLongMsgBody inLongMsgBody,
String originBody,
FailureHandler failureHandler) throws Exception {
checkState(fieldFormatInfo instanceof BasicFormatInfo);

if (fieldText == null) {
Expand All @@ -574,42 +591,49 @@ public static Object deserializeBasicField(
try {
return ((BasicFormatInfo<?>) fieldFormatInfo).deserialize(fieldText);
} catch (Exception e) {
LOG.warn("Could not properly deserialize the " + "text "
+ fieldText + " for field " + fieldName + ".", e);
if (failureHandler != null) {
failureHandler.onConvertingFieldFailure(fieldName, fieldText, fieldFormatInfo, e);
failureHandler.onConvertingFieldFailure(fieldName, fieldText, fieldFormatInfo,
head, inLongMsgBody, originBody, e);
} else {
LOG.warn("Could not properly deserialize the" + "text: {},for field:{}"
+ ". predefinedFields = {},fields = {}, attr={}, originBody={}",
fieldText, fieldName,
head == null ? "" : head.getPredefinedFields(),
inLongMsgBody == null ? "" : inLongMsgBody.getFields(),
head == null ? "" : head.getAttributes(),
originBody == null ? (inLongMsgBody == null ? "" : new String(inLongMsgBody.getDataBytes()))
: originBody,
e);
}
}
return null;
}

public static long getFormatValueLength(FormatInfo fieldFormatInfo, String fieldText) {
if (fieldFormatInfo instanceof BooleanFormatInfo) {
return 4;
} else if (fieldFormatInfo instanceof ByteFormatInfo) {
return 4;
} else if (fieldFormatInfo instanceof BooleanFormatInfo) {
return 4;
} else if (fieldFormatInfo instanceof ShortFormatInfo) {
return 4;
} else if (fieldFormatInfo instanceof IntFormatInfo) {
return 4;
if (fieldFormatInfo instanceof StringFormatInfo) {
return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
} else if (fieldFormatInfo instanceof LongFormatInfo) {
return 8;
} else if (fieldFormatInfo instanceof FloatFormatInfo) {
return 8;
return 24;
} else if (fieldFormatInfo instanceof IntFormatInfo) {
return 16;
} else if (fieldFormatInfo instanceof DoubleFormatInfo) {
return 8;
} else if (fieldFormatInfo instanceof DecimalFormatInfo) {
return 8;
return 24;
} else if (fieldFormatInfo instanceof FloatFormatInfo) {
return 16;
} else if (fieldFormatInfo instanceof DateFormatInfo
|| fieldFormatInfo instanceof TimeFormatInfo
|| fieldFormatInfo instanceof TimestampFormatInfo) {
return 8;
} else if (StringUtils.isNotEmpty(fieldText)) {
return fieldText.length();
return 24;
} else if (fieldFormatInfo instanceof BooleanFormatInfo) {
return 16;
} else if (fieldFormatInfo instanceof ByteFormatInfo) {
return 16;
} else if (fieldFormatInfo instanceof ShortFormatInfo) {
return 16;
} else if (fieldFormatInfo instanceof DecimalFormatInfo) {
return 24;
}
return 0L;
return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,34 @@ default void onParsingMsgFailure(Object msg, Exception exception) throws Excepti
void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
Exception exception) throws Exception;

/**
* This method is called when there is a failure occurred while converting any field to row.
*
* @param fieldName the filed name
* @param fieldText the filed test
* @param formatInfo the filed target type info
* @param exception the thrown exception
* @param head the predefined fields
* @param inLongMsgBody the fields
* @param originBody the origin body
* @throws Exception the exception
*/
default void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
InLongMsgHead head, InLongMsgBody inLongMsgBody, String originBody,
Exception exception) throws Exception {
onConvertingFieldFailure(fieldName, fieldText, formatInfo, exception);
}

/**
* This method is called when there is a failure occurred while field num error.
*
* @param predefinedFields predefined fields
* @param originBodyBytes origin body bytes
* @param originBody origin body
* @param actualNumFields actual number of fields
* @param fieldNameSize expected number of fields
*/
default void onFieldNumError(String predefinedFields, byte[] originBodyBytes, String originBody,
int actualNumFields, int fieldNameSize) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.inlong.sort.formats.inlongmsg;

import lombok.Data;
import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
Expand All @@ -25,14 +28,20 @@
/**
* The body deserialized from {@link InLongMsgBody}.
*/
@Data
public class InLongMsgBody implements Serializable {

private static final long serialVersionUID = 1L;

/**
* The body of the record.
*/
private final byte[] data;
private final byte[] dataBytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to be a compatibility issue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InLongMsgBody is used internally when parsing messages, and there are no compatibility issues.


/**
* The body of the record.
*/
private final String data;

/**
* The interface of the record.
Expand All @@ -50,32 +59,18 @@ public class InLongMsgBody implements Serializable {
private final Map<String, String> entries;

public InLongMsgBody(
byte[] data,
byte[] dataBytes,
String data,
String streamId,
List<String> fields,
Map<String, String> entries) {
this.dataBytes = dataBytes;
this.data = data;
this.streamId = streamId;
this.fields = fields;
this.entries = entries;
}

public byte[] getData() {
return data;
}

public String getStreamId() {
return streamId;
}

public List<String> getFields() {
return fields;
}

public Map<String, String> getEntries() {
return entries;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -87,17 +82,22 @@ public boolean equals(Object o) {
}

InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
return Arrays.equals(data, inLongMsgBody.data);
return StringUtils.equals(data, inLongMsgBody.data)
&& Arrays.equals(dataBytes, inLongMsgBody.dataBytes);
}

@Override
public int hashCode() {
return Arrays.hashCode(data);
if (dataBytes != null) {
return Arrays.hashCode(dataBytes);
}
return data == null ? super.hashCode() : data.hashCode();
}

@Override
public String toString() {
return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", streamId='" + streamId + '\''
return "InLongMsgBody{" + "data=" + (data == null ? new String(dataBytes) : data)
+ ", streamId='" + streamId + '\''
+ ", fields=" + fields + ", entries=" + entries + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,27 @@

package org.apache.inlong.sort.formats.inlongmsg;

import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
* The body deserialized from {@link InLongMsgWrap}.
*/
@Data
public class InLongMsgWrap implements Serializable {

private final InLongMsgHead inLongMsgHead;

private final List<InLongMsgBody> inLongMsgBodyList;

public InLongMsgWrap(InLongMsgHead inLongMsgHead, List<InLongMsgBody> inLongMsgBodyList) {
private final byte[] originBody;

public InLongMsgWrap(InLongMsgHead inLongMsgHead,
List<InLongMsgBody> inLongMsgBodyList, byte[] originBody) {
this.inLongMsgHead = inLongMsgHead;
this.inLongMsgBodyList = inLongMsgBodyList;
}

public InLongMsgHead getInLongMsgHead() {
return inLongMsgHead;
}

public List<InLongMsgBody> getInLongMsgBodyList() {
return inLongMsgBodyList;
this.originBody = originBody;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) throws E
attributesFieldName,
metadataFieldName,
head.getAttributes(),
body.getData(),
body.getDataBytes(),
includeUpdateBefore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public static InLongMsgBody parseBody(byte[] bytes) {
return new InLongMsgBody(
bytes,
null,
null,
Collections.emptyList(),
Collections.emptyMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public static List<InLongMsgBody> parseBodyList(
// Only parsed fields will be used by downstream, so it's safe to leave
// the other parameters empty.
return new InLongMsgBody(
null,
bytes,
bodyStr,
null,
Arrays.asList(line),
Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public static List<InLongMsgBody> parseBodyList(
return list.stream().map((line) -> {
return new InLongMsgBody(
bytes,
text,
null,
Collections.emptyList(),
line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static InLongMsgBody parseBody(
List<String> fields =
Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList());

return new InLongMsgBody(bytes, streamId, fields, Collections.emptyMap());
return new InLongMsgBody(bytes, text, streamId, fields, Collections.emptyMap());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static InLongMsgBody parseBody(
entries = Collections.emptyMap();
}

return new InLongMsgBody(bytes, streamId, Collections.emptyList(), entries);
return new InLongMsgBody(bytes, text, streamId, Collections.emptyList(), entries);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -47,8 +46,6 @@ public abstract class AbstractInLongMsgFormatDeserializer implements ResultTypeQ

private static final Logger LOG = LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);

protected long lastPrintTimestamp = 0L;
protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
protected int fieldNameSize = 0;

protected FailureHandler failureHandler;
Expand Down Expand Up @@ -84,15 +81,6 @@ public AbstractInLongMsgFormatDeserializer(@Nonnull FailureHandler failureHandle

protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception;

protected boolean needPrint() {
long now = Instant.now().toEpochMilli();
if (now - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL) {
lastPrintTimestamp = now;
return true;
}
return false;
}

public void flatMap(
byte[] bytes,
Collector<RowData> collector) throws Exception {
Expand Down Expand Up @@ -160,7 +148,7 @@ public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
continue;
}

result.add(new InLongMsgWrap(head, bodyList));
result.add(new InLongMsgWrap(head, bodyList, bodyBytes));
}
}

Expand Down Expand Up @@ -234,4 +222,21 @@ public int hashCode() {
public void setFormatMetricGroup(FormatMetricGroup formatMetricGroup) {
this.formatMetricGroup = formatMetricGroup;
}

protected void checkFieldNameSize(InLongMsgHead head, InLongMsgBody body,
int actualNumFields, int fieldNameSize,
FailureHandler failureHandler) {
if (actualNumFields != fieldNameSize) {
if (failureHandler != null) {
failureHandler.onFieldNumError(StringUtils.join(head.getPredefinedFields(), ","),
body.getDataBytes(), body.getData(),
actualNumFields, fieldNameSize);
} else {
LOG.warn("The number of fields mismatches: {}"
+ ",expected, but was {}. origin text: {}, PredefinedFields: {}",
fieldNameSize, actualNumFields, body,
StringUtils.join(head.getPredefinedFields(), ","));
}
}
}
}
Loading
Loading