Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metadata-models 72.0.8 -> 80.0.0 #1756

Merged
merged 1 commit into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Pegasus & Avro
**/src/mainGenerated*
**/src/testGenerated*
metadata-events/mxe-registration/src/main/resources/**/*.avsc

# Java
.java-version
Expand Down
15 changes: 15 additions & 0 deletions metadata-dao-impl/ebean-dao/gma-create-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ create table metadata_aspect (
constraint pk_metadata_aspect primary key (urn,aspect,version)
);

create table metadata_index (
id bigint auto_increment not null,
urn varchar(500) not null,
aspect varchar(200) not null,
path varchar(200) not null,
longval bigint,
stringval varchar(500),
doubleval double,
constraint pk_metadata_index primary key (id)
);

create index idx_long_val on metadata_index (aspect,path,longval,urn);
create index idx_string_val on metadata_index (aspect,path,stringval,urn);
create index idx_double_val on metadata_index (aspect,path,doubleval,urn);
create index idx_urn on metadata_index (urn);
6 changes: 6 additions & 0 deletions metadata-dao-impl/ebean-dao/gma-drop-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,9 @@ drop table if exists metadata_id;

drop table if exists metadata_aspect;

drop table if exists metadata_index;

drop index if exists idx_long_val;
drop index if exists idx_string_val;
drop index if exists idx_double_val;
drop index if exists idx_urn;
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.linkedin.metadata.dao;

import io.ebean.annotation.Index;
import io.ebean.Model;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;


@Getter
@Setter
// define composite indexes
@Index(name = "idx_long_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.LONG_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Index(name = "idx_string_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.STRING_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Index(name = "idx_double_val", columnNames = {
EbeanMetadataIndex.ASPECT_COLUMN,
EbeanMetadataIndex.PATH_COLUMN,
EbeanMetadataIndex.DOUBLE_COLUMN,
EbeanMetadataIndex.URN_COLUMN
})
@Entity
@Table(name = "metadata_index")
public class EbeanMetadataIndex extends Model {

public static final long serialVersionUID = 1L;

private static final String ID_COLUMN = "id";
public static final String URN_COLUMN = "urn";
public static final String ASPECT_COLUMN = "aspect";
public static final String PATH_COLUMN = "path";
public static final String LONG_COLUMN = "longVal";
public static final String STRING_COLUMN = "stringVal";
public static final String DOUBLE_COLUMN = "doubleVal";

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = ID_COLUMN)
protected long id;

@NonNull
@Index(name = "idx_urn")
@Column(name = URN_COLUMN, length = 500, nullable = false)
protected String urn;

@NonNull
@Column(name = ASPECT_COLUMN, length = 200, nullable = false)
protected String aspect;

@NonNull
@Column(name = PATH_COLUMN, length = 200, nullable = false)
protected String path;

@Column(name = LONG_COLUMN)
protected Long longVal;

@Column(name = STRING_COLUMN, length = 500)
protected String stringVal;

@Column(name = DOUBLE_COLUMN)
protected Double doubleVal;

}
15 changes: 15 additions & 0 deletions metadata-dao-impl/ebean-dao/src/main/resources/gma-create-all.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ create table metadata_id (
constraint uq_metadata_id_namespace_id unique (namespace,id)
);

create table metadata_index (
id bigint auto_increment not null,
urn varchar(500) not null,
aspect varchar(200) not null,
path varchar(200) not null,
longval bigint,
stringval varchar(500),
doubleval double,
constraint pk_metadata_index primary key (id)
);

create index idx_long_val on metadata_index (aspect,path,longval,urn);
create index idx_string_val on metadata_index (aspect,path,stringval,urn);
create index idx_double_val on metadata_index (aspect,path,doubleval,urn);
create index idx_urn on metadata_index (urn);
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ public class ESUtils {
private static final String DEFAULT_SEARCH_RESULTS_SORT_BY_FIELD = "urn";

/*
* TODO: we might need to extend this list if need be, below link has the complete list
* https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html
* */
private static final char[] ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = {'*'};
* Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/regexp-syntax.html for list of reserved
* characters in an Elasticsearch regular expression.
*/
private static final String ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS = "?+*|{}[]()";

private ESUtils() {

Expand Down Expand Up @@ -81,7 +81,7 @@ public static void buildSortOrder(@Nonnull SearchSourceBuilder searchSourceBuild
*/
@Nonnull
public static String escapeReservedCharacters(@Nonnull String input) {
for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS) {
for (char reservedChar : ELASTICSEARCH_REGEXP_RESERVED_CHARACTERS.toCharArray()) {
input = input.replace(String.valueOf(reservedChar), "\\" + reservedChar);
}
return input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ public void testBuildFilterQuery() throws Exception {
public void testEscapeReservedCharacters() {
assertEquals(escapeReservedCharacters("foobar"), "foobar");
assertEquals(escapeReservedCharacters("**"), "\\*\\*");
assertEquals(escapeReservedCharacters("()"), "\\(\\)");
assertEquals(escapeReservedCharacters("{}"), "\\{\\}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
import com.linkedin.metadata.dao.utils.ModelUtils;
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.Configs;
import com.linkedin.mxe.MetadataAuditEvent;
import com.linkedin.mxe.MetadataChangeEvent;
import com.linkedin.mxe.Topics;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -107,11 +111,57 @@ record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
}
}

@Override
public <ASPECT extends RecordTemplate> void produceAspectSpecificMetadataAuditEvent(@Nonnull URN urn,
@Nullable ASPECT oldValue, @Nonnull ASPECT newValue) {

validateAspectSpecificTopic(ModelUtils.getAspectSpecificMAETopicName(urn, newValue));

String topic;
Class<? extends SpecificRecord> maeAvroClass;
RecordTemplate metadataAuditEvent;
try {
topic = (String) Topics.class.getField(ModelUtils.getAspectSpecificMAETopicName(urn, newValue)).get(null);
maeAvroClass = Configs.TOPIC_SCHEMA_CLASS_MAP.get(topic);
metadataAuditEvent = (RecordTemplate) EventUtils.getPegasusClass(maeAvroClass).newInstance();

metadataAuditEvent.getClass().getMethod("setUrn", urn.getClass()).invoke(metadataAuditEvent, urn);
metadataAuditEvent.getClass().getMethod("setNewValue", newValue.getClass()).invoke(metadataAuditEvent, newValue);
if (oldValue != null) {
metadataAuditEvent.getClass()
.getMethod("setOldValue", oldValue.getClass())
.invoke(metadataAuditEvent, oldValue);
}
} catch (NoSuchFieldException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
| InstantiationException | InvocationTargetException e) {
throw new IllegalArgumentException("Failed to compose the Pegasus aspect specific MAE", e);
}

GenericRecord record;
try {
record = EventUtils.pegasusToAvroAspectSpecificMXE(maeAvroClass, metadataAuditEvent);
} catch (NoSuchFieldException | IOException | IllegalAccessException e) {
throw new ModelConversionException("Failed to convert Pegasus aspect specific MAE to Avro", e);
}

if (_callback.isPresent()) {
_producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
} else {
_producer.send(new ProducerRecord(topic, urn.toString(), record));
}
}

@Nonnull
private Snapshot makeSnapshot(@Nonnull URN urn, @Nonnull RecordTemplate value) {
Snapshot snapshot = new Snapshot();
List<ASPECT_UNION> aspects = Collections.singletonList(ModelUtils.newAspectUnion(_aspectUnionClass, value));
RecordUtils.setSelectedRecordTemplateInUnion(snapshot, ModelUtils.newSnapshot(_snapshotClass, urn, aspects));
return snapshot;
}
}

static void validateAspectSpecificTopic(@Nonnull String topic) {
if (!Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic))) {
throw new IllegalArgumentException(String.format("The aspect specific topic %s is not registered.", topic));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import com.linkedin.metadata.query.RelationshipFilter;
import com.linkedin.metadata.validator.EntityValidator;
import com.linkedin.metadata.validator.RelationshipValidator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -185,7 +185,7 @@ public List<RecordTemplate> findMixedTypesRelationships(@Nonnull Statement query

@Nonnull
public <SRC_ENTITY extends RecordTemplate, DEST_ENTITY extends RecordTemplate, RELATIONSHIP extends RecordTemplate>
List<List<RecordTemplate>> getPathsToAllNodesTraversed(
List<List<RecordTemplate>> getTraversedPaths(
@Nullable Class<SRC_ENTITY> sourceEntityClass, @Nonnull Filter sourceEntityFilter,
@Nullable Class<DEST_ENTITY> destinationEntityClass, @Nonnull Filter destinationEntityFilter,
@Nonnull Class<RELATIONSHIP> relationshipType, @Nonnull RelationshipFilter relationshipFilter,
Expand Down Expand Up @@ -221,7 +221,7 @@ List<List<RecordTemplate>> getPathsToAllNodesTraversed(

final Statement statement = buildStatement(statementString, "length(p), dest.urn", offset, count);

return runQuery(statement, this::pathRecordToEntityList);
return runQuery(statement, this::pathRecordToPathList);
}

/**
Expand Down Expand Up @@ -302,11 +302,21 @@ private <ENTITY extends RecordTemplate> ENTITY nodeRecordToEntity(@Nonnull Class
}

@Nonnull
private List<RecordTemplate> pathRecordToEntityList(@Nonnull Record pathRecord) {
private List<RecordTemplate> pathRecordToPathList(@Nonnull Record pathRecord) {
final Path path = pathRecord.values().get(0).asPath();
return StreamSupport.stream(path.nodes().spliterator(), false)
.map(Neo4jUtil::nodeToEntity)
.collect(Collectors.toList());
final List<RecordTemplate> pathList = new ArrayList<>();

StreamSupport.stream(path.spliterator(), false)
.map(Neo4jUtil::pathSegmentToRecordList)
.forEach(segment -> {
if (pathList.isEmpty()) {
pathList.add(segment.get(0));
}
pathList.add(segment.get(1));
pathList.add(segment.get(2));
});

return pathList;
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
import com.linkedin.metadata.query.Filter;
import com.linkedin.metadata.query.RelationshipDirection;
import com.linkedin.metadata.query.RelationshipFilter;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.ClassUtils;
import org.neo4j.driver.types.Node;
import org.neo4j.driver.types.Path;
import org.neo4j.driver.types.Relationship;

import static com.linkedin.metadata.dao.utils.QueryUtils.*;
Expand Down Expand Up @@ -165,6 +168,25 @@ public static RecordTemplate nodeToEntity(@Nonnull Node node) {
return RecordUtils.toRecordTemplate(className, new DataMap(node.asMap()));
}

/**
* Converts path segment (field:value map) list of {@link RecordTemplate}s of nodes & edges
*
* @param segment The segment of a path containing nodes & edges
* @return List<RecordTemplate>
*/
@Nonnull
public static List<RecordTemplate> pathSegmentToRecordList(@Nonnull Path.Segment segment) {
final Node startNode = segment.start();
final Node endNode = segment.end();
final Relationship edge = segment.relationship();

return Arrays.asList(
nodeToEntity(startNode),
edgeToRelationship(startNode, endNode, edge),
nodeToEntity(endNode)
);
}

/**
* Converts edge (source-relationship->destination) to RELATIONSHIP
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,17 +511,17 @@ public void testFindNodesInPath() throws Exception {
// Get reports roll-up - 2 levels
Filter sourceFilter = newFilter("urn", urn1.toString());
RelationshipFilter relationshipFilter = createRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING);
List<List<RecordTemplate>> nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null,
List<List<RecordTemplate>> paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null,
EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 2, -1, -1);
assertEquals(nodesInPath.size(), 5);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 3);
assertEquals(paths.size(), 5);
assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 3);

// Get reports roll-up - 1 level
nodesInPath = _dao.getPathsToAllNodesTraversed(EntityFoo.class, sourceFilter, null,
paths = _dao.getTraversedPaths(EntityFoo.class, sourceFilter, null,
EMPTY_FILTER, RelationshipFoo.class, relationshipFilter, 1, 1, -1, -1);
assertEquals(nodesInPath.size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 2).collect(Collectors.toList()).size(), 2);
assertEquals(nodesInPath.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 0);
assertEquals(paths.size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 3).collect(Collectors.toList()).size(), 2);
assertEquals(paths.stream().filter(l -> l.size() == 5).collect(Collectors.toList()).size(), 0);
}
}
Loading