diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 3c9990871da..2bd7f9c9c9b 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -20,6 +20,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6707](https://github.com/apache/incubator-seata/pull/6707)] fix readonly branch commit errors in Oracle XA transactions - [[#6711](https://github.com/apache/incubator-seata/pull/6711)] fix dameng rollback info un compress fail - [[#6714](https://github.com/apache/incubator-seata/pull/6714)] fix dameng delete undo fail +- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] fix support serialization for dm.jdbc.driver.DmdbTimestamp ### optimize: @@ -76,6 +77,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [liuqiufeng](https://github.com/liuqiufeng) - [caohdgege](https://github.com/caohdgege) - [imashimaro](https://github.com/hmj776521114) +- [lyl2008dsg](https://github.com/lyl2008dsg) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 24c1ac9be3e..7f26a5b821b 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -22,6 +22,7 @@ - [[#6707](https://github.com/apache/incubator-seata/pull/6707)] 修复Oracle XA事务中只读分支提交出错的问题 - [[#6711](https://github.com/apache/incubator-seata/pull/6711)] 修复达梦数据库的getRollbackInfo没有解压缩的问题 - [[#6714](https://github.com/apache/incubator-seata/pull/6714)] 修复达梦数据库的delete sql回滚失败的问题 +- [[#6701](https://github.com/apache/incubator-seata/pull/6728)] 修复达梦数据库的对dm.jdbc.driver.DmdbTimestamp的支持 ### optimize: - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 @@ -80,6 +81,8 @@ - [liuqiufeng](https://github.com/liuqiufeng) - [caohdgege](https://github.com/caohdgege) - [imashimaro](https://github.com/hmj776521114) +- [lyl2008dsg](https://github.com/lyl2008dsg) + 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/rm-datasource/pom.xml b/rm-datasource/pom.xml index 93f3c5a9e92..ed466e2cdf1 100644 --- a/rm-datasource/pom.xml +++ b/rm-datasource/pom.xml @@ -132,5 +132,10 @@ commons-logging true + + com.dameng + DmJdbcDriver18 + test + diff --git a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParser.java b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParser.java index 700b5d59baf..e6e6596dfd7 100644 --- a/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParser.java +++ b/rm-datasource/src/main/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParser.java @@ -18,11 +18,13 @@ import java.io.IOException; import java.io.Reader; +import java.lang.reflect.Method; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -71,6 +73,10 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize { private static final Logger LOGGER = LoggerFactory.getLogger(JacksonUndoLogParser.class); + private static final String DM_JDBC_DRIVER_DMDB_TIMESTAMP = "dm.jdbc.driver.DmdbTimestamp"; + + private static final String VALUE_OF = "valueOf"; + /** * the zoneId for LocalDateTime */ @@ -120,6 +126,16 @@ public class JacksonUndoLogParser implements UndoLogParser, Initialize { */ private final JsonDeserializer localDateTimeDeserializer = new LocalDateTimeDeserializer(); + /** + * customize serializer for dm.jdbc.driver.DmdbTimestamp + */ + private final JsonSerializer dmdbTimestampSerializer = new DmdbTimestampSerializer(); + + /** + * customize deserializer for dm.jdbc.driver.DmdbTimestamp + */ + private final JsonDeserializer dmdbTimestampDeserializer = new DmdbTimestampDeserializer(); + @Override public void init() { try { @@ -152,12 +168,25 @@ public void init() { module.addDeserializer(SerialClob.class, clobDeserializer); module.addSerializer(LocalDateTime.class, localDateTimeSerializer); module.addDeserializer(LocalDateTime.class, localDateTimeDeserializer); + registerDmdbTimestampModuleIfPresent(); mapper.registerModule(module); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); mapper.enable(MapperFeature.PROPAGATE_TRANSIENT_MARKER); } + private void registerDmdbTimestampModuleIfPresent() { + try { + Class dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP); + module.addSerializer(dmdbTimestampClass, dmdbTimestampSerializer); + module.addDeserializer(dmdbTimestampClass, dmdbTimestampDeserializer); + } catch (ClassNotFoundException e) { + // If the DmdbTimestamp class is not found, the serializers and deserializers will not be registered. + // This is expected behavior since not all environments will have the dm.jdbc.driver.DmdbTimestamp class. + // Therefore, no error log is recorded to avoid confusion for users without the dm driver. + } + } + @Override public String getName() { return NAME; @@ -395,6 +424,98 @@ public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) thro } } + private static class DmdbTimestampSerializer extends JsonSerializer { + + private static final String TO_INSTANT = "toInstant"; + private static final String GET_NANOS = "getNanos"; + + @Override + public void serializeWithType(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers, TypeSerializer typeSer) throws IOException { + JsonToken valueShape = JsonToken.VALUE_NUMBER_INT; + int nanos = getNanos(dmdbTimestamp); + if (nanos % 1000000 > 0) { + valueShape = JsonToken.START_ARRAY; + } + + WritableTypeId typeIdDef = typeSer.writeTypePrefix(gen, typeSer.typeId(dmdbTimestamp, valueShape)); + serialize(dmdbTimestamp, gen, serializers); + typeSer.writeTypeSuffix(gen, typeIdDef); + } + + @Override + public void serialize(Object dmdbTimestamp, JsonGenerator gen, SerializerProvider serializers) { + try { + Instant instant = getInstant(dmdbTimestamp); + gen.writeNumber(instant.toEpochMilli()); + // if has microseconds, serialized as an array, write the nano to the array + int nanos = instant.getNano(); + if (nanos % 1000000 > 0) { + gen.writeNumber(nanos); + } + } catch (Exception e) { + LOGGER.error("serialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e); + } + } + + private int getNanos(Object dmdbTimestamp) throws IOException { + try { + Method getNanosMethod = dmdbTimestamp.getClass().getMethod(GET_NANOS); + return (int) getNanosMethod.invoke(dmdbTimestamp); + } catch (Exception e) { + throw new IOException("Error getting nanos value from DmdbTimestamp", e); + } + } + + private Instant getInstant(Object dmdbTimestamp) throws IOException { + try { + Method toInstantMethod = dmdbTimestamp.getClass().getMethod(TO_INSTANT); + return (Instant) toInstantMethod.invoke(dmdbTimestamp); + } catch (Exception e) { + throw new IOException("Error getting instant from DmdbTimestamp", e); + } + } + } + + public class DmdbTimestampDeserializer extends JsonDeserializer { + + @Override + public Object deserialize(JsonParser p, DeserializationContext ctxt) { + try { + Instant instant = parseInstant(p); + return createDmdbTimestamp(instant); + } catch (Exception e) { + LOGGER.error("deserialize dm.jdbc.driver.DmdbTimestamp error : {}", e.getMessage(), e); + } + return null; + } + + private Instant parseInstant(JsonParser p) throws IOException { + try { + if (p.isExpectedStartArrayToken()) { + ArrayNode arrayNode = p.getCodec().readTree(p); + long timestamp = arrayNode.get(0).asLong(); + Instant instant = Instant.ofEpochMilli(timestamp); + if (arrayNode.size() > 1) { + int nano = arrayNode.get(1).asInt(); + instant = instant.plusNanos(nano % 1000000); + } + return instant; + } else { + long timestamp = p.getLongValue(); + return Instant.ofEpochMilli(timestamp); + } + } catch (IOException e) { + throw new IOException("Error parsing Instant from JSON", e); + } + } + + private Object createDmdbTimestamp(Instant instant) throws Exception { + Class dmdbTimestampClass = Class.forName(DM_JDBC_DRIVER_DMDB_TIMESTAMP); + Method valueOfMethod = dmdbTimestampClass.getMethod(VALUE_OF, ZonedDateTime.class); + return valueOfMethod.invoke(null, instant.atZone(zoneId)); + } + } + /** * set zone id * diff --git a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParserTest.java b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParserTest.java index da3d7877e55..acbe39d077f 100644 --- a/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParserTest.java +++ b/rm-datasource/src/test/java/org/apache/seata/rm/datasource/undo/parser/JacksonUndoLogParserTest.java @@ -17,11 +17,17 @@ package org.apache.seata.rm.datasource.undo.parser; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.sql.JDBCType; import java.sql.SQLException; import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Date; import javax.sql.rowset.serial.SerialBlob; import javax.sql.rowset.serial.SerialClob; @@ -32,6 +38,7 @@ import org.apache.seata.rm.datasource.undo.BaseUndoLogParserTest; import org.apache.seata.rm.datasource.undo.UndoLogParser; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; @@ -99,6 +106,46 @@ public void encode() throws NoSuchFieldException, IllegalAccessException, IOExce bytes = mapper.writeValueAsBytes(field); sameField = mapper.readValue(bytes, Field.class); Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult()); + + } + + @Test + public void testSerializeAndDeserializeDmdbTimestamp() throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IOException { + java.lang.reflect.Field reflectField = parser.getClass().getDeclaredField("mapper"); + reflectField.setAccessible(true); + ObjectMapper mapper = (ObjectMapper)reflectField.get(parser); + + Class dmdbTimestampClass = Class.forName("dm.jdbc.driver.DmdbTimestamp"); + Method valueOfMethod = dmdbTimestampClass.getMethod("valueOf", ZonedDateTime.class); + Method valueOfDateMethod = dmdbTimestampClass.getMethod("valueOf", Date.class); + + Object dmdbTimestamp = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).atZone(ZoneId.systemDefault())); + Field field = new Field("dmdb_timestamp", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestamp); + byte[] bytes = mapper.writeValueAsBytes(field); + Field sameField = mapper.readValue(bytes, Field.class); + Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult()); + + Object originalTimestamp = valueOfDateMethod.invoke(null, new Date(1721985847000L)); + field = new Field("dmdb_timestamp_type2", JDBCType.TIMESTAMP.getVendorTypeNumber(), originalTimestamp); + bytes = mapper.writeValueAsBytes(field); + sameField = mapper.readValue(bytes, Field.class); + Assertions.assertFalse(DataCompareUtils.isFieldEquals(field, sameField).getResult()); + + Object dmdbTimestampnanos = valueOfMethod.invoke(null, Instant.ofEpochMilli(1721985847000L).plusNanos(12345L).atZone(ZoneId.systemDefault())); + field = new Field("dmdb_timestamp_nanos", JDBCType.TIMESTAMP.getVendorTypeNumber(), dmdbTimestampnanos); + bytes = mapper.writeValueAsBytes(field); + sameField = mapper.readValue(bytes, Field.class); + Assertions.assertTrue(DataCompareUtils.isFieldEquals(field, sameField).getResult()); + + } + + private boolean checkClassExists(String className) { + try { + Class.forName(className); + return true; + } catch (ClassNotFoundException e) { + return false; + } } @Override