Skip to content

Commit 144e933

Browse files
committed
Adding a try/catch around deserialization of individual cells in order to increase the amount of information we output in the event of an error.
1 parent 0571a8e commit 144e933

1 file changed

Lines changed: 140 additions & 136 deletions

File tree

mysql-replicator-augmenter-model/src/main/java/com/booking/replication/augmenter/model/format/MysqlTypeDeserializer.java

Lines changed: 140 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -46,188 +46,192 @@ public class MysqlTypeDeserializer {
4646
}
4747

4848
public static Object convertToObject(Serializable cellValue, ColumnSchema columnSchema, String[] groupValues) {
49+
try {
50+
if (cellValue == null) {
51+
return null;
52+
}
4953

50-
if (cellValue == null) {
51-
return null;
52-
}
54+
String collation = columnSchema.getCollation();
55+
String columnType = columnSchema.getColumnType();
56+
DataType dataType = columnSchema.getDataType();
57+
boolean isUnsigned = columnType.contains("unsigned");
5358

54-
String collation = columnSchema.getCollation();
55-
String columnType = columnSchema.getColumnType();
56-
DataType dataType = columnSchema.getDataType();
57-
boolean isUnsigned = columnType.contains("unsigned");
59+
switch (dataType) {
60+
case BINARY:
61+
case VARBINARY: {
62+
byte[] bytes = (byte[]) cellValue;
5863

59-
switch (dataType) {
60-
case BINARY:
61-
case VARBINARY: {
62-
byte[] bytes = (byte[]) cellValue;
64+
if (bytes.length == columnSchema.getCharMaxLength()) {
65+
return DatatypeConverter.printHexBinary(bytes);
66+
} else {
67+
byte[] bytesWithPadding = new byte[columnSchema.getCharMaxLength()];
6368

64-
if (bytes.length == columnSchema.getCharMaxLength()) {
65-
return DatatypeConverter.printHexBinary(bytes);
66-
} else {
67-
byte[] bytesWithPadding = new byte[columnSchema.getCharMaxLength()];
69+
for (int i = 0; i < bytesWithPadding.length; ++i) {
70+
bytesWithPadding[i] = (i < bytes.length) ? bytes[i] : 0;
71+
}
6872

69-
for (int i = 0; i < bytesWithPadding.length; ++i) {
70-
bytesWithPadding[i] = (i < bytes.length) ? bytes[i] : 0;
73+
return DatatypeConverter.printHexBinary(bytesWithPadding);
7174
}
72-
73-
return DatatypeConverter.printHexBinary(bytesWithPadding);
7475
}
75-
}
7676

77-
case TINYBLOB:
78-
case MEDIUMBLOB:
79-
case BLOB:
80-
case LONGBLOB: {
81-
byte[] bytes = (byte[]) cellValue;
82-
return DatatypeConverter.printHexBinary(bytes);
83-
}
77+
case TINYBLOB:
78+
case MEDIUMBLOB:
79+
case BLOB:
80+
case LONGBLOB: {
81+
byte[] bytes = (byte[]) cellValue;
82+
return DatatypeConverter.printHexBinary(bytes);
83+
}
8484

85-
case CHAR:
86-
case VARCHAR:
87-
case TEXT:
88-
case MEDIUMTEXT:
89-
case TINYTEXT: {
90-
byte[] bytes = (byte[]) cellValue;
85+
case CHAR:
86+
case VARCHAR:
87+
case TEXT:
88+
case MEDIUMTEXT:
89+
case TINYTEXT: {
90+
byte[] bytes = (byte[]) cellValue;
9191

92-
if (collation.contains("latin1")) {
93-
return new String(bytes, StandardCharsets.ISO_8859_1);
94-
} else {
95-
return new String(bytes, StandardCharsets.UTF_8);
92+
if (collation.contains("latin1")) {
93+
return new String(bytes, StandardCharsets.ISO_8859_1);
94+
} else {
95+
return new String(bytes, StandardCharsets.UTF_8);
96+
}
9697
}
97-
}
9898

99-
case JSON: {
100-
byte[] bytes = (byte[]) cellValue;
101-
try {
102-
return JsonBinary.parseAsString(bytes);
103-
} catch (IOException ex) {
104-
LOG.error(
105-
String.format("Could not parse JSON string Column Name : %s, byte[]%s",
106-
columnSchema.getName(), Arrays.toString(bytes)), ex);
107-
return null;
99+
case JSON: {
100+
byte[] bytes = (byte[]) cellValue;
101+
try {
102+
return JsonBinary.parseAsString(bytes);
103+
} catch (IOException ex) {
104+
LOG.error(
105+
String.format("Could not parse JSON string Column Name : %s, byte[]%s",
106+
columnSchema.getName(), Arrays.toString(bytes)), ex);
107+
return null;
108+
}
108109
}
109-
}
110110

111-
case BIT: {
112-
final BitSet data = (BitSet) cellValue;
111+
case BIT: {
112+
final BitSet data = (BitSet) cellValue;
113113

114-
if (data.length() == 0) {
115-
return "0";
116-
}
114+
if (data.length() == 0) {
115+
return "0";
116+
}
117117

118-
final StringBuilder buffer = new StringBuilder(data.length());
119-
IntStream.range(0, data.length()).mapToObj(i -> data.get(i) ? '1' : '0').forEach(buffer::append);
120-
return buffer.reverse().toString();
121-
}
118+
final StringBuilder buffer = new StringBuilder(data.length());
119+
IntStream.range(0, data.length()).mapToObj(i -> data.get(i) ? '1' : '0').forEach(buffer::append);
120+
return buffer.reverse().toString();
121+
}
122122

123-
case DATE: {
124-
return DATE_FORMAT.format(cellValue);
125-
}
123+
case DATE: {
124+
return DATE_FORMAT.format(cellValue);
125+
}
126126

127-
case TIME: {
128-
return TIME_FORMAT.format(cellValue);
129-
}
127+
case TIME: {
128+
return TIME_FORMAT.format(cellValue);
129+
}
130130

131-
case DATETIME:
132-
case TIMESTAMP: {
133-
Long timestamp = (Long) cellValue;
131+
case DATETIME:
132+
case TIMESTAMP: {
133+
Long timestamp = (Long) cellValue;
134134

135-
ZoneId zoneId = ZonedDateTime.now().getZone();
136-
LocalDateTime aLDT = Instant.ofEpochMilli(timestamp).atZone(zoneId).toLocalDateTime();
135+
ZoneId zoneId = ZonedDateTime.now().getZone();
136+
LocalDateTime aLDT = Instant.ofEpochMilli(timestamp).atZone(zoneId).toLocalDateTime();
137137

138-
Integer offset = ZonedDateTime.from(aLDT.atZone(zoneId)).getOffset().getTotalSeconds();
139-
timestamp = timestamp - offset * 1000;
138+
Integer offset = ZonedDateTime.from(aLDT.atZone(zoneId)).getOffset().getTotalSeconds();
139+
timestamp = timestamp - offset * 1000;
140140

141-
return String.valueOf(timestamp);
142-
}
141+
return String.valueOf(timestamp);
142+
}
143143

144-
case ENUM: {
145-
int index = (Integer) cellValue;
144+
case ENUM: {
145+
int index = (Integer) cellValue;
146146

147-
if (index > 0) {
148-
return String.valueOf(groupValues[index - 1]);
149-
} else {
150-
return null;
147+
if (index > 0) {
148+
return String.valueOf(groupValues[index - 1]);
149+
} else {
150+
return null;
151+
}
151152
}
152-
}
153153

154-
case SET: {
155-
long bits = (Long) cellValue;
154+
case SET: {
155+
long bits = (Long) cellValue;
156156

157-
if (bits > 0) {
158-
List<String> items = new ArrayList<>();
157+
if (bits > 0) {
158+
List<String> items = new ArrayList<>();
159159

160-
for (int index = 0; index < groupValues.length; index++) {
161-
if (((bits >> index) & 1) == 1) {
162-
items.add(groupValues[index]);
160+
for (int index = 0; index < groupValues.length; index++) {
161+
if (((bits >> index) & 1) == 1) {
162+
items.add(groupValues[index]);
163+
}
163164
}
165+
166+
return String.join(",", items.toArray(new String[0]));
167+
} else {
168+
return null;
164169
}
170+
}
165171

166-
return String.join(",", items.toArray(new String[0]));
167-
} else {
168-
return null;
172+
case TINYINT: {
173+
Long mask = isUnsigned ? UNSIGNED_TINYINT_MASK : DEFAULT_MASK;
174+
return maskAndGet(cellValue, mask);
169175
}
170-
}
171176

172-
case TINYINT: {
173-
Long mask = isUnsigned ? UNSIGNED_TINYINT_MASK : DEFAULT_MASK;
174-
return maskAndGet(cellValue, mask);
175-
}
177+
case SMALLINT: {
178+
Long mask = isUnsigned ? UNSIGNED_SMALLINT_MASK : DEFAULT_MASK;
179+
return maskAndGet(cellValue, mask);
180+
}
176181

177-
case SMALLINT: {
178-
Long mask = isUnsigned ? UNSIGNED_SMALLINT_MASK : DEFAULT_MASK;
179-
return maskAndGet(cellValue, mask);
180-
}
182+
case MEDIUMINT: {
183+
Long mask = isUnsigned ? UNSIGNED_MEDIUMINT_MASK : DEFAULT_MASK;
184+
return maskAndGet(cellValue, mask);
185+
}
181186

182-
case MEDIUMINT: {
183-
Long mask = isUnsigned ? UNSIGNED_MEDIUMINT_MASK : DEFAULT_MASK;
184-
return maskAndGet(cellValue, mask);
185-
}
187+
case INT: {
188+
Long mask = isUnsigned ? UNSIGNED_INT_MASK : DEFAULT_MASK;
189+
return maskAndGet(cellValue, mask);
190+
}
186191

187-
case INT: {
188-
Long mask = isUnsigned ? UNSIGNED_INT_MASK : DEFAULT_MASK;
189-
return maskAndGet(cellValue, mask);
190-
}
192+
case BIGINT: {
193+
if (isUnsigned) {
194+
long longValue = (Long) cellValue;
191195

192-
case BIGINT: {
193-
if (isUnsigned) {
194-
long longValue = (Long) cellValue;
196+
int upper = (int) (longValue >>> 32);
197+
int lower = (int) longValue;
195198

196-
int upper = (int) (longValue >>> 32);
197-
int lower = (int) longValue;
199+
BigInteger bigInteger = BigInteger.valueOf(Integer.toUnsignedLong(upper))
200+
.shiftLeft(32)
201+
.add(BigInteger.valueOf(Integer.toUnsignedLong(lower)));
198202

199-
BigInteger bigInteger = BigInteger.valueOf(Integer.toUnsignedLong(upper))
200-
.shiftLeft(32)
201-
.add(BigInteger.valueOf(Integer.toUnsignedLong(lower)));
203+
return bigInteger;
204+
} else {
205+
return maskAndGet(cellValue, DEFAULT_MASK);
206+
}
207+
}
208+
case FLOAT:
209+
case DOUBLE: {
210+
//FLOT converted as java.lang.Float
211+
//Double converted as java.lang.Double
212+
return cellValue;
213+
}
202214

203-
return bigInteger;
204-
} else {
205-
return maskAndGet(cellValue, DEFAULT_MASK);
215+
case DECIMAL: {
216+
BigDecimal decimal = (BigDecimal) cellValue;
217+
return decimal.toPlainString();
206218
}
207-
}
208-
case FLOAT:
209-
case DOUBLE: {
210-
//FLOT converted as java.lang.Float
211-
//Double converted as java.lang.Double
212-
return cellValue;
213-
}
214219

215-
case DECIMAL: {
216-
BigDecimal decimal = (BigDecimal) cellValue;
217-
return decimal.toPlainString();
218-
}
220+
case UNKNOWN:
221+
default: {
219222

220-
case UNKNOWN:
221-
default: {
223+
if (cellValue instanceof byte[]) {
224+
byte[] bytes = (byte[]) cellValue;
225+
return DatatypeConverter.printHexBinary(bytes);
226+
}
222227

223-
if (cellValue instanceof byte[]) {
224-
byte[] bytes = (byte[]) cellValue;
225-
return DatatypeConverter.printHexBinary(bytes);
228+
LOG.error(String.format("The datatype is %s hence returning null", dataType.getCode()));
229+
return null;
226230
}
227-
228-
LOG.error(String.format("The datatype is %s hence returning null", dataType.getCode()));
229-
return null;
230231
}
232+
} catch ( Exception e ) {
233+
LOG.warn("Error deserializing cell: cellValue: " + cellValue + ", columnSchema: " + columnSchema.toString() + ", groupValues: " + ( groupValues != null ? groupValues.toString() : "" ), e );
234+
throw e;
231235
}
232236
}
233237

0 commit comments

Comments
 (0)