Skip to content

Commit

Permalink
bugfix: Support serialization for dm.jdbc.driver.DmdbTimestamp (#6701)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyl2008dsg authored Aug 11, 2024
1 parent 6753f35 commit 35820c0
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 0 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6707](https://github.com/apache/incubator-seata/pull/6707)] fix readonly branch commit errors in Oracle XA transactions
- [[#6711](https://github.com/apache/incubator-seata/pull/6711)] fix dameng rollback info un compress fail
- [[#6714](https://github.com/apache/incubator-seata/pull/6714)] fix dameng delete undo fail
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] fix support serialization for dm.jdbc.driver.DmdbTimestamp


### optimize:
Expand Down Expand Up @@ -76,6 +77,7 @@ Thanks to these contributors for their code commits. Please report an unintended
- [liuqiufeng](https://github.com/liuqiufeng)
- [caohdgege](https://github.com/caohdgege)
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)


Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 3 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [[#6707](https://github.com/apache/incubator-seata/pull/6707)] 修复Oracle XA事务中只读分支提交出错的问题
- [[#6711](https://github.com/apache/incubator-seata/pull/6711)] 修复达梦数据库的getRollbackInfo没有解压缩的问题
- [[#6714](https://github.com/apache/incubator-seata/pull/6714)] 修复达梦数据库的delete sql回滚失败的问题
- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] 修复达梦数据库的对dm.jdbc.driver.DmdbTimestamp的支持

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
Expand Down Expand Up @@ -80,6 +81,8 @@
- [liuqiufeng](https://github.com/liuqiufeng)
- [caohdgege](https://github.com/caohdgege)
- [imashimaro](https://github.com/hmj776521114)
- [lyl2008dsg](https://github.com/lyl2008dsg)



同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
Expand Down
5 changes: 5 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,10 @@
<artifactId>commons-logging</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -71,6 +73,10 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize {

private static final Logger LOGGER = LoggerFactory.getLogger(JacksonUndoLogParser.class);

private static final String DM_JDBC_DRIVER_DMDB_TIMESTAMP = "dm.jdbc.driver.DmdbTimestamp";

private static final String VALUE_OF = "valueOf";

/**
* the zoneId for LocalDateTime
*/
Expand Down Expand Up @@ -120,6 +126,16 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize {
*/
private final JsonDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer();

/**
* customize serializer for dm.jdbc.driver.DmdbTimestamp
*/
private final JsonSerializer dmdbTimestampSerializer = new DmdbTimestampSerializer();

/**
* customize deserializer for dm.jdbc.driver.DmdbTimestamp
*/
private final JsonDeserializer dmdbTimestampDeserializer = new DmdbTimestampDeserializer();

@Override
public void init() {
try {
Expand Down Expand Up @@ -152,12 +168,25 @@ public void init() {
module.addDeserializer(SerialClob.class, clobDeserializer);
module.addSerializer(LocalDateTime.class, localDateTimeSerializer);
module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer);
registerDmdbTimestampModuleIfPresent();
mapper.registerModule(module);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
mapper.enable(MapperFeature.PROPAGATE_TRANSIENT_MARKER);
}

private void registerDmdbTimestampModuleIfPresent() {
try {
Class<?> dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP);
module.addSerializer(dmdbTimestampClass, dmdbTimestampSerializer);
module.addDeserializer(dmdbTimestampClass, dmdbTimestampDeserializer);
} catch (ClassNotFoundException e) {
// If the DmdbTimestamp class is not found, the serializers and deserializers will not be registered.
// This is expected behavior since not all environments will have the dm.jdbc.driver.DmdbTimestamp class.
// Therefore, no error log is recorded to avoid confusion for users without the dm driver.
}
}

@Override
public String getName() {
return NAME;
Expand Down Expand Up @@ -395,6 +424,98 @@ public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) thro
}
}

private static class DmdbTimestampSerializer extends JsonSerializer<Object> {

private static final String TO_INSTANT = "toInstant";
private static final String GET_NANOS = "getNanos";

@Override
public void serializeWithType(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers, TypeSerializer typeSer) throws IOException {
JsonToken valueShape = JsonToken.VALUE_NUMBER_INT;
int nanos = getNanos(dmdbTimestamp);
if (nanos % 1000000 > 0) {
valueShape = JsonToken.START_ARRAY;
}

WritableTypeId typeIdDef = typeSer.writeTypePrefix(gen, typeSer.typeId(dmdbTimestamp, valueShape));
serialize(dmdbTimestamp, gen, serializers);
typeSer.writeTypeSuffix(gen, typeIdDef);
}

@Override
public void serialize(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers) {
try {
Instant instant = getInstant(dmdbTimestamp);
gen.writeNumber(instant.toEpochMilli());
// if has microseconds, serialized as an array, write the nano to the array
int nanos = instant.getNano();
if (nanos % 1000000 > 0) {
gen.writeNumber(nanos);
}
} catch (Exception e) {
LOGGER.error("serialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e);
}
}

private int getNanos(Object dmdbTimestamp) throws IOException {
try {
Method getNanosMethod = dmdbTimestamp.getClass().getMethod(GET_NANOS);
return (int) getNanosMethod.invoke(dmdbTimestamp);
} catch (Exception e) {
throw new IOException("Error getting nanos value from DmdbTimestamp", e);
}
}

private Instant getInstant(Object dmdbTimestamp) throws IOException {
try {
Method toInstantMethod = dmdbTimestamp.getClass().getMethod(TO_INSTANT);
return (Instant) toInstantMethod.invoke(dmdbTimestamp);
} catch (Exception e) {
throw new IOException("Error getting instant from DmdbTimestamp", e);
}
}
}

public class DmdbTimestampDeserializer extends JsonDeserializer<Object> {

@Override
public Object deserialize(JsonParser p, DeserializationContext ctxt) {
try {
Instant instant = parseInstant(p);
return createDmdbTimestamp(instant);
} catch (Exception e) {
LOGGER.error("deserialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e);
}
return null;
}

private Instant parseInstant(JsonParser p) throws IOException {
try {
if (p.isExpectedStartArrayToken()) {
ArrayNode arrayNode = p.getCodec().readTree(p);
long timestamp = arrayNode.get(0).asLong();
Instant instant = Instant.ofEpochMilli(timestamp);
if (arrayNode.size() > 1) {
int nano = arrayNode.get(1).asInt();
instant = instant.plusNanos(nano % 1000000);
}
return instant;
} else {
long timestamp = p.getLongValue();
return Instant.ofEpochMilli(timestamp);
}
} catch (IOException e) {
throw new IOException("Error parsing Instant from JSON", e);
}
}

private Object createDmdbTimestamp(Instant instant) throws Exception {
Class<?> dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP);
Method valueOfMethod = dmdbTimestampClass.getMethod(VALUE_OF, ZonedDateTime.class);
return valueOfMethod.invoke(null, instant.atZone(zoneId));
}
}

/**
* set zone id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
package org.apache.seata.rm.datasource.undo.parser;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import javax.sql.rowset.serial.SerialBlob;
import javax.sql.rowset.serial.SerialClob;

Expand All @@ -32,6 +38,7 @@
import org.apache.seata.rm.datasource.undo.BaseUndoLogParserTest;
import org.apache.seata.rm.datasource.undo.UndoLogParser;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;


Expand Down Expand Up @@ -99,6 +106,46 @@ public void encode() throws NoSuchFieldException, IllegalAccessException, IOExce
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

}

@Test
public void testSerializeAndDeserializeDmdbTimestamp() throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IOException {
java.lang.reflect.Field reflectField = parser.getClass().getDeclaredField("mapper");
reflectField.setAccessible(true);
ObjectMapper mapper = (ObjectMapper)reflectField.get(parser);

Class<?> dmdbTimestampClass = Class.forName("dm.jdbc.driver.DmdbTimestamp");
Method valueOfMethod = dmdbTimestampClass.getMethod("valueOf", ZonedDateTime.class);
Method valueOfDateMethod = dmdbTimestampClass.getMethod("valueOf", Date.class);

Object dmdbTimestamp = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).atZone(ZoneId.systemDefault()));
Field field = new Field("dmdb_timestamp", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestamp);
byte[] bytes = mapper.writeValueAsBytes(field);
Field sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

Object originalTimestamp = valueOfDateMethod.invoke(null, new Date(1721985847000L));
field = new Field("dmdb_timestamp_type2", JDBCType.TIMESTAMP.getVendorTypeNumber(), originalTimestamp);
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertFalse(DataCompareUtils.isFieldEquals(field, sameField).getResult());

Object dmdbTimestampnanos = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).plusNanos(12345L).atZone(ZoneId.systemDefault()));
field = new Field("dmdb_timestamp_nanos", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestampnanos);
bytes = mapper.writeValueAsBytes(field);
sameField = mapper.readValue(bytes, Field.class);
Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult());

}

private boolean checkClassExists(String className) {
try {
Class.forName(className);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}

@Override
Expand Down

0 comments on commit 35820c0

Please sign in to comment.