diff --git a/.gitignore b/.gitignore index ec1833389e778..40d10a8e1c862 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ # Pegasus & Avro **/src/mainGenerated* **/src/testGenerated* +metadata-events/mxe-registration/src/main/resources/**/*.avsc # Java .java-version diff --git a/metadata-dao-impl/ebean-dao/gma-create-all.sql b/metadata-dao-impl/ebean-dao/gma-create-all.sql index 2da3e063087e3..e1cf9bab6771c 100644 --- a/metadata-dao-impl/ebean-dao/gma-create-all.sql +++ b/metadata-dao-impl/ebean-dao/gma-create-all.sql @@ -15,3 +15,18 @@ create table metadata_aspect ( constraint pk_metadata_aspect primary key (urn,aspect,version) ); +create table metadata_index ( + id bigint auto_increment not null, + urn varchar(500) not null, + aspect varchar(200) not null, + path varchar(200) not null, + longval bigint, + stringval varchar(500), + doubleval double, + constraint pk_metadata_index primary key (id) +); + +create index idx_long_val on metadata_index (aspect,path,longval,urn); +create index idx_string_val on metadata_index (aspect,path,stringval,urn); +create index idx_double_val on metadata_index (aspect,path,doubleval,urn); +create index idx_urn on metadata_index (urn); diff --git a/metadata-dao-impl/ebean-dao/gma-drop-all.sql b/metadata-dao-impl/ebean-dao/gma-drop-all.sql index 0882d0deb8cd5..f66d4602e5d68 100644 --- a/metadata-dao-impl/ebean-dao/gma-drop-all.sql +++ b/metadata-dao-impl/ebean-dao/gma-drop-all.sql @@ -2,3 +2,9 @@ drop table if exists metadata_id; drop table if exists metadata_aspect; +drop table if exists metadata_index; + +drop index if exists idx_long_val; +drop index if exists idx_string_val; +drop index if exists idx_double_val; +drop index if exists idx_urn; diff --git a/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java new file mode 100644 index 0000000000000..0d81cbb509ea9 --- /dev/null +++ b/metadata-dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanMetadataIndex.java @@ -0,0 +1,78 @@ +package com.linkedin.metadata.dao; + +import io.ebean.annotation.Index; +import io.ebean.Model; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; +import lombok.Getter; +import lombok.NonNull; +import lombok.Setter; + + +@Getter +@Setter +// define composite indexes +@Index(name = "idx_long_val", columnNames = { + EbeanMetadataIndex.ASPECT_COLUMN, + EbeanMetadataIndex.PATH_COLUMN, + EbeanMetadataIndex.LONG_COLUMN, + EbeanMetadataIndex.URN_COLUMN +}) +@Index(name = "idx_string_val", columnNames = { + EbeanMetadataIndex.ASPECT_COLUMN, + EbeanMetadataIndex.PATH_COLUMN, + EbeanMetadataIndex.STRING_COLUMN, + EbeanMetadataIndex.URN_COLUMN +}) +@Index(name = "idx_double_val", columnNames = { + EbeanMetadataIndex.ASPECT_COLUMN, + EbeanMetadataIndex.PATH_COLUMN, + EbeanMetadataIndex.DOUBLE_COLUMN, + EbeanMetadataIndex.URN_COLUMN +}) +@Entity +@Table(name = "metadata_index") +public class EbeanMetadataIndex extends Model { + + public static final long serialVersionUID = 1L; + + private static final String ID_COLUMN = "id"; + public static final String URN_COLUMN = "urn"; + public static final String ASPECT_COLUMN = "aspect"; + public static final String PATH_COLUMN = "path"; + public static final String LONG_COLUMN = "longVal"; + public static final String STRING_COLUMN = "stringVal"; + public static final String DOUBLE_COLUMN = "doubleVal"; + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + @Column(name = ID_COLUMN) + protected long id; + + @NonNull + @Index(name = "idx_urn") + @Column(name = URN_COLUMN, length = 500, nullable = false) + protected String urn; + + @NonNull + @Column(name = ASPECT_COLUMN, length = 200, nullable = false) + protected String aspect; + + @NonNull + @Column(name = PATH_COLUMN, length = 200, nullable = false) + protected String path; + + @Column(name = LONG_COLUMN) + protected Long longVal; + + @Column(name = STRING_COLUMN, length = 500) + protected String stringVal; + + @Column(name = DOUBLE_COLUMN) + protected Double doubleVal; + +} \ No newline at end of file diff --git a/metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql b/metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql index 98770bccba337..a36f3c604f2d5 100644 --- a/metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql +++ b/metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql @@ -15,3 +15,18 @@ create table metadata_id ( constraint uq_metadata_id_namespace_id unique (namespace,id) ); +create table metadata_index ( + id bigint auto_increment not null, + urn varchar(500) not null, + aspect varchar(200) not null, + path varchar(200) not null, + longval bigint, + stringval varchar(500), + doubleval double, + constraint pk_metadata_index primary key (id) +); + +create index idx_long_val on metadata_index (aspect,path,longval,urn); +create index idx_string_val on metadata_index (aspect,path,stringval,urn); +create index idx_double_val on metadata_index (aspect,path,doubleval,urn); +create index idx_urn on metadata_index (urn); diff --git a/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/ESUtils.java b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/ESUtils.java index d44de97ad4592..c626c6cb41510 100644 --- a/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/ESUtils.java +++ b/metadata-dao-impl/elasticsearch-dao/src/main/java/com/linkedin/metadata/dao/utils/ESUtils.java @@ -18,10 +18,10 @@ public class ESUtils { private static final String DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD = "urn"; /* - * TODO: we might need to extend this list if need be, below link has the complete list - * https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html - * */ - private static final char[] ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = {'*'}; + * Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html for list of reserved + * characters in an Elasticsearch regular expression. + */ + private static final String ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = "?+*|{}[]()"; private ESUtils() { @@ -81,7 +81,7 @@ public static void buildSortOrder(@Nonnull SearchSourceBuilder searchSourceBuild */ @Nonnull public static String escapeReservedCharacters(@Nonnull String input) { - for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS) { + for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS.toCharArray()) { input = input.replace(String.valueOf(reservedChar), "\\" + reservedChar); } return input; diff --git a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/ESUtilsTest.java b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/ESUtilsTest.java index 21cf75c4fe37e..1d749cbc66af7 100644 --- a/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/ESUtilsTest.java +++ b/metadata-dao-impl/elasticsearch-dao/src/test/java/com/linkedin/metadata/dao/ESUtilsTest.java @@ -39,5 +39,7 @@ public void testBuildFilterQuery() throws Exception { public void testEscapeReservedCharacters() { assertEquals(escapeReservedCharacters("foobar"), "foobar"); assertEquals(escapeReservedCharacters("**"), "\\*\\*"); + assertEquals(escapeReservedCharacters("()"), "\\(\\)"); + assertEquals(escapeReservedCharacters("{}"), "\\{\\}"); } } diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java index 8b5a574ed1903..852ee988073ca 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaMetadataEventProducer.java @@ -8,10 +8,13 @@ import com.linkedin.metadata.dao.utils.ModelUtils; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.snapshot.Snapshot; +import com.linkedin.mxe.Configs; import com.linkedin.mxe.MetadataAuditEvent; import com.linkedin.mxe.MetadataChangeEvent; import com.linkedin.mxe.Topics; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -19,6 +22,7 @@ import javax.annotation.Nullable; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -107,6 +111,46 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent); } } + @Override + public void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, + @Nullable ASPECT oldValue, @Nonnull ASPECT newValue) { + + validateAspectSpecificTopic(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)); + + String topic; + Class maeAvroClass; + RecordTemplate metadataAuditEvent; + try { + topic = (String) Topics.class.getField(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)).get(null); + maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic); + metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance(); + + metadataAuditEvent.getClass().getMethod("setUrn", urn.getClass()).invoke(metadataAuditEvent, urn); + metadataAuditEvent.getClass().getMethod("setNewValue", newValue.getClass()).invoke(metadataAuditEvent, newValue); + if (oldValue != null) { + metadataAuditEvent.getClass() + .getMethod("setOldValue", oldValue.getClass()) + .invoke(metadataAuditEvent, oldValue); + } + } catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException + | InstantiationException | InvocationTargetException e) { + throw new IllegalArgumentException("Failed to compose the Pegasus aspect specific MAE", e); + } + + GenericRecord record; + try { + record = EventUtils.pegasusToAvroAspectSpecificMXE(maeAvroClass, metadataAuditEvent); + } catch (NoSuchFieldException | IOException | IllegalAccessException e) { + throw new ModelConversionException("Failed to convert Pegasus aspect specific MAE to Avro", e); + } + + if (_callback.isPresent()) { + _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get()); + } else { + _producer.send(new ProducerRecord(topic, urn.toString(), record)); + } + } + @Nonnull private Snapshot makeSnapshot(@Nonnull URN urn, @Nonnull RecordTemplate value) { Snapshot snapshot = new Snapshot(); @@ -114,4 +158,10 @@ private Snapshot makeSnapshot(@Nonnull URN urn, @Nonnull RecordTemplate value) { RecordUtils.setSelectedRecordTemplateInUnion(snapshot, ModelUtils.newSnapshot(_snapshotClass, urn, aspects)); return snapshot; } -} + + static void validateAspectSpecificTopic(@Nonnull String topic) { + if (!Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic))) { + throw new IllegalArgumentException(String.format("The aspect specific topic %s is not registered.", topic)); + } + } +} \ No newline at end of file diff --git a/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jQueryDAO.java b/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jQueryDAO.java index 0bcc022fd60f0..0f4d918d19787 100644 --- a/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jQueryDAO.java +++ b/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jQueryDAO.java @@ -7,11 +7,11 @@ import com.linkedin.metadata.query.RelationshipFilter; import com.linkedin.metadata.validator.EntityValidator; import com.linkedin.metadata.validator.RelationshipValidator; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; -import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -185,7 +185,7 @@ public List findMixedTypesRelationships(@Nonnull Statement query @Nonnull public - List> getPathsToAllNodesTraversed( + List> getTraversedPaths( @Nullable Class sourceEntityClass, @Nonnull Filter sourceEntityFilter, @Nullable Class destinationEntityClass, @Nonnull Filter destinationEntityFilter, @Nonnull Class relationshipType, @Nonnull RelationshipFilter relationshipFilter, @@ -221,7 +221,7 @@ List> getPathsToAllNodesTraversed( final Statement statement = buildStatement(statementString, "length(p), dest.urn", offset, count); - return runQuery(statement, this::pathRecordToEntityList); + return runQuery(statement, this::pathRecordToPathList); } /** @@ -302,11 +302,21 @@ private ENTITY nodeRecordToEntity(@Nonnull Class } @Nonnull - private List pathRecordToEntityList(@Nonnull Record pathRecord) { + private List pathRecordToPathList(@Nonnull Record pathRecord) { final Path path = pathRecord.values().get(0).asPath(); - return StreamSupport.stream(path.nodes().spliterator(), false) - .map(Neo4jUtil::nodeToEntity) - .collect(Collectors.toList()); + final List pathList = new ArrayList<>(); + + StreamSupport.stream(path.spliterator(), false) + .map(Neo4jUtil::pathSegmentToRecordList) + .forEach(segment -> { + if (pathList.isEmpty()) { + pathList.add(segment.get(0)); + } + pathList.add(segment.get(1)); + pathList.add(segment.get(2)); + }); + + return pathList; } @Nonnull diff --git a/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jUtil.java b/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jUtil.java index cc645ad284a32..6f8564085c899 100644 --- a/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jUtil.java +++ b/metadata-dao-impl/neo4j-dao/src/main/java/com/linkedin/metadata/dao/Neo4jUtil.java @@ -8,14 +8,17 @@ import com.linkedin.metadata.query.Filter; import com.linkedin.metadata.query.RelationshipDirection; import com.linkedin.metadata.query.RelationshipFilter; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.StringJoiner; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.commons.lang3.ClassUtils; import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Path; import org.neo4j.driver.types.Relationship; import static com.linkedin.metadata.dao.utils.QueryUtils.*; @@ -165,6 +168,25 @@ public static RecordTemplate nodeToEntity(@Nonnull Node node) { return RecordUtils.toRecordTemplate(className, new DataMap(node.asMap())); } + /** + * Converts path segment (field:value map) list of {@link RecordTemplate}s of nodes & edges + * + * @param segment The segment of a path containing nodes & edges + * @return List + */ + @Nonnull + public static List pathSegmentToRecordList(@Nonnull Path.Segment segment) { + final Node startNode = segment.start(); + final Node endNode = segment.end(); + final Relationship edge = segment.relationship(); + + return Arrays.asList( + nodeToEntity(startNode), + edgeToRelationship(startNode, endNode, edge), + nodeToEntity(endNode) + ); + } + /** * Converts edge (source-relationship->destination) to RELATIONSHIP * diff --git a/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java b/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java index 4799e4a3f8997..633de5d0f2547 100644 --- a/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java +++ b/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jQueryDAOTest.java @@ -511,17 +511,17 @@ public void testFindNodesInPath() throws Exception { // Get reports roll-up - 2 levels Filter sourceFilter = newFilter("urn", urn1.toString()); RelationshipFilter relationshipFilter = createRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING); - List> nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null, + List> paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null, EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 2, -1, -1); - assertEquals(nodesInPath.size(), 5); - assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2); - assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 3); + assertEquals(paths.size(), 5); + assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2); + assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 3); // Get reports roll-up - 1 level - nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null, + paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null, EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 1, -1, -1); - assertEquals(nodesInPath.size(), 2); - assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2); - assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 0); + assertEquals(paths.size(), 2); + assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2); + assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 0); } } diff --git a/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jUtilTest.java b/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jUtilTest.java index c77f654bf8f52..db3fae4dc4719 100644 --- a/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jUtilTest.java +++ b/metadata-dao-impl/neo4j-dao/src/test/java/com/linkedin/metadata/dao/Neo4jUtilTest.java @@ -1,6 +1,7 @@ package com.linkedin.metadata.dao; import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.query.Condition; import com.linkedin.metadata.query.Criterion; @@ -8,18 +9,23 @@ import com.linkedin.metadata.query.Filter; import com.linkedin.metadata.query.RelationshipDirection; import com.linkedin.metadata.query.RelationshipFilter; -import com.linkedin.testing.RelationshipFoo; -import com.linkedin.testing.EntityFoo; import com.linkedin.testing.EntityBar; -import com.linkedin.testing.urn.FooUrn; +import com.linkedin.testing.EntityFoo; +import com.linkedin.testing.RelationshipFoo; import com.linkedin.testing.urn.BarUrn; +import com.linkedin.testing.urn.FooUrn; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.neo4j.driver.Value; import org.neo4j.driver.internal.InternalNode; +import org.neo4j.driver.internal.InternalPath; import org.neo4j.driver.internal.InternalRelationship; +import org.neo4j.driver.internal.value.IntegerValue; import org.neo4j.driver.internal.value.StringValue; import org.neo4j.driver.types.Node; +import org.neo4j.driver.types.Path; import org.neo4j.driver.types.Relationship; import org.testng.annotations.Test; @@ -184,4 +190,40 @@ public void testCreateRelationshipFilter() { assertEquals(createRelationshipFilter(field, value, direction), relationshipFilter); } + + @Test + public void testPathSegmentToRecordList() { + FooUrn fooUrn = makeFooUrn(1); + Node fooNode = new InternalNode(0, Collections.singletonList("com.linkedin.testing.EntityFoo"), + Collections.singletonMap("urn", new StringValue(fooUrn.toString()))); + + BarUrn barUrn = makeBarUrn(1); + Node barNode = new InternalNode(1, Collections.singletonList("com.linkedin.testing.EntityBar"), + Collections.singletonMap("urn", new StringValue(barUrn.toString()))); + + Relationship edge = new InternalRelationship(2, 0, 1, RelationshipFoo.class.getCanonicalName(), + new HashMap() { + { + put("intField", new IntegerValue(42)); + put("type", new StringValue("dummyType")); + } + }); + + Path path = new InternalPath(fooNode, edge, barNode); + + List pathList = pathSegmentToRecordList(path.iterator().next()); + + assertTrue(pathList.get(0) instanceof EntityFoo); + assertTrue(pathList.get(1) instanceof RelationshipFoo); + assertTrue(pathList.get(2) instanceof EntityBar); + + assertEquals(pathList.get(0), new EntityFoo().setUrn(fooUrn)); + assertTrue(DataTemplateUtil.areEqual(pathList.get(1), + new RelationshipFoo() + .setIntField(42) + .setSource(fooUrn) + .setDestination(barUrn) + .setType("dummyType"))); + assertEquals(pathList.get(2), new EntityBar().setUrn(barUrn)); + } } diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java index d8a39dc7e59a3..200af0cb40449 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/BaseMetadataEventProducer.java @@ -18,7 +18,8 @@ public abstract class BaseMetadataEventProducer _snapshotClass; protected final Class _aspectUnionClass; - public BaseMetadataEventProducer(@Nonnull Class snapshotClass, @Nonnull Class aspectUnionClass) { + public BaseMetadataEventProducer(@Nonnull Class snapshotClass, + @Nonnull Class aspectUnionClass) { ModelUtils.validateSnapshotAspect(snapshotClass, aspectUnionClass); _snapshotClass = snapshotClass; _aspectUnionClass = aspectUnionClass; @@ -44,4 +45,14 @@ public abstract void produceSnapshotBasedMetadat */ public abstract void produceMetadataAuditEvent(@Nonnull URN urn, @Nullable ASPECT oldValue, @Nonnull ASPECT newValue); + + /** + * Produces an aspect specific Metadata Audit Event (MAE) after a metadata aspect is updated for an entity. + * + * @param urn {@link Urn} of the entity + * @param oldValue the value prior to the update, or null if there's none. + * @param newValue the value after the update + */ + public abstract void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, + @Nullable ASPECT oldValue, @Nonnull ASPECT newValue); } diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java index 43be72cec2a0f..1904147c32224 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducer.java @@ -29,4 +29,10 @@ public void produceMetadataAuditEvent(@Nonnull U @Nonnull ASPECT newValue) { // Do nothing } + + @Override + public void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn, + @Nullable ASPECT oldValue, @Nonnull ASPECT newValue) { + // Do nothing + } } diff --git a/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java b/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java index c1c943d8e2dd8..baf0b496a3b38 100644 --- a/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java +++ b/metadata-dao/src/main/java/com/linkedin/metadata/dao/utils/ModelUtils.java @@ -34,6 +34,7 @@ public class ModelUtils { private static final ClassLoader CLASS_LOADER = DummySnapshot.class.getClassLoader(); + private static final String METADATA_AUDIT_EVENT_PREFIX = "METADATA_AUDIT_EVENT"; private ModelUtils() { // Util class @@ -202,7 +203,8 @@ private static Urn getUrnFromRelationship( * Similar to {@link #getUrnFromRelationship} but extracts from a delta union instead */ @Nonnull - public static Urn getSourceUrnFromRelationship(@Nonnull RELATIONSHIP relationship) { + public static Urn getSourceUrnFromRelationship( + @Nonnull RELATIONSHIP relationship) { return getUrnFromRelationship(relationship, "source"); } @@ -210,7 +212,8 @@ public static Urn getSourceUrnFromRelatio * Similar to {@link #getUrnFromRelationship} but extracts from a delta union instead */ @Nonnull - public static Urn getDestinationUrnFromRelationship(@Nonnull RELATIONSHIP relationship) { + public static Urn getDestinationUrnFromRelationship( + @Nonnull RELATIONSHIP relationship) { return getUrnFromRelationship(relationship, "destination"); } @@ -399,8 +402,8 @@ public static Class urnClassForDocument(@Nonnull Class urnClassForRelationship(@Nonnull Class relationshipClass, - @Nonnull String fieldName) { + private static Class urnClassForRelationship( + @Nonnull Class relationshipClass, @Nonnull String fieldName) { RelationshipValidator.validateRelationshipSchema(relationshipClass); return urnClassForField(relationshipClass, fieldName); } @@ -409,7 +412,8 @@ private static Class urnClassForRelationship(@Nonnull Class sourceUrnClassForRelationship(@Nonnull Class relationshipClass) { + public static Class sourceUrnClassForRelationship( + @Nonnull Class relationshipClass) { return urnClassForRelationship(relationshipClass, "source"); } @@ -417,7 +421,8 @@ public static Class sourceUrnClassForRelationship(@Nonnull Class< * Gets the expected {@link Urn} class for the destination field of a specific kind of relationship. */ @Nonnull - public static Class destinationUrnClassForRelationship(@Nonnull Class relationshipClass) { + public static Class destinationUrnClassForRelationship( + @Nonnull Class relationshipClass) { return urnClassForRelationship(relationshipClass, "destination"); } @@ -491,8 +496,9 @@ public static > getAllEntities() { - return new Reflections("com.linkedin.metadata.entity") - .getSubTypesOf(RecordTemplate.class).stream().filter(EntityValidator::isValidEntitySchema) + return new Reflections("com.linkedin.metadata.entity").getSubTypesOf(RecordTemplate.class) + .stream() + .filter(EntityValidator::isValidEntitySchema) .collect(Collectors.toSet()); } @@ -507,4 +513,15 @@ public static String getEntityTypeFromUrnClass(@Nonnull Class urn throw new RuntimeException(e); } } + + /** + * Get aspect specific kafka topic name from urn & aspect classes. + */ + @Nonnull + public static String getAspectSpecificMAETopicName(@Nonnull URN urn, + @Nonnull ASPECT newValue) { + final String urnStr = urn.getClass().getSimpleName().toUpperCase(); + return String.format("%s_%s_%s", METADATA_AUDIT_EVENT_PREFIX, urnStr.substring(0, urnStr.length() - "Urn".length()), + newValue.getClass().getSimpleName().toUpperCase()); + } } diff --git a/metadata-dao/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java b/metadata-dao/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java index 9f3c379cbef31..88ea91cb2604f 100644 --- a/metadata-dao/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java +++ b/metadata-dao/src/test/java/com/linkedin/metadata/dao/producer/DummyMetadataEventProducerTest.java @@ -18,5 +18,6 @@ public void testCreateDummyMetadataEventProducer() { AspectFoo newValue = new AspectFoo().setValue("new"); producer.produceMetadataAuditEvent(urn, oldValue, newValue); + producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue); } } diff --git a/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java b/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java index 93c54783b4d17..666de814251e5 100644 --- a/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java +++ b/metadata-dao/src/test/java/com/linkedin/metadata/dao/utils/ModelUtilsTest.java @@ -139,9 +139,7 @@ public void testGetUrnFromEntity() { public void testGetUrnFromRelationship() { FooUrn expectedSource = makeFooUrn(1); BarUrn expectedDestination = makeBarUrn(1); - RelationshipFoo relationship = new RelationshipFoo() - .setSource(expectedSource) - .setDestination(expectedDestination); + RelationshipFoo relationship = new RelationshipFoo().setSource(expectedSource).setDestination(expectedDestination); Urn sourceUrn = ModelUtils.getSourceUrnFromRelationship(relationship); Urn destinationUrn = ModelUtils.getDestinationUrnFromRelationship(relationship); @@ -228,7 +226,6 @@ public void testUrnClassForEntity() { assertEquals(ModelUtils.urnClassForEntity(EntityBar.class), BarUrn.class); } - @Test public void testUrnClassForSnapshot() { assertEquals(ModelUtils.urnClassForSnapshot(EntitySnapshot.class), Urn.class); @@ -268,4 +265,12 @@ public void testNewRelatioshipUnion() { assertEquals(relationshipUnion.getRelationshipFoo(), foo); } + + @Test + public void testGetMAETopicName() throws URISyntaxException { + FooUrn urn = new FooUrn(1); + AspectFoo foo = new AspectFoo().setValue("foo"); + + assertEquals(ModelUtils.getAspectSpecificMAETopicName(urn, foo), "METADATA_AUDIT_EVENT_FOO_ASPECTFOO"); + } } diff --git a/metadata-events/mxe-registration/build.gradle b/metadata-events/mxe-registration/build.gradle index bbfeb03c22323..10c454e5e4b14 100644 --- a/metadata-events/mxe-registration/build.gradle +++ b/metadata-events/mxe-registration/build.gradle @@ -1 +1,34 @@ apply plugin: 'java' + +configurations { + avroOriginal +} + +dependencies { + compile project(':metadata-events:mxe-avro-1.7') + compile project(':metadata-models') + compile spec.product.pegasus.dataAvro1_6 + + testCompile project(':metadata-dao') + testCompile project(':metadata-testing:metadata-test-utils') + + avroOriginal project(path: ':metadata-models', configuration: 'avroSchema') +} + +// copy original MXE avro schema from metadata-models to resources +task copyOriginalMXESchemas(type: Copy) { + dependsOn configurations.avroOriginal + + from { // use of closure defers evaluation until execution time + configurations.avroOriginal.collect { zipTree(it) } + } + into("src/main/resources/") + include("avro/com/linkedin/mxe/") +} + +compileJava.dependsOn copyOriginalMXESchemas +processResources.dependsOn copyOriginalMXESchemas + +clean { + project.delete("src/main/resources/avro") +} \ No newline at end of file diff --git a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Configs.java b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Configs.java new file mode 100644 index 0000000000000..e94fe44450f28 --- /dev/null +++ b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Configs.java @@ -0,0 +1,77 @@ +package com.linkedin.mxe; + +import com.linkedin.pegasus2avro.mxe.FailedMetadataChangeEvent; +import com.linkedin.pegasus2avro.mxe.MetadataAuditEvent; +import com.linkedin.pegasus2avro.mxe.MetadataChangeEvent; +import com.linkedin.pegasus2avro.mxe.MetadataGraphEvent; +import com.linkedin.pegasus2avro.mxe.MetadataSearchEvent; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecord; + + +public class Configs { + + public static final Map FABRIC_SCHEMA_REGISTRY_MAP = + Collections.unmodifiableMap(new HashMap() { + { + put("ei", "http://1.schemaregistry.ei4.atd.int.linkedin.com:10252"); + put("corp", "http://1.schemaregistry.corp-lca1.atd.corp.linkedin.com:10252"); + } + }); + + public static final Map TOPIC_SCHEMA_MAP = Collections.unmodifiableMap(new HashMap() { + { + put(Topics.METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$); + put(Topics.METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$); + put(Topics.FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$); + put(Topics.METADATA_GRAPH_EVENT, MetadataGraphEvent.SCHEMA$); + put(Topics.METADATA_SEARCH_EVENT, MetadataSearchEvent.SCHEMA$); + + put(Topics.DEV_METADATA_AUDIT_EVENT, MetadataAuditEvent.SCHEMA$); + put(Topics.DEV_METADATA_CHANGE_EVENT, MetadataChangeEvent.SCHEMA$); + put(Topics.DEV_FAILED_METADATA_CHANGE_EVENT, FailedMetadataChangeEvent.SCHEMA$); + } + }); + + public static final Map> TOPIC_SCHEMA_CLASS_MAP = + Collections.unmodifiableMap(new HashMap>() { + { + // Aspect-specific MCE topic to schema. + // CorpGroupUrn + put(Topics.METADATA_AUDIT_EVENT_CORPGROUP_CORPGROUPINFO, + com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.MetadataAuditEvent.class); + // CorpUserUrn + put(Topics.METADATA_AUDIT_EVENT_CORPUSER_CORPUSERINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.MetadataAuditEvent.class); + put(Topics.METADATA_AUDIT_EVENT_CORPUSER_CORPUSEREDITABLEINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.MetadataAuditEvent.class); + + // Aspect-specific MCE topic to schema. + // CorpGroupUrn + put(Topics.METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO, + com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.MetadataChangeEvent.class); + // CorpUserUrn + put(Topics.METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.MetadataChangeEvent.class); + put(Topics.METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.MetadataChangeEvent.class); + + // Aspect-specific FMCE topic to schema. + // CorpGroupUrn + put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO, + com.linkedin.pegasus2avro.mxe.corpGroup.corpGroupInfo.FailedMetadataChangeEvent.class); + // CorpUserUrn + put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserInfo.FailedMetadataChangeEvent.class); + put(Topics.FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO, + com.linkedin.pegasus2avro.mxe.corpuser.corpUserEditableInfo.FailedMetadataChangeEvent.class); + } + }); + + private Configs() { + // Util class + } +} \ No newline at end of file diff --git a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java index eb4d7e9ffda65..1c88f1f509e3c 100644 --- a/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java +++ b/metadata-events/mxe-registration/src/main/java/com/linkedin/mxe/Topics.java @@ -1,13 +1,58 @@ package com.linkedin.mxe; public class Topics { - public static final String METADATA_AUDIT_EVENT = "MetadataAuditEvent"; - public static final String METADATA_CHANGE_EVENT = "MetadataChangeEvent"; - public static final String FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent"; - public static final String METADATA_GRAPH_EVENT = "MetadataGraphEvent"; - public static final String METADATA_SEARCH_EVENT = "MetadataSearchEvent"; + public static final String METADATA_AUDIT_EVENT = "MetadataAuditEvent_v4"; + public static final String METADATA_CHANGE_EVENT = "MetadataChangeEvent_v4"; + public static final String FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent_v4"; + public static final String METADATA_GRAPH_EVENT = "MetadataGraphEvent_v4"; + public static final String METADATA_SEARCH_EVENT = "MetadataSearchEvent_v4"; + + public static final String DEV_METADATA_AUDIT_EVENT = "MetadataAuditEvent_v4_dev"; + public static final String DEV_METADATA_CHANGE_EVENT = "MetadataChangeEvent_v4_dev"; + public static final String DEV_FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent_v4_dev"; + + /** + * aspect-specific MAE topics. + * format : METADATA_AUDIT_EVENT__ + */ + // MAE topics for CorpGroup entity. + public static final String METADATA_AUDIT_EVENT_CORPGROUP_CORPGROUPINFO = + "MetadataAuditEvent_CorpGroup_CorpGroupInfo_v1"; + + // MAE topics for CorpUser entity. + public static final String METADATA_AUDIT_EVENT_CORPUSER_CORPUSEREDITABLEINFO = + "MetadataAuditEvent_CorpUser_CorpUserEditableInfo_v2"; + public static final String METADATA_AUDIT_EVENT_CORPUSER_CORPUSERINFO = "MetadataAuditEvent_CorpUser_CorpUserInfo_v2"; + + /** + * aspect-specific MCE topics. + * format : METADATA_CHANGE_EVENT__ + */ + // MCE topics for CorpGroup entity. + public static final String METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO = + "MetadataChangeEvent_CorpGroup_CorpGroupInfo_v1"; + + // MCE topics for CorpUser entity. + public static final String METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO = + "MetadataChangeEvent_CorpUser_CorpUserEditableInfo_v1"; + public static final String METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO = + "MetadataChangeEvent_CorpUser_CorpUserInfo_v1"; + + /** + * aspect-specific FMCE topics. + * format : FAILED_METADATA_CHANGE_EVENT__ + */ + // FMCE topics for CorpGroup entity. + public static final String FAILED_METADATA_CHANGE_EVENT_CORPGROUP_CORPGROUPINFO = + "FailedMetadataChangeEvent_CorpGroup_CorpGroupInfo_v1"; + + // FMCE topics for CorpUser entity. + public static final String FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSEREDITABLEINFO = + "FailedMetadataChangeEvent_CorpUser_CorpUserEditableInfo_v1"; + public static final String FAILED_METADATA_CHANGE_EVENT_CORPUSER_CORPUSERINFO = + "FailedMetadataChangeEvent_CorpUser_CorpUserInfo_v1"; private Topics() { // Util class } -} +} \ No newline at end of file diff --git a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java index cc2289da804e8..af3ef7d0cc36d 100644 --- a/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java +++ b/metadata-events/mxe-utils-avro-1.7/src/main/java/com/linkedin/metadata/EventUtils.java @@ -4,6 +4,7 @@ import com.google.common.io.Resources; import com.linkedin.data.avro.DataTranslator; import com.linkedin.data.schema.RecordDataSchema; +import com.linkedin.data.template.RecordTemplate; import com.linkedin.mxe.FailedMetadataChangeEvent; import com.linkedin.mxe.MetadataAuditEvent; import com.linkedin.mxe.MetadataChangeEvent; @@ -22,6 +23,7 @@ import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificRecord; public class EventUtils { @@ -113,6 +115,24 @@ public static GenericRecord pegasusToAvroMCE(@Nonnull MetadataChangeEvent event) return renameSchemaNamespace(original, ORIGINAL_MCE_AVRO_SCHEMA, RENAMED_MCE_AVRO_SCHEMA); } + /** + * Converts a Pegasus aspect specific MXE into the equivalent Avro model as a {@link GenericRecord}. + * + * @param event the Pegasus aspect specific MXE model + * @return the Avro model with com.linkedin.pegasus2avro.mxe namesapce + * @throws IOException if the conversion fails + */ + + @Nonnull + public static MXE pegasusToAvroAspectSpecificMXE( + @Nonnull Class clazz, @Nonnull RecordTemplate event) + throws NoSuchFieldException, IOException, IllegalAccessException { + final Schema newSchema = (Schema) clazz.getField("SCHEMA$").get(null); + final Schema originalSchema = getAvroSchemaFromResource(getAvroResourcePath(clazz)); + final GenericRecord original = DataTranslator.dataMapToGenericRecord(event.data(), event.schema(), originalSchema); + return (MXE) renameSchemaNamespace(original, originalSchema, newSchema); + } + /** * Converts a Pegasus Failed MCE into the equivalent Avro model as a {@link GenericRecord}. * @@ -170,4 +190,18 @@ private static GenericRecord changeSchema(@Nonnull GenericRecord record, @Nonnul } } } + + /** + * Get Pegasus class from Avro class. + * @param clazz the aspect specific MXE avro class + * @return the Pegasus aspect specific MXE class + * @throws Exception + */ + public static Class getPegasusClass(@Nonnull Class clazz) throws ClassNotFoundException { + return Class.forName(clazz.getCanonicalName().replace(".pegasus2avro", "")); + } + + private static String getAvroResourcePath(@Nonnull Class clazz) { + return String.format("avro/%s.avsc", clazz.getCanonicalName().replace(".pegasus2avro", "").replace(".", "/")); + } } diff --git a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSchemaComposer.java b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSchemaComposer.java index 40bdfb21d59ae..1c181a61bf9c2 100644 --- a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSchemaComposer.java +++ b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/EventSchemaComposer.java @@ -58,6 +58,10 @@ private void generateResultFile(@Nonnull String entityUrn, @Nonnull EventSpec ev // generate FMCE writeToFile(new File(directory, FAILED_METADATA_CHANGE_EVENT + PDL_SUFFIX), renderToString(eventSpec, entityUrn, namespace, EVENT_TEMPLATES.get(FAILED_METADATA_CHANGE_EVENT))); + + // generate MAE + writeToFile(new File(directory, METADATA_AUDIT_EVENT + PDL_SUFFIX), + renderToString(eventSpec, entityUrn, namespace, EVENT_TEMPLATES.get(METADATA_AUDIT_EVENT))); } @Nonnull diff --git a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java index eee3c04f37ed7..2fa6aa29693ed 100644 --- a/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java +++ b/metadata-models-generator/src/main/java/com/linkedin/metadata/generator/SchemaGeneratorConstants.java @@ -19,12 +19,14 @@ private SchemaGeneratorConstants() { // used in EventSchemaComposer static final String FAILED_METADATA_CHANGE_EVENT = "FailedMetadataChangeEvent"; static final String FAILED_METADATA_CHANGE_EVENT_PREFIX = "Failed"; + static final String METADATA_AUDIT_EVENT = "MetadataAuditEvent"; static final String METADATA_CHANGE_EVENT = "MetadataChangeEvent"; static final String PDL_SUFFIX = ".pdl"; static final Map EVENT_TEMPLATES = Collections.unmodifiableMap(new HashMap() { { - put(METADATA_CHANGE_EVENT, "MetadataChangeEvent.rythm"); put(FAILED_METADATA_CHANGE_EVENT, "FailedMetadataChangeEvent.rythm"); + put(METADATA_AUDIT_EVENT, "MetadataAuditEvent.rythm"); + put(METADATA_CHANGE_EVENT, "MetadataChangeEvent.rythm"); } }); } \ No newline at end of file diff --git a/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm b/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm new file mode 100644 index 0000000000000..9c54ade503d3f --- /dev/null +++ b/metadata-models-generator/src/main/resources/MetadataAuditEvent.rythm @@ -0,0 +1,36 @@ +@import com.linkedin.metadata.generator.EventSpec; +@import com.linkedin.metadata.generator.SchemaGeneratorUtil; +@args String entityUrn, String nameSpace EventSpec eventSpec +@assign (entityName) {@SchemaGeneratorUtil.getEntityName(entityUrn)} +namespace com.linkedin.mxe@(nameSpace) + +import com.linkedin.avro2pegasus.events.KafkaAuditHeader +import @entityUrn +import @eventSpec.getFullValueType() + +/** + * MetadataAuditEvent for the @(entityName)Urn with @(eventSpec.getValueType()) aspect. + */ +@@MetadataAuditEvent +record MetadataAuditEvent { + + /** + * Kafka audit header for the MetadataAuditEvent. + */ + auditHeader: optional KafkaAuditHeader + + /** + * @(entityName)Urn as the key for the MetadataAuditEvent. + */ + urn: @(entityName)Urn + + /** + * Aspect of the @eventSpec.getValueType() before the update. + */ + oldValue: optional @eventSpec.getValueType() + + /** + * Aspect of the @eventSpec.getValueType() after the update. + */ + newValue: @eventSpec.getValueType() +} diff --git a/metadata-models-generator/src/test/java/com/linkedin/metadata/generator/TestEventSchemaComposer.java b/metadata-models-generator/src/test/java/com/linkedin/metadata/generator/TestEventSchemaComposer.java index 27ef7ab20095a..81f78e4809209 100644 --- a/metadata-models-generator/src/test/java/com/linkedin/metadata/generator/TestEventSchemaComposer.java +++ b/metadata-models-generator/src/test/java/com/linkedin/metadata/generator/TestEventSchemaComposer.java @@ -45,6 +45,21 @@ public void testFMCESchemaRender() throws Exception { + TEST_GENERATED_PDL))); } + @Test + public void testMAESchemaRender() throws Exception { + final String testMAE = + GENERATED_MXE_PATH + TEST_NAMESPACE + File.separator + METADATA_AUDIT_EVENT + PDL_SUFFIX; + final File metadataAuditEventBar = new File(testMAE); + + populateEvents(); + + assertTrue(metadataAuditEventBar.exists()); + assertEquals(IOUtils.toString(new FileInputStream(testMAE)), IOUtils.toString(this.getClass() + .getClassLoader() + .getResourceAsStream( + "com/linkedin/mxe" + TEST_NAMESPACE + File.separator + METADATA_AUDIT_EVENT + PDL_SUFFIX))); + } + private void populateEvents() throws Exception { SchemaAnnotationRetriever schemaAnnotationRetriever = new SchemaAnnotationRetriever(TEST_METADATA_MODELS_RESOLVED_PATH); diff --git a/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl new file mode 100644 index 0000000000000..567bea65e6b5a --- /dev/null +++ b/metadata-models-generator/src/test/resources/com/linkedin/mxe/bar/annotatedAspectBar/MetadataAuditEvent.pdl @@ -0,0 +1,32 @@ +namespace com.linkedin.mxe.bar.annotatedAspectBar + +import com.linkedin.avro2pegasus.events.KafkaAuditHeader +import com.linkedin.testing.BarUrn +import com.linkedin.testing.AnnotatedAspectBar + +/** + * MetadataAuditEvent for the BarUrn with AnnotatedAspectBar aspect. + */ +@MetadataAuditEvent +record MetadataAuditEvent { + + /** + * Kafka audit header for the MetadataAuditEvent. + */ + auditHeader: optional KafkaAuditHeader + + /** + * BarUrn as the key for the MetadataAuditEvent. + */ + urn: BarUrn + + /** + * Aspect of the AnnotatedAspectBar before the update. + */ + oldValue: optional AnnotatedAspectBar + + /** + * Aspect of the AnnotatedAspectBar after the update. + */ + newValue: AnnotatedAspectBar +} diff --git a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpGroupInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpGroupInfo.pdl index bda73f3863595..ef743943fdd0e 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpGroupInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpGroupInfo.pdl @@ -7,6 +7,7 @@ import com.linkedin.common.EmailAddress /** * group of corpUser, it may contains nested group */ +@Aspect.EntityUrns = [ "com.linkedin.common.CorpGroupUrn" ] record CorpGroupInfo { /** diff --git a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserEditableInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserEditableInfo.pdl index 801ab1a4e616f..c7afa9f9da4f4 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserEditableInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserEditableInfo.pdl @@ -5,6 +5,7 @@ import com.linkedin.common.Url /** * Linkedin corp user information that can be edited from UI */ +@Aspect.EntityUrns = [ "com.linkedin.common.CorpuserUrn" ] record CorpUserEditableInfo { /** diff --git a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserInfo.pdl index bdc4634ba79d8..c387908c4e6a1 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserInfo.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/identity/CorpUserInfo.pdl @@ -6,6 +6,7 @@ import com.linkedin.common.EmailAddress /** * Linkedin corp user information */ +@Aspect.EntityUrns = [ "com.linkedin.common.CorpuserUrn" ] record CorpUserInfo { /** diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableClient.java new file mode 100644 index 0000000000000..ed11b951631fa --- /dev/null +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseBrowsableClient.java @@ -0,0 +1,51 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.BrowseResult; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.Client; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Base client that all entities supporting browse as well as search should implement in their respective restli MPs + * @param the client's value type + * @param urn type of the entity + */ +public abstract class BaseBrowsableClient extends BaseSearchableClient { + + public BaseBrowsableClient(@Nonnull Client restliClient) { + super(restliClient); + } + + /** + * Browse method that the client extending this class must implement. Returns {@link BrowseResult} containing list of groups/entities + * that match given browse request + * + * @param inputPath Path to be browsed + * @param requestFilters Request map with fields and values to be applied as filters to the browse query + * @param from Start index of the first entity located in path + * @param size The max number of entities contained in the response + * @return {@link BrowseResult} containing list of groups/entities + * @throws RemoteInvocationException when the rest.li request fails + */ + @Nonnull + public abstract BrowseResult browse(@Nonnull String inputPath, @Nullable Map requestFilters, int from, int size) + throws RemoteInvocationException; + + /** + * Returns a list of paths for a given urn + * + * @param urn Urn of the entity + * @return all paths that are related to the urn + * @throws RemoteInvocationException when the rest.li request fails + */ + @Nonnull + public StringArray getBrowsePaths(@Nonnull URN urn) throws RemoteInvocationException { + throw new UnsupportedOperationException("Not implemented yet."); + } + +} \ No newline at end of file diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseClient.java index 0759ef535ed14..0ec2617e28fbb 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseClient.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseClient.java @@ -1,14 +1,7 @@ package com.linkedin.metadata.restli; import com.linkedin.common.callback.FutureCallback; -import com.linkedin.data.schema.validation.CoercionMode; -import com.linkedin.data.schema.validation.RequiredMode; -import com.linkedin.data.schema.validation.UnrecognizedFieldMode; -import com.linkedin.data.schema.validation.ValidateDataAgainstSchema; -import com.linkedin.data.schema.validation.ValidationOptions; -import com.linkedin.data.schema.validation.ValidationResult; import com.linkedin.data.template.RecordTemplate; -import com.linkedin.metadata.dao.exception.ModelValidationException; import com.linkedin.r2.RemoteInvocationException; import com.linkedin.restli.client.ActionRequest; import com.linkedin.restli.client.BatchGetEntityRequest; @@ -29,11 +22,6 @@ public abstract class BaseClient implements AutoCloseable { - private static final ValidationOptions VALIDATION_OPTIONS = new ValidationOptions( - RequiredMode.FIXUP_ABSENT_WITH_DEFAULT, - CoercionMode.NORMAL, - UnrecognizedFieldMode.DISALLOW - ); protected final Client _client; @@ -59,9 +47,7 @@ public void close() { */ protected ASPECT get(@Nonnull GetRequest request) throws RemoteInvocationException { - final ASPECT aspect = _client.sendRequest(request).getResponse().getEntity(); - validateEntity(aspect); - return aspect; + return _client.sendRequest(request).getResponse().getEntity(); } /** @@ -91,11 +77,7 @@ protected Map getUrnFunc.apply(entry.getKey().getKey()), - entry -> { - ASPECT aspect = entry.getValue().getEntity(); - validateEntity(aspect); - return aspect; - } + entry -> entry.getValue().getEntity() )); } @@ -139,9 +121,7 @@ protected Map CollectionResponse getAll(@Nonnull GetAllRequest request) throws RemoteInvocationException { - final CollectionResponse response = _client.sendRequest(request).getResponse().getEntity(); - response.getElements().forEach(this::validateEntity); - return response; + return _client.sendRequest(request).getResponse().getEntity(); } /** @@ -164,9 +144,7 @@ CollectionResponse getAll(@Nonnull GetAllRequestBuilderBase ASPECT doAction(@Nonnull ActionRequest request) throws RemoteInvocationException { - final ASPECT aspect = _client.sendRequest(request).getResponse().getEntity(); - validateEntity(aspect); - return aspect; + return _client.sendRequest(request).getResponse().getEntity(); } /** @@ -176,11 +154,4 @@ protected ASPECT doAction(@Nonnull ActionRequest ASPECT doAction(@Nonnull ActionRequestBuilderBase requestBuilder) throws RemoteInvocationException { return doAction(requestBuilder.build()); } - - protected void validateEntity(@Nonnull RecordTemplate entity) { - final ValidationResult validationResult = ValidateDataAgainstSchema.validate(entity, VALIDATION_OPTIONS); - if (!validationResult.isValid()) { - throw new ModelValidationException(validationResult.getMessages().toString()); - } - } } diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java new file mode 100644 index 0000000000000..89c90c6c0be54 --- /dev/null +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BaseSearchableClient.java @@ -0,0 +1,65 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.AutoCompleteResult; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.common.CollectionResponse; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * Base client that all entities supporting search should implement in their respective restli MPs + * @param the client's value type + */ +public abstract class BaseSearchableClient extends BaseClient { + + public BaseSearchableClient(@Nonnull Client restliClient) { + super(restliClient); + } + + /** + * Search method that the client inheriting this class must implement. Returns {@link CollectionResponse} containing list of aspects + * + * @param input Input query + * @param aspectNames List of aspects to be returned in the VALUE model + * @param requestFilters Request map with fields and values to be applied as filters to the search query + * @param sortCriterion {@link SortCriterion} to be applied to search results + * @param start Start of the page + * @param count Number of search results to return + * @return {@link CollectionResponse} of VALUE for the records that satisfy the given search query + * @throws RemoteInvocationException when the rest.li request fails + */ + @Nonnull + public abstract CollectionResponse search(@Nonnull String input, @Nonnull StringArray aspectNames, @Nullable Map requestFilters, + @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException; + + /** + * Similar to {@link #search(String, StringArray, Map, SortCriterion, int, int)} with empty list for aspect names, meaning all aspects will be returned + */ + @Nonnull + public CollectionResponse search(@Nonnull String input, @Nullable Map requestFilters, + @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException { + return search(input, new StringArray(), requestFilters, sortCriterion, start, count); + } + + /** + * Autocomplete method that the client will override only if they need this capability. It returns {@link AutoCompleteResult} containing list of suggestions. + * + * @param query Input query + * @param field Field against which the query needs autocompletion + * @param requestFilters Request map with fields and values to be applied as filters to the autocomplete query + * @param limit Number of suggestions returned + * @return {@link AutoCompleteResult} containing list of suggestions as strings + * @throws RemoteInvocationException when the rest.li request fails + */ + @Nonnull + public AutoCompleteResult autocomplete(@Nonnull String query, @Nullable String field, + @Nullable Map requestFilters, int limit) throws RemoteInvocationException { + throw new UnsupportedOperationException("Not implemented yet."); + } + +} \ No newline at end of file diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BrowsableClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BrowsableClient.java index ffa5e02575e47..878a1b045749f 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BrowsableClient.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/BrowsableClient.java @@ -11,6 +11,8 @@ /** * Interface which all entities supporting browse should implement in their respective restli MPs + * + * @deprecated Use {@link BaseBrowsableClient} instead */ public interface BrowsableClient { diff --git a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/SearchableClient.java b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/SearchableClient.java index a6621bd16a804..bcfae4f313415 100644 --- a/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/SearchableClient.java +++ b/metadata-restli-resource/src/main/java/com/linkedin/metadata/restli/SearchableClient.java @@ -13,6 +13,8 @@ /** * Interface that all entities that support search and autocomplete should implement in their respective restli MPs * @param the client's value type + * + * @deprecated Use {@link BaseSearchableClient} instead */ public interface SearchableClient { diff --git a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseBrowsableClientTest.java b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseBrowsableClientTest.java new file mode 100644 index 0000000000000..2c32f3c18ade3 --- /dev/null +++ b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseBrowsableClientTest.java @@ -0,0 +1,72 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.BrowseResult; +import com.linkedin.metadata.query.BrowseResultEntityArray; +import com.linkedin.metadata.query.BrowseResultMetadata; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.common.CollectionMetadata; +import com.linkedin.restli.common.CollectionResponse; +import com.linkedin.testing.EntityValue; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +public class BaseBrowsableClientTest { + + private Client _mockRestClient; + + public static class TestBrowsableClient extends BaseBrowsableClient { + + public TestBrowsableClient(@Nonnull Client restliClient) { + super(restliClient); + } + + @Override + @Nonnull + public BrowseResult browse(@Nonnull String inputPath, @Nullable Map requestFilters, int from, int size) throws RemoteInvocationException { + BrowseResultMetadata browseResultMetadata = new BrowseResultMetadata().setTotalNumEntities(100); + return new BrowseResult().setEntities(new BrowseResultEntityArray()).setFrom(0).setPageSize(10).setMetadata(browseResultMetadata).setNumEntities(8); + } + + @Override + @Nonnull + public StringArray getBrowsePaths(@Nonnull Urn urn) throws RemoteInvocationException { + return new StringArray(Arrays.asList("/root/path1", "/root/path2", "/root/path3")); + } + + @Override + @Nonnull + public CollectionResponse search(@Nonnull String input, @Nonnull StringArray aspectNames, @Nullable Map requestFilters, + @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException { + CollectionResponse collectionResponse = new CollectionResponse<>(EntityValue.class); + collectionResponse.setPaging(new CollectionMetadata().setTotal(200)); + return collectionResponse; + } + } + + @BeforeMethod + public void setup() { + _mockRestClient = mock(Client.class); + } + + @Test + public void testClient() throws RemoteInvocationException { + TestBrowsableClient testBrowsableClient = new TestBrowsableClient(_mockRestClient); + assertEquals(testBrowsableClient.search("test", new StringArray(), new HashMap<>(), null, 0, + 10).getPaging().getTotal().intValue(), 200); + assertEquals(testBrowsableClient.browse("/root", null, 0, 10).getNumEntities().intValue(), 8); + assertEquals(testBrowsableClient.browse("/root", null, 0, 10).getPageSize().intValue(), 10); + } + +} \ No newline at end of file diff --git a/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSearchableClientTest.java b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSearchableClientTest.java new file mode 100644 index 0000000000000..4c43af9a1c197 --- /dev/null +++ b/metadata-restli-resource/src/test/java/com/linkedin/metadata/restli/BaseSearchableClientTest.java @@ -0,0 +1,51 @@ +package com.linkedin.metadata.restli; + +import com.linkedin.data.template.StringArray; +import com.linkedin.metadata.query.SortCriterion; +import com.linkedin.r2.RemoteInvocationException; +import com.linkedin.restli.client.Client; +import com.linkedin.restli.common.CollectionMetadata; +import com.linkedin.restli.common.CollectionResponse; +import com.linkedin.testing.EntityValue; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +public class BaseSearchableClientTest { + + private Client _mockRestClient; + + public static class TestSearchableClient extends BaseSearchableClient { + + public TestSearchableClient(@Nonnull Client restliClient) { + super(restliClient); + } + + @Override + @Nonnull + public CollectionResponse search(@Nonnull String input, @Nonnull StringArray aspectNames, @Nullable Map requestFilters, + @Nullable SortCriterion sortCriterion, int start, int count) throws RemoteInvocationException { + CollectionResponse collectionResponse = new CollectionResponse<>(EntityValue.class); + collectionResponse.setPaging(new CollectionMetadata().setTotal(100)); + return collectionResponse; + } + } + + @BeforeMethod + public void setup() { + _mockRestClient = mock(Client.class); + } + + @Test + public void testClient() throws RemoteInvocationException { + TestSearchableClient testSearchableClient = new TestSearchableClient(_mockRestClient); + assertEquals(testSearchableClient.search("test", new StringArray(), new HashMap<>(), null, 0, + 10).getPaging().getTotal().intValue(), 100); + } +} \ No newline at end of file diff --git a/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/RelationshipFoo.pdl b/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/RelationshipFoo.pdl index 197bc508d6d89..9ba3b52922b82 100644 --- a/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/RelationshipFoo.pdl +++ b/metadata-testing/metadata-test-models/src/main/pegasus/com/linkedin/testing/RelationshipFoo.pdl @@ -12,17 +12,22 @@ import com.linkedin.common.Urn record RelationshipFoo { /** - * For unit tests + * Urn of the source entity */ source: Urn /** - * For unit tests + * Urn of the destination entity */ destination: Urn /** - * Attribute for the relationship, for unit tests + * Type attribute of the relationship */ type: optional string + + /** + * Integer field as a relationship attribute + */ + intField: optional int } \ No newline at end of file