Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 8 additions & 22 deletions app/dao/models/cassandra/CassandraGraphVersionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import util.IdGenerator;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,12 +97,12 @@ public GraphVersion create(Map<String, Tag> tags,

this.dbClient.insert("graph_version", insertions);

List<DbDataContainer> predicate = new ArrayList<>();
predicate.add(new DbDataContainer("id", GroundType.LONG, id));
for (long edgeVersionId : edgeVersionIds) {
List<DbDataContainer> edgeInsertion = new ArrayList<>();
edgeInsertion.add(new DbDataContainer("graph_version_id", GroundType.LONG, id));
edgeInsertion.add(new DbDataContainer("edge_version_id", GroundType.LONG, edgeVersionId));

this.dbClient.insert("graph_version_edge", edgeInsertion);
Set<Long> edge = new HashSet<>();
edge.add(edgeVersionId);
this.dbClient.addToSet("graph_version", "edge_version_set", edge, predicate);
}

this.graphFactory.update(graphId, id, parentIds);
Expand All @@ -126,26 +126,12 @@ public GraphVersion retrieveFromDatabase(long id) throws GroundException {
List<DbDataContainer> predicates = new ArrayList<>();
predicates.add(new DbDataContainer("id", GroundType.LONG, id));

List<DbDataContainer> edgePredicate = new ArrayList<>();
edgePredicate.add(new DbDataContainer("graph_version_id", GroundType.LONG, id));

CassandraResults resultSet = this.dbClient.equalitySelect("graph_version",
DbClient.SELECT_STAR,
predicates);


long graphId = resultSet.getLong("graph_id");

List<Long> edgeVersionIds = new ArrayList<>();
CassandraResults edgeSet = this.dbClient.equalitySelect("graph_version_edge",
DbClient.SELECT_STAR,
edgePredicate);

if (!edgeSet.isEmpty()) {
do {
edgeVersionIds.add(edgeSet.getLong("edge_version_id"));
} while (edgeSet.next());
}
List<Long> edgeVersionIds = new ArrayList<>(resultSet.getSet("edge_version_set", Long.class));

LOGGER.info("Retrieved graph version " + id + " in graph " + graphId + ".");
return new GraphVersion(id, version.getTags(), version.getStructureVersionId(),
Expand Down
29 changes: 11 additions & 18 deletions app/dao/models/cassandra/CassandraStructureVersionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,11 @@ public StructureVersion create(long structureId,
this.dbClient.insert("structure_version", insertions);

for (String key : attributes.keySet()) {
List<DbDataContainer> itemInsertions = new ArrayList<>();
itemInsertions.add(new DbDataContainer("structure_version_id", GroundType.LONG, id));
itemInsertions.add(new DbDataContainer("key", GroundType.STRING, key));
itemInsertions.add(new DbDataContainer("type", GroundType.STRING,
attributes.get(key).toString()));

this.dbClient.insert("structure_version_attribute", itemInsertions);
Map<String, String> map = new HashMap<>();
map.put(key, attributes.get(key).toString());
List<DbDataContainer> predicate = new ArrayList<>();
predicate.add(new DbDataContainer("id", GroundType.LONG, id));
this.dbClient.addToMap("structure_version", "key_type_map", map, predicate);
}

this.structureFactory.update(structureId, id, parentIds);
Expand All @@ -120,21 +118,16 @@ public StructureVersion retrieveFromDatabase(long id) throws GroundException {
predicates);
super.verifyResultSet(resultSet, id);

Map<String, GroundType> attributes = new HashMap<>();

List<DbDataContainer> attributePredicates = new ArrayList<>();
attributePredicates.add(new DbDataContainer("structure_version_id", GroundType.LONG, id));
CassandraResults attributesSet = this.dbClient.equalitySelect("structure_version_attribute",
DbClient.SELECT_STAR, attributePredicates);
Map<String, String> tmpAttributes = resultSet.getMap("key_type_map", String.class, String.class);

if (attributesSet.isEmpty()) {
if (tmpAttributes.isEmpty()) {
throw new GroundException("No StructureVersion attributes found for id " + id + ".");
}

do {
attributes.put(attributesSet.getString("key"), GroundType.fromString(attributesSet
.getString("type")));
} while (attributesSet.next());
Map<String, GroundType> attributes = new HashMap<>();
for (String key : tmpAttributes.keySet()) {
attributes.put(key, GroundType.fromString(tmpAttributes.get(key)));
}

long structureId = resultSet.getLong("structure_id");

Expand Down
28 changes: 10 additions & 18 deletions app/dao/usage/cassandra/CassandraLineageGraphVersionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
import util.IdGenerator;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -104,13 +105,12 @@ public LineageGraphVersion create(Map<String, Tag> tags,
this.dbClient.insert("lineage_graph_version", insertions);

for (long lineageEdgeVersionId : lineageEdgeVersionIds) {
List<DbDataContainer> lineageEdgeInsertion = new ArrayList<>();
lineageEdgeInsertion.add(new DbDataContainer("lineage_graph_version_id", GroundType.LONG,
id));
lineageEdgeInsertion.add(new DbDataContainer("lineage_edge_version_id", GroundType.LONG,
lineageEdgeVersionId));
List<DbDataContainer> predicates = new ArrayList<>();
predicates.add(new DbDataContainer("id", GroundType.LONG, id));
Set<Long> edgeValue = new HashSet<>();
edgeValue.add(lineageEdgeVersionId);

this.dbClient.insert("lineage_graph_version_edge", lineageEdgeInsertion);
this.dbClient.addToSet("lineage_graph_version", "lineage_edge_version_id_set", edgeValue, predicates);
}

this.lineageGraphFactory.update(lineageGraphId, id, parentIds);
Expand All @@ -137,8 +137,8 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException
List<DbDataContainer> predicates = new ArrayList<>();
predicates.add(new DbDataContainer("id", GroundType.LONG, id));

List<DbDataContainer> lineageEdgePredicate = new ArrayList<>();
lineageEdgePredicate.add(new DbDataContainer("lineage_graph_version_id", GroundType.LONG,
List<DbDataContainer> lineageGraphVersionPredicates = new ArrayList<>();
lineageGraphVersionPredicates.add(new DbDataContainer("id", GroundType.LONG,
id));

CassandraResults resultSet = this.dbClient.equalitySelect("lineage_graph_version",
Expand All @@ -148,15 +148,7 @@ public LineageGraphVersion retrieveFromDatabase(long id) throws GroundException

long lineageGraphId = resultSet.getLong("lineage_graph_id");

List<Long> lineageEdgeVersionIds = new ArrayList<>();
CassandraResults lineageEdgeSet = this.dbClient.equalitySelect("lineage_graph_version_edge",
DbClient.SELECT_STAR, lineageEdgePredicate);

if (!lineageEdgeSet.isEmpty()) {
do {
lineageEdgeVersionIds.add(lineageEdgeSet.getLong("lineage_edge_version_id"));
} while (lineageEdgeSet.next());
}
List<Long> lineageEdgeVersionIds = new ArrayList<>(resultSet.getSet("lineage_edge_version_id_set", Long.class));

LOGGER.info("Retrieved lineage_graph version " + id + " in lineage_graph " + lineageGraphId
+ ".");
Expand Down
51 changes: 22 additions & 29 deletions app/dao/versions/cassandra/CassandraVersionHistoryDagFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,21 @@
package dao.versions.cassandra;

import com.google.common.base.CaseFormat;

import dao.versions.VersionHistoryDagFactory;
import db.CassandraClient;
import db.CassandraResults;
import db.DbClient;
import db.DbDataContainer;
import exceptions.GroundException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import models.models.Structure;
import models.versions.GroundType;
import models.versions.Item;
import models.versions.Version;
import models.versions.VersionHistoryDag;
import models.versions.VersionSuccessor;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CassandraVersionHistoryDagFactory implements VersionHistoryDagFactory {
private final CassandraClient dbClient;
private final CassandraVersionSuccessorFactory versionSuccessorFactory;
Expand Down Expand Up @@ -61,21 +57,14 @@ public <T extends Version> VersionHistoryDag<T> create(long itemId) throws Groun
@Override
public <T extends Version> VersionHistoryDag<T> retrieveFromDatabase(long itemId)
throws GroundException {
List<DbDataContainer> predicates = new ArrayList<>();
predicates.add(new DbDataContainer("item_id", GroundType.LONG, itemId));
CassandraResults resultSet = this.dbClient.equalitySelect("version_history_dag", DbClient.SELECT_STAR,
predicates);
List<VersionSuccessor<T>> edges;

if (resultSet.isEmpty()) {
try {
edges = this.versionSuccessorFactory.retrieveFromDatabaseByItemId(itemId);
} catch (GroundException e) {
// do nothing, this just means no version have been added yet.
return new VersionHistoryDag<T>(itemId, new ArrayList<>());
}

List<VersionSuccessor<T>> edges = new ArrayList<>();
do {
edges.add(this.versionSuccessorFactory.retrieveFromDatabase(resultSet
.getLong("version_successor_id")));
} while (resultSet.next());

return new VersionHistoryDag(itemId, edges);
}

Expand All @@ -93,12 +82,12 @@ public void addEdge(VersionHistoryDag dag, long parentId, long childId, long ite
throws GroundException {
VersionSuccessor successor = this.versionSuccessorFactory.create(parentId, childId);

List<DbDataContainer> insertions = new ArrayList<>();
insertions.add(new DbDataContainer("item_id", GroundType.LONG, itemId));
insertions.add(new DbDataContainer("version_successor_id", GroundType.LONG, successor.getId()));

this.dbClient.insert("version_history_dag", insertions);
// Adding to the entry with id = successor.getId()
List<DbDataContainer> newValue = new ArrayList<>(), predicate = new ArrayList<>();
newValue.add(new DbDataContainer("item_id", GroundType.LONG, itemId));
predicate.add(new DbDataContainer("id", GroundType.LONG, successor.getId()));

this.dbClient.update(newValue, predicate, "version_successor");
dag.addEdge(parentId, childId, successor.getId());
}

Expand Down Expand Up @@ -147,14 +136,18 @@ public void truncate(VersionHistoryDag dag, int numLevels, Class<? extends Item>
tableNamePrefix = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, tableNamePrefix);

if (itemType.equals(Structure.class)) {
predicates.add(new DbDataContainer("structure_version_id", GroundType.LONG, id));
this.dbClient.delete(predicates, "structure_version_attribute");
predicates.add(new DbDataContainer("id", GroundType.LONG, id));
List<String> columns = new ArrayList<>();
columns.add("key_type_map");
this.dbClient.deleteColumn(predicates, "structure_version", columns);
predicates.clear();
}

if (itemType.getName().toLowerCase().contains("graph")) {
predicates.add(new DbDataContainer(tableNamePrefix + "_version_id", GroundType.LONG, id));
this.dbClient.delete(predicates, tableNamePrefix + "_version_edge");
predicates.add(new DbDataContainer("id", GroundType.LONG, id));
List<String> columns = new ArrayList<>();
columns.add("edge_version_set");
this.dbClient.deleteColumn(predicates, "graph_version", columns);
predicates.clear();
}

Expand Down
34 changes: 25 additions & 9 deletions app/dao/versions/cassandra/CassandraVersionSuccessorFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
import db.DbClient;
import db.DbDataContainer;
import exceptions.GroundException;
import java.util.ArrayList;
import java.util.List;
import models.versions.GroundType;
import models.versions.Version;
import models.versions.VersionSuccessor;
import util.IdGenerator;

import java.util.ArrayList;
import java.util.List;

public class CassandraVersionSuccessorFactory implements VersionSuccessorFactory {
private final CassandraClient dbClient;
private final IdGenerator idGenerator;
Expand Down Expand Up @@ -66,6 +65,29 @@ public <T extends Version> VersionSuccessor<T> create(long fromId, long toId)
return new VersionSuccessor<>(dbId, toId, fromId);
}

public <T extends Version> List<VersionSuccessor<T>> retrieveFromDatabaseByItemId(long itemId)
throws GroundException {
CassandraResults resultSet;

List<VersionSuccessor<T>> versionSuccessors = new ArrayList<>();
List<DbDataContainer> predicate = new ArrayList<>();
predicate.add(new DbDataContainer("item_id", GroundType.LONG, itemId));
resultSet = this.dbClient.equalitySelect("version_successor", DbClient.SELECT_STAR, predicate);

if (resultSet.isEmpty()) {
throw new GroundException("No VersionSuccessor found with itemId " + itemId + ".");
}

do {
long id = resultSet.getLong("id");
long fromId = resultSet.getLong("from_version_id");
long toId = resultSet.getLong("to_version_id");
versionSuccessors.add(new VersionSuccessor<>(id, fromId, toId));
} while (resultSet.next());

return versionSuccessors;
}

/**
* Retrieve a version successor from the database.
*
Expand Down Expand Up @@ -115,12 +137,6 @@ public void deleteFromDestination(long toId, long itemId) throws GroundException
do {
long dbId = resultSet.getLong("id");

predicates.clear();
predicates.add(new DbDataContainer("item_id", GroundType.LONG, itemId));
predicates.add(new DbDataContainer("version_successor_id", GroundType.LONG, dbId));

this.dbClient.delete(predicates, "version_history_dag");

predicates.clear();
predicates.add(new DbDataContainer("id", GroundType.LONG, dbId));

Expand Down
Loading