42
42
import java .util .Iterator ;
43
43
import java .util .List ;
44
44
import java .util .Map ;
45
+ import java .util .regex .Pattern ;
45
46
46
47
/**
47
48
* source data parse to json format
@@ -64,6 +65,9 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
64
65
private final List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ;
65
66
private final String charsetName ;
66
67
68
+ private static final Pattern TIMESTAMP_PATTERN = Pattern .compile ("^\\ d+$" );
69
+ private static final Pattern TIME_FORMAT_PATTERN = Pattern .compile ("\\ w+\\ d+:\\ d+:\\ d+" );
70
+
67
71
public DtNestRowDeserializationSchema (TypeInformation <Row > typeInfo , Map <String , String > rowAndFieldMapping ,
68
72
List <AbstractTableInfo .FieldExtraInfo > fieldExtraInfos ,
69
73
String charsetName ) {
@@ -146,11 +150,11 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
146
150
return Date .valueOf (node .asText ());
147
151
} else if (info .getTypeClass ().equals (Types .SQL_TIME .getTypeClass ())) {
148
152
// local zone
149
- return Time . valueOf (node .asText ());
153
+ return convertToTime (node .asText ());
150
154
} else if (info .getTypeClass ().equals (Types .SQL_TIMESTAMP .getTypeClass ())) {
151
155
// local zone
152
- return Timestamp . valueOf (node .asText ());
153
- } else if (info instanceof RowTypeInfo ) {
156
+ return convertToTimestamp (node .asText ());
157
+ } else if (info instanceof RowTypeInfo ) {
154
158
return convertRow (node , (RowTypeInfo ) info );
155
159
} else if (info instanceof ObjectArrayTypeInfo ) {
156
160
return convertObjectArray (node , ((ObjectArrayTypeInfo ) info ).getComponentInfo ());
@@ -165,6 +169,29 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
165
169
}
166
170
}
167
171
172
+ /**
173
+ * 将 2020-09-07 14:49:10.0 和 1598446699685 两种格式都转化为 Timestamp
174
+ */
175
+ private Timestamp convertToTimestamp (String timestamp ) {
176
+ if (TIMESTAMP_PATTERN .matcher (timestamp ).find ()) {
177
+ return new Timestamp (Long .parseLong (timestamp ));
178
+ }
179
+ if (TIME_FORMAT_PATTERN .matcher (timestamp ).find ()) {
180
+ return Timestamp .valueOf (timestamp );
181
+ }
182
+ throw new IllegalArgumentException ("Incorrect time format of timestamp" );
183
+ }
184
+
185
+ private Time convertToTime (String timestamp ) {
186
+ if (TIMESTAMP_PATTERN .matcher (timestamp ).find ()) {
187
+ return new Time (Long .parseLong (timestamp ));
188
+ }
189
+ if (TIME_FORMAT_PATTERN .matcher (timestamp ).find ()) {
190
+ return Time .valueOf (timestamp );
191
+ }
192
+ throw new IllegalArgumentException ("Incorrect time format of time" );
193
+ }
194
+
168
195
private Row convertTopRow () {
169
196
Row row = new Row (fieldNames .length );
170
197
try {
@@ -175,7 +202,7 @@ private Row convertTopRow() {
175
202
if (node == null ) {
176
203
if (fieldExtraInfo != null && fieldExtraInfo .getNotNull ()) {
177
204
throw new IllegalStateException ("Failed to find field with name '"
178
- + fieldNames [i ] + "'." );
205
+ + fieldNames [i ] + "'." );
179
206
} else {
180
207
row .setField (i , null );
181
208
}
@@ -216,6 +243,7 @@ private Object convertObjectArray(JsonNode node, TypeInformation<?> elementType)
216
243
}
217
244
return array ;
218
245
}
246
+
219
247
@ Override
220
248
public TypeInformation <Row > getProducedType () {
221
249
return typeInfo ;
0 commit comments