Skip to content

Commit f52c65c

Browse files
committed
fix(schema-registry): fix internal schema-registry deserialization (#14479)
1 parent cecc28d commit f52c65c

File tree

38 files changed

+24741
-233
lines changed

38 files changed

+24741
-233
lines changed

datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static org.mockito.Mockito.mock;
66
import static org.testng.Assert.assertEquals;
77
import static org.testng.Assert.assertNotNull;
8+
import static org.testng.Assert.assertTrue;
89

910
import com.linkedin.datahub.upgrade.system.SystemUpdate;
1011
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
@@ -76,10 +77,17 @@ public void testSystemUpdateKafkaProducerOverride() throws RestClientException,
7677
MockSystemUpdateSerializer serializer = new MockSystemUpdateSerializer();
7778
serializer.configure(schemaRegistryConfig.getProperties(null), false);
7879
SchemaRegistryClient registry = serializer.getSchemaRegistryClient();
79-
assertEquals(
80+
81+
// The RENAMED_MCL_AVRO_SCHEMA can have either schema ID 11 or 12
82+
// Both are valid for the METADATA_CHANGE_LOG_VERSIONED topic
83+
int actualSchemaId =
8084
registry.getId(
81-
topicToSubjectName(Topics.METADATA_CHANGE_LOG_VERSIONED), RENAMED_MCL_AVRO_SCHEMA),
82-
2);
85+
topicToSubjectName(Topics.METADATA_CHANGE_LOG_VERSIONED), RENAMED_MCL_AVRO_SCHEMA);
86+
87+
// Accept either schema ID 11 (METADATA_CHANGE_LOG) or 12 (METADATA_CHANGE_LOG_TIMESERIES)
88+
assertTrue(
89+
actualSchemaId == 11 || actualSchemaId == 12,
90+
"Expected schema ID 11 or 12, but got: " + actualSchemaId);
8391
}
8492

8593
@Test
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package com.linkedin.metadata;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.util.ArrayList;
6+
import java.util.Collections;
7+
import java.util.HashMap;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Optional;
11+
import org.apache.avro.Schema;
12+
13+
/** Constants and utility methods for avro schema */
14+
public final class EventSchemaConstants {
15+
16+
private EventSchemaConstants() {
17+
// Utility class, prevent instantiation
18+
}
19+
20+
// Legacy v1 schema loaded from resource for backward compatibility
21+
public static final Schema MCP_V1_SCHEMA =
22+
loadSchemaFromResource(
23+
"v1/avro/com/linkedin/mxe/" + EventUtils.METADATA_CHANGE_PROPOSAL_SCHEMA_NAME + ".avsc");
24+
public static final Schema MCL_V1_SCHEMA =
25+
loadSchemaFromResource(
26+
"v1/avro/com/linkedin/mxe/" + EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME + ".avsc");
27+
public static final Schema MCL_TIMESERIES_V1_SCHEMA =
28+
loadSchemaFromResource(
29+
"v1/avro/com/linkedin/mxe/" + EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME + ".avsc");
30+
public static final Schema MCE_V1_SCHEMA =
31+
loadSchemaFromResource(
32+
"v1/avro/com/linkedin/mxe/" + EventUtils.METADATA_CHANGE_EVENT_SCHEMA_NAME + ".avsc");
33+
public static final Schema FMCE_V1_SCHEMA =
34+
loadSchemaFromResource(
35+
"v1/avro/com/linkedin/mxe/"
36+
+ EventUtils.FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME
37+
+ ".avsc");
38+
public static final Schema MAE_V1_SCHEMA =
39+
loadSchemaFromResource(
40+
"v1/avro/com/linkedin/mxe/" + EventUtils.METADATA_AUDIT_EVENT_SCHEMA_NAME + ".avsc");
41+
42+
// Current schemas used elsewhere in the codebase - use schemas with correct namespace
43+
public static final Schema MCP_SCHEMA = EventUtils.RENAMED_MCP_AVRO_SCHEMA;
44+
public static final Schema MCL_SCHEMA = EventUtils.RENAMED_MCL_AVRO_SCHEMA;
45+
public static final Schema MCL_TIMESERIES_SCHEMA = EventUtils.RENAMED_MCL_AVRO_SCHEMA;
46+
public static final Schema PE_SCHEMA = EventUtils.RENAMED_PE_AVRO_SCHEMA;
47+
public static final Schema MCE_SCHEMA = EventUtils.RENAMED_MCE_AVRO_SCHEMA;
48+
public static final Schema FMCE_SCHEMA = EventUtils.RENAMED_FAILED_MCE_AVRO_SCHEMA;
49+
public static final Schema MAE_SCHEMA = EventUtils.RENAMED_MAE_AVRO_SCHEMA;
50+
public static final Schema DUHE_SCHEMA = EventUtils.RENAMED_DUHE_AVRO_SCHEMA;
51+
public static final Schema FMCP_V1_SCHEMA =
52+
loadSchemaFromResource(
53+
"v1/avro/com/linkedin/mxe/"
54+
+ EventUtils.FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME
55+
+ ".avsc");
56+
public static final Schema FMCP_SCHEMA = EventUtils.RENAMED_FMCP_AVRO_SCHEMA;
57+
58+
// Schema ID mappings for proper schema resolution
59+
// V1 schemas (backward compatible) - these get their own schema IDs
60+
public static final int MCP_V1_SCHEMA_ID =
61+
SchemaIdOrdinal.METADATA_CHANGE_PROPOSAL_V1.getSchemaId();
62+
public static final int FMCP_V1_SCHEMA_ID =
63+
SchemaIdOrdinal.FAILED_METADATA_CHANGE_PROPOSAL_V1.getSchemaId();
64+
public static final int MCL_V1_SCHEMA_ID = SchemaIdOrdinal.METADATA_CHANGE_LOG_V1.getSchemaId();
65+
public static final int MCL_TIMESERIES_V1_SCHEMA_ID =
66+
SchemaIdOrdinal.METADATA_CHANGE_LOG_TIMESERIES_V1.getSchemaId();
67+
public static final int MCE_V1_SCHEMA_ID = SchemaIdOrdinal.METADATA_CHANGE_EVENT_V1.getSchemaId();
68+
public static final int FMCE_V1_SCHEMA_ID =
69+
SchemaIdOrdinal.FAILED_METADATA_CHANGE_EVENT_V1.getSchemaId();
70+
public static final int MAE_V1_SCHEMA_ID = SchemaIdOrdinal.METADATA_AUDIT_EVENT_V1.getSchemaId();
71+
72+
// Current schemas (incompatible with V1) - these get new schema IDs
73+
public static final int MCP_SCHEMA_ID = SchemaIdOrdinal.METADATA_CHANGE_PROPOSAL.getSchemaId();
74+
public static final int FMCP_SCHEMA_ID =
75+
SchemaIdOrdinal.FAILED_METADATA_CHANGE_PROPOSAL.getSchemaId();
76+
public static final int MCL_SCHEMA_ID = SchemaIdOrdinal.METADATA_CHANGE_LOG.getSchemaId();
77+
public static final int MCL_TIMESERIES_SCHEMA_ID =
78+
SchemaIdOrdinal.METADATA_CHANGE_LOG_TIMESERIES.getSchemaId();
79+
public static final int MCE_SCHEMA_ID = SchemaIdOrdinal.METADATA_CHANGE_EVENT.getSchemaId();
80+
public static final int FMCE_SCHEMA_ID =
81+
SchemaIdOrdinal.FAILED_METADATA_CHANGE_EVENT.getSchemaId();
82+
public static final int MAE_SCHEMA_ID = SchemaIdOrdinal.METADATA_AUDIT_EVENT.getSchemaId();
83+
public static final int DUHE_SCHEMA_ID =
84+
SchemaIdOrdinal.DATAHUB_UPGRADE_HISTORY_EVENT.getSchemaId();
85+
86+
// Single version schemas (backward compatible)
87+
public static final int PE_SCHEMA_ID = SchemaIdOrdinal.PLATFORM_EVENT.getSchemaId();
88+
89+
// Schema name to list of schema IDs mapping
90+
private static final Map<String, List<Integer>> SCHEMA_NAME_TO_SCHEMA_IDS_MAP = new HashMap<>();
91+
92+
// Schema ID to schema name mapping
93+
private static final Map<Integer, String> SCHEMA_ID_TO_SCHEMA_NAME_MAP = new HashMap<>();
94+
95+
static {
96+
// Map schema names to their list of schema IDs
97+
// Each schema ID represents a different version of the schema (backwards incompatible)
98+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
99+
EventUtils.METADATA_CHANGE_PROPOSAL_SCHEMA_NAME, List.of(MCP_V1_SCHEMA_ID, MCP_SCHEMA_ID));
100+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
101+
EventUtils.FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME,
102+
List.of(FMCP_V1_SCHEMA_ID, FMCP_SCHEMA_ID));
103+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
104+
EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME,
105+
List.of(
106+
MCL_V1_SCHEMA_ID,
107+
MCL_SCHEMA_ID,
108+
MCL_TIMESERIES_V1_SCHEMA_ID,
109+
MCL_TIMESERIES_SCHEMA_ID));
110+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(EventUtils.PLATFORM_EVENT_SCHEMA_NAME, List.of(PE_SCHEMA_ID));
111+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
112+
EventUtils.METADATA_CHANGE_EVENT_SCHEMA_NAME, List.of(MCE_V1_SCHEMA_ID, MCE_SCHEMA_ID));
113+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
114+
EventUtils.FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME,
115+
List.of(FMCE_V1_SCHEMA_ID, FMCE_SCHEMA_ID));
116+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
117+
EventUtils.METADATA_AUDIT_EVENT_SCHEMA_NAME, List.of(MAE_V1_SCHEMA_ID, MAE_SCHEMA_ID));
118+
SCHEMA_NAME_TO_SCHEMA_IDS_MAP.put(
119+
EventUtils.DATAHUB_UPGRADE_HISTORY_EVENT_SCHEMA_NAME, List.of(DUHE_SCHEMA_ID));
120+
121+
// Map schema IDs to their corresponding schema names
122+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
123+
MCP_V1_SCHEMA_ID, EventUtils.METADATA_CHANGE_PROPOSAL_SCHEMA_NAME);
124+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
125+
MCP_SCHEMA_ID, EventUtils.METADATA_CHANGE_PROPOSAL_SCHEMA_NAME);
126+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
127+
FMCP_V1_SCHEMA_ID, EventUtils.FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME);
128+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
129+
FMCP_SCHEMA_ID, EventUtils.FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME);
130+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(MCL_V1_SCHEMA_ID, EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME);
131+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(MCL_SCHEMA_ID, EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME);
132+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
133+
MCL_TIMESERIES_V1_SCHEMA_ID, EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME);
134+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
135+
MCL_TIMESERIES_SCHEMA_ID, EventUtils.METADATA_CHANGE_LOG_SCHEMA_NAME);
136+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(PE_SCHEMA_ID, EventUtils.PLATFORM_EVENT_SCHEMA_NAME);
137+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
138+
MCE_V1_SCHEMA_ID, EventUtils.METADATA_CHANGE_EVENT_SCHEMA_NAME);
139+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(MCE_SCHEMA_ID, EventUtils.METADATA_CHANGE_EVENT_SCHEMA_NAME);
140+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
141+
FMCE_V1_SCHEMA_ID, EventUtils.FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME);
142+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
143+
FMCE_SCHEMA_ID, EventUtils.FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME);
144+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(MAE_V1_SCHEMA_ID, EventUtils.METADATA_AUDIT_EVENT_SCHEMA_NAME);
145+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(MAE_SCHEMA_ID, EventUtils.METADATA_AUDIT_EVENT_SCHEMA_NAME);
146+
SCHEMA_ID_TO_SCHEMA_NAME_MAP.put(
147+
DUHE_SCHEMA_ID, EventUtils.DATAHUB_UPGRADE_HISTORY_EVENT_SCHEMA_NAME);
148+
}
149+
150+
public static Map<String, List<Integer>> getSchemaNameToSchemaIdsMap() {
151+
return Collections.unmodifiableMap(SCHEMA_NAME_TO_SCHEMA_IDS_MAP);
152+
}
153+
154+
public static Map<Integer, String> getSchemaIdToSchemaNameMap() {
155+
return Collections.unmodifiableMap(SCHEMA_ID_TO_SCHEMA_NAME_MAP);
156+
}
157+
158+
public static Optional<List<Integer>> getSchemaIdsForSchemaName(String schemaName) {
159+
return Optional.ofNullable(SCHEMA_NAME_TO_SCHEMA_IDS_MAP.get(schemaName));
160+
}
161+
162+
public static Optional<String> getSchemaNameForSchemaId(int schemaId) {
163+
return Optional.ofNullable(SCHEMA_ID_TO_SCHEMA_NAME_MAP.get(schemaId));
164+
}
165+
166+
public static List<String> getAllSchemaNames() {
167+
return new ArrayList<>(SCHEMA_NAME_TO_SCHEMA_IDS_MAP.keySet());
168+
}
169+
170+
// Schema compatibility constants
171+
/**
172+
* NONE: No compatibility checking is done. This means that any schema changes are allowed. This
173+
* is the most permissive mode and should be used when you want to make breaking changes without
174+
* any compatibility guarantees.
175+
*/
176+
public static final String SCHEMA_COMPATIBILITY_NONE = "NONE";
177+
178+
/**
179+
* BACKWARD: New schema can read data written by the previous schema. This means that new
180+
* consumers can read old data, but old consumers cannot read new data. This is useful when you
181+
* want to add new fields or make fields optional.
182+
*/
183+
public static final String SCHEMA_COMPATIBILITY_BACKWARD = "BACKWARD";
184+
185+
/**
186+
* FORWARD: Previous schema can read data written by the new schema. This means that old consumers
187+
* can read new data, but new consumers cannot read old data. This is useful when you want to
188+
* remove fields or make fields required.
189+
*/
190+
public static final String SCHEMA_COMPATIBILITY_FORWARD = "FORWARD";
191+
192+
/**
193+
* FULL: Both backward and forward compatibility are maintained. This means that both old and new
194+
* consumers can read data written by either schema. This is the most restrictive mode and
195+
* provides the strongest compatibility guarantees.
196+
*/
197+
public static final String SCHEMA_COMPATIBILITY_FULL = "FULL";
198+
199+
private static Schema loadSchemaFromResource(String resourceName) {
200+
try (InputStream inputStream =
201+
EventSchemaConstants.class.getClassLoader().getResourceAsStream(resourceName)) {
202+
if (inputStream == null) {
203+
throw new RuntimeException("Could not find schema resource: " + resourceName);
204+
}
205+
return new Schema.Parser().parse(inputStream);
206+
} catch (IOException e) {
207+
throw new RuntimeException("Error loading schema from resource: " + resourceName, e);
208+
}
209+
}
210+
}

metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,18 @@
3232

3333
public class EventUtils {
3434

35+
// Schema name constants
36+
public static final String METADATA_CHANGE_PROPOSAL_SCHEMA_NAME = "MetadataChangeProposal";
37+
public static final String METADATA_CHANGE_LOG_SCHEMA_NAME = "MetadataChangeLog";
38+
public static final String METADATA_CHANGE_EVENT_SCHEMA_NAME = "MetadataChangeEvent";
39+
public static final String FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME = "FailedMetadataChangeEvent";
40+
public static final String FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME =
41+
"FailedMetadataChangeProposal";
42+
public static final String METADATA_AUDIT_EVENT_SCHEMA_NAME = "MetadataAuditEvent";
43+
public static final String PLATFORM_EVENT_SCHEMA_NAME = "PlatformEvent";
44+
public static final String DATAHUB_UPGRADE_HISTORY_EVENT_SCHEMA_NAME =
45+
"DataHubUpgradeHistoryEvent";
46+
3547
private static final RecordDataSchema MCE_PEGASUS_SCHEMA = new MetadataChangeEvent().schema();
3648

3749
private static final RecordDataSchema MAE_PEGASUS_SCHEMA = new MetadataAuditEvent().schema();
@@ -49,48 +61,50 @@ public class EventUtils {
4961
new DataHubUpgradeHistoryEvent().schema();
5062

5163
private static final Schema ORIGINAL_MCE_AVRO_SCHEMA =
52-
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc");
64+
getAvroSchemaFromResource(
65+
"avro/com/linkedin/mxe/" + METADATA_CHANGE_EVENT_SCHEMA_NAME + ".avsc");
5366

5467
private static final Schema ORIGINAL_MAE_AVRO_SCHEMA =
55-
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataAuditEvent.avsc");
68+
getAvroSchemaFromResource(
69+
"avro/com/linkedin/mxe/" + METADATA_AUDIT_EVENT_SCHEMA_NAME + ".avsc");
5670

5771
private static final Schema ORIGINAL_FAILED_MCE_AVRO_SCHEMA =
58-
getAvroSchemaFromResource("avro/com/linkedin/mxe/FailedMetadataChangeEvent.avsc");
72+
getAvroSchemaFromResource(
73+
"avro/com/linkedin/mxe/" + FAILED_METADATA_CHANGE_EVENT_SCHEMA_NAME + ".avsc");
5974

6075
private static final Schema ORIGINAL_MCP_AVRO_SCHEMA =
61-
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeProposal.avsc");
76+
getAvroSchemaFromResource(
77+
"avro/com/linkedin/mxe/" + METADATA_CHANGE_PROPOSAL_SCHEMA_NAME + ".avsc");
6278

6379
private static final Schema ORIGINAL_MCL_AVRO_SCHEMA =
64-
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeLog.avsc");
80+
getAvroSchemaFromResource(
81+
"avro/com/linkedin/mxe/" + METADATA_CHANGE_LOG_SCHEMA_NAME + ".avsc");
6582

6683
private static final Schema ORIGINAL_FMCP_AVRO_SCHEMA =
67-
getAvroSchemaFromResource("avro/com/linkedin/mxe/FailedMetadataChangeProposal.avsc");
84+
getAvroSchemaFromResource(
85+
"avro/com/linkedin/mxe/" + FAILED_METADATA_CHANGE_PROPOSAL_SCHEMA_NAME + ".avsc");
6886

6987
private static final Schema ORIGINAL_PE_AVRO_SCHEMA =
70-
getAvroSchemaFromResource("avro/com/linkedin/mxe/PlatformEvent.avsc");
71-
72-
public static final Schema ORIGINAL_DUHE_AVRO_SCHEMA =
73-
getAvroSchemaFromResource("avro/com/linkedin/mxe/DataHubUpgradeHistoryEvent.avsc");
88+
getAvroSchemaFromResource("avro/com/linkedin/mxe/" + PLATFORM_EVENT_SCHEMA_NAME + ".avsc");
7489

75-
private static final Schema RENAMED_MCE_AVRO_SCHEMA =
90+
static final Schema RENAMED_MCE_AVRO_SCHEMA =
7691
com.linkedin.pegasus2avro.mxe.MetadataChangeEvent.SCHEMA$;
7792

78-
private static final Schema RENAMED_MAE_AVRO_SCHEMA =
93+
static final Schema RENAMED_MAE_AVRO_SCHEMA =
7994
com.linkedin.pegasus2avro.mxe.MetadataAuditEvent.SCHEMA$;
8095

81-
private static final Schema RENAMED_FAILED_MCE_AVRO_SCHEMA =
96+
static final Schema RENAMED_FAILED_MCE_AVRO_SCHEMA =
8297
com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent.SCHEMA$;
8398

84-
private static final Schema RENAMED_PE_AVRO_SCHEMA =
85-
com.linkedin.pegasus2avro.mxe.PlatformEvent.SCHEMA$;
99+
static final Schema RENAMED_PE_AVRO_SCHEMA = com.linkedin.pegasus2avro.mxe.PlatformEvent.SCHEMA$;
86100

87101
public static final Schema RENAMED_MCP_AVRO_SCHEMA =
88102
com.linkedin.pegasus2avro.mxe.MetadataChangeProposal.SCHEMA$;
89103

90104
public static final Schema RENAMED_MCL_AVRO_SCHEMA =
91105
com.linkedin.pegasus2avro.mxe.MetadataChangeLog.SCHEMA$;
92106

93-
private static final Schema RENAMED_FMCP_AVRO_SCHEMA =
107+
static final Schema RENAMED_FMCP_AVRO_SCHEMA =
94108
com.linkedin.pegasus2avro.mxe.FailedMetadataChangeProposal.SCHEMA$;
95109

96110
public static final Schema RENAMED_DUHE_AVRO_SCHEMA =
@@ -223,9 +237,9 @@ public static DataHubUpgradeHistoryEvent avroToPegasusDUHE(@Nonnull GenericRecor
223237
throws IOException {
224238
return new DataHubUpgradeHistoryEvent(
225239
DataTranslator.genericRecordToDataMap(
226-
renameSchemaNamespace(record, RENAMED_DUHE_AVRO_SCHEMA, ORIGINAL_DUHE_AVRO_SCHEMA),
240+
renameSchemaNamespace(record, RENAMED_DUHE_AVRO_SCHEMA),
227241
DUHE_PEGASUS_SCHEMA,
228-
ORIGINAL_DUHE_AVRO_SCHEMA));
242+
RENAMED_DUHE_AVRO_SCHEMA));
229243
}
230244

231245
/**
@@ -375,7 +389,7 @@ public static GenericRecord pegasusToAvroDUHE(@Nonnull DataHubUpgradeHistoryEven
375389
throws IOException {
376390
GenericRecord original =
377391
DataTranslator.dataMapToGenericRecord(
378-
event.data(), event.schema(), ORIGINAL_DUHE_AVRO_SCHEMA);
392+
event.data(), event.schema(), RENAMED_DUHE_AVRO_SCHEMA);
379393
return renameSchemaNamespace(original, RENAMED_DUHE_AVRO_SCHEMA);
380394
}
381395

0 commit comments

Comments
 (0)