Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
* Added: System property "metricRegistryStartConsoleReporter" to start the metric registry console reporter automatically
* Added: Add fetch hint to exclude edge vertex ids or edge ids from edge references
* Added: Accumulo: Logging of large rows coming from Accumulo custom iterators
* Added: Accumulo: Option to compress transfers from iterator to client
* Added: methods to read multiple Streaming Property Values in one request
* Added: saveAsync methods to mutation
* Changed: Elasticsearch: Throw better exceptions when elements are missing in missing document helper
* Changed: Elasticsearch: bulk to squash multiple updates to same element into a single update
* Changed: Accumulo: use a string lookup to store label strings to reduce memory usage when reading vertices with large number of edges
* Fixed: Accumulo: don't queue events when no event listeners are attached

# v4.9.7
Expand Down
55 changes: 54 additions & 1 deletion accumulo/graph/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>vertexium-accumulo-root</artifactId>
<groupId>org.vertexium</groupId>
Expand Down Expand Up @@ -33,6 +34,13 @@
<artifactId>accumulo-core</artifactId>
<version>${accumulo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
<classifier>sources</classifier>
<version>${accumulo.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
Expand Down Expand Up @@ -71,6 +79,51 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.8</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<unzip src="${org.apache.accumulo:accumulo-core:jar:sources}"
dest="${project.build.directory}/accumulo-source"/>
<mkdir dir="${project.build.directory}/generated-sources/org/vertexium/accumulo/accumulo/"/>
<patch patchfile="${project.basedir}/src/main/java/org/vertexium/accumulo/accumulo/TabletServerBatchWriter.patch"
originalfile="${project.build.directory}/accumulo-source/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java"
destfile="${project.build.directory}/generated-sources/org/vertexium/accumulo/accumulo/VertexiumTabletServerBatchWriter.java"/>
<patch patchfile="${project.basedir}/src/main/java/org/vertexium/accumulo/accumulo/MultiTableBatchWriter.patch"
originalfile="${project.build.directory}/accumulo-source/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java"
destfile="${project.build.directory}/generated-sources/org/vertexium/accumulo/accumulo/VertexiumMultiTableBatchWriter.java"/>
</target>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,47 +83,47 @@ public static Edge createFromIteratorValue(
long timestamp;

ByteArrayInputStream bain = new ByteArrayInputStream(value.get());
final DataInputStream in = new DataInputStream(bain);
DataInputStreamUtils.decodeHeader(in, ElementData.TYPE_ID_EDGE);
edgeId = DataInputStreamUtils.decodeString(in);
timestamp = in.readLong();
vertexVisibility = new Visibility(DataInputStreamUtils.decodeString(in));

hiddenVisibilities = Iterables.transform(DataInputStreamUtils.decodeStringSet(in), new Function<String, Visibility>() {
@Nullable
@Override
public Visibility apply(String input) {
return new Visibility(input);
}
});

ImmutableSet<String> additionalVisibilities = DataInputStreamUtils.decodeStringSet(in);

List<MetadataEntry> metadataEntries = DataInputStreamUtils.decodeMetadataEntries(in);
properties = DataInputStreamUtils.decodeProperties(graph, in, metadataEntries, fetchHints);
ImmutableSet<String> extendedDataTableNames = DataInputStreamUtils.decodeStringSet(in);
String inVertexId = DataInputStreamUtils.decodeString(in);
String outVertexId = DataInputStreamUtils.decodeString(in);
String label = graph.getNameSubstitutionStrategy().inflate(DataInputStreamUtils.decodeString(in));

return new AccumuloEdge(
graph,
edgeId,
outVertexId,
inVertexId,
label,
null,
vertexVisibility,
properties,
null,
null,
hiddenVisibilities,
additionalVisibilities,
extendedDataTableNames,
timestamp,
fetchHints,
authorizations
);
try (DataInputStream in = DataInputStreamUtils.decodeHeader(bain, ElementData.TYPE_ID_EDGE)) {
edgeId = DataInputStreamUtils.decodeString(in);
timestamp = in.readLong();
vertexVisibility = new Visibility(DataInputStreamUtils.decodeString(in));

hiddenVisibilities = Iterables.transform(DataInputStreamUtils.decodeStringSet(in), new Function<String, Visibility>() {
@Nullable
@Override
public Visibility apply(String input) {
return new Visibility(input);
}
});

ImmutableSet<String> additionalVisibilities = DataInputStreamUtils.decodeStringSet(in);

List<MetadataEntry> metadataEntries = DataInputStreamUtils.decodeMetadataEntries(in);
properties = DataInputStreamUtils.decodeProperties(graph, in, metadataEntries, fetchHints);
ImmutableSet<String> extendedDataTableNames = DataInputStreamUtils.decodeStringSet(in);
String inVertexId = DataInputStreamUtils.decodeString(in);
String outVertexId = DataInputStreamUtils.decodeString(in);
String label = graph.getNameSubstitutionStrategy().inflate(DataInputStreamUtils.decodeString(in));

return new AccumuloEdge(
graph,
edgeId,
outVertexId,
inVertexId,
label,
null,
vertexVisibility,
properties,
null,
null,
hiddenVisibilities,
additionalVisibilities,
extendedDataTableNames,
timestamp,
fetchHints,
authorizations
);
}
} catch (IOException ex) {
throw new VertexiumException("Could not read vertex", ex);
}
Expand Down Expand Up @@ -189,9 +189,8 @@ public Vertex getVertex(Direction direction, FetchHints fetchHints, Authorizatio
public ExistingEdgeMutation prepareMutation() {
return new ExistingEdgeMutation(this) {
@Override
public Edge save(Authorizations authorizations) {
saveExistingElementMutation(this, authorizations);
return getElement();
public SaveResult<Edge> saveAsync(Authorizations authorizations) {
return saveExistingElementMutation(this, authorizations);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.vertexium.accumulo;

import org.vertexium.Authorizations;
import org.vertexium.Edge;
import org.vertexium.EdgeBuilderByVertexId;
import org.vertexium.Visibility;

Expand All @@ -20,9 +19,6 @@ protected AccumuloEdgeBuilderByVertexId(
this.elementMutationBuilder = elementMutationBuilder;
}

@Override
public abstract Edge save(Authorizations authorizations);

@Override
public Iterable<KeyValuePair> getEdgeTableKeyValuePairs() {
AccumuloEdge edge = createEdge(new AccumuloAuthorizations());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
import org.vertexium.query.ExtendedDataQueryableIterable;
import org.vertexium.query.QueryableIterable;
import org.vertexium.search.IndexHint;
import org.vertexium.util.CompletableFutureUtils;
import org.vertexium.util.PropertyCollection;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Stream;

Expand Down Expand Up @@ -121,15 +125,21 @@ public AccumuloGraph getGraph() {
return (AccumuloGraph) graph;
}

protected <TElement extends Element> void saveExistingElementMutation(ExistingElementMutation<TElement> mutation, Authorizations authorizations) {
protected <TElement extends Element> SaveResult<TElement> saveExistingElementMutation(
ExistingElementMutation<TElement> mutation,
Authorizations authorizations
) {
List<CompletableFuture<Void>> futures = new ArrayList<>();

// Order matters a lot in this method
AccumuloElement element = (AccumuloElement) mutation.getElement();
SaveResult<TElement> result = new SaveResult<>(mutation.getElement());

// metadata must be altered first because the lookup of a property can include visibility which will be altered by alterElementPropertyVisibilities
getGraph().alterPropertyMetadatas(element, mutation.getSetPropertyMetadatas());
futures.add(getGraph().alterPropertyMetadatas(element, mutation.getSetPropertyMetadatas()));

// altering properties comes next because alterElementVisibility may alter the vertex and we won't find it
getGraph().alterElementPropertyVisibilities(element, mutation.getAlterPropertyVisibilities());
futures.add(getGraph().alterElementPropertyVisibilities(element, mutation.getAlterPropertyVisibilities()));

Iterable<PropertyDeleteMutation> propertyDeletes = mutation.getPropertyDeletes();
Iterable<PropertySoftDeleteMutation> propertySoftDeletes = mutation.getPropertySoftDeletes();
Expand All @@ -147,7 +157,7 @@ protected <TElement extends Element> void saveExistingElementMutation(ExistingEl
markPropertyVisibleMutations
);
updateAdditionalVisibilitiesInternal(additionalVisibilities, additionalVisibilityDeletes);
getGraph().savePropertiesAndAdditionalVisibilities(
futures.add(getGraph().savePropertiesAndAdditionalVisibilities(
element,
properties,
propertyDeletes,
Expand All @@ -156,27 +166,27 @@ protected <TElement extends Element> void saveExistingElementMutation(ExistingEl
additionalVisibilityDeletes,
markPropertyHiddenMutations,
markPropertyVisibleMutations
);
));

if (mutation.getNewElementVisibility() != null) {
getGraph().alterElementVisibility(element, mutation.getNewElementVisibility(), mutation.getNewElementVisibilityData());
futures.add(getGraph().alterElementVisibility(element, mutation.getNewElementVisibility(), mutation.getNewElementVisibilityData()));
}

if (mutation instanceof EdgeMutation) {
EdgeMutation edgeMutation = (EdgeMutation) mutation;

String newEdgeLabel = edgeMutation.getNewEdgeLabel();
if (newEdgeLabel != null) {
getGraph().alterEdgeLabel((AccumuloEdge) mutation.getElement(), newEdgeLabel);
futures.add(getGraph().alterEdgeLabel((AccumuloEdge) mutation.getElement(), newEdgeLabel));
}
}

if (mutation.getIndexHint() != IndexHint.DO_NOT_INDEX) {
getGraph().getSearchIndex().updateElement(graph, mutation, authorizations);
futures.add(getGraph().getSearchIndex().updateElement(graph, mutation, authorizations));
}

ElementType elementType = ElementType.getTypeFromElement(mutation.getElement());
getGraph().saveExtendedDataMutations(
futures.add(getGraph().saveExtendedDataMutations(
mutation.getElement(),
elementType,
mutation.getIndexHint(),
Expand All @@ -185,7 +195,12 @@ protected <TElement extends Element> void saveExistingElementMutation(ExistingEl
mutation.getAdditionalExtendedDataVisibilities(),
mutation.getAdditionalExtendedDataVisibilityDeletes(),
authorizations
);
));

CompletableFutureUtils.allOf(futures)
.thenRun(result::complete);

return result;
}

@Override
Expand Down
Loading