Skip to content

Gh-3295: Fix issue with Accumulo iterators when smart merging Federated POC #3296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package uk.gov.gchq.gaffer.federated.simple;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.cache.Cache;
import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
Expand Down Expand Up @@ -63,6 +66,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -76,6 +80,7 @@
* to sub graphs then merge the result.
*/
public class FederatedStore extends Store {
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedStore.class);
private static final String DEFAULT_CACHE_CLASS_FALLBACK = "uk.gov.gchq.gaffer.cache.impl.HashMapCacheService";

// Default graph IDs to execute on
Expand Down Expand Up @@ -161,7 +166,15 @@ public Iterable<GraphSerialisable> getAllGraphs() {
* @return The default list.
*/
public List<String> getDefaultGraphIds() {
    // Only report the configured default IDs that can currently be resolved
    // to a graph. The raw configured list is deliberately not returned as-is,
    // because it may name graphs that have not been added to this store.
    return defaultGraphIds.stream().filter(id -> {
        try {
            // Keep the ID only when the lookup yields a non-null graph
            return Objects.nonNull(getGraph(id));
        } catch (final IllegalArgumentException | CacheOperationException e) {
            // A failed lookup is treated as "graph not present": the ID is
            // logged and filtered out rather than failing the whole call.
            LOGGER.warn("Default Graph was not found: {}", id);
            return false;
        }
    }).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import uk.gov.gchq.gaffer.data.element.function.ElementAggregator;
import uk.gov.gchq.gaffer.store.schema.Schema;

import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.stream.StreamSupport;
import java.util.stream.Collectors;

/**
* Operator for aggregating two iterables of {@link Element}s together, this
Expand All @@ -50,60 +51,42 @@ public void setSchema(final Schema schema) {
@Override
public Iterable<Element> apply(final Iterable<Element> update, final Iterable<Element> state) {
// Just append the state and update so we can loop over it to do accurate merging
Iterable<Element> chainedMerge = IterableUtils.chainedIterable(update, state);

// Custom merge iterable for lazy processing
Iterable<Element> mergeIterable = () ->
new Iterator<Element>() {
// An iterator over the chained state and update iterables gives an accurate hasNext
Iterator<Element> chainedMergeIterator = chainedMerge.iterator();

@Override
public boolean hasNext() {
return chainedMergeIterator.hasNext();
// We can't use the original iterators directly in case they close or become exhausted so save to a Set first.
Set<Element> chainedResult = new HashSet<>(IterableUtils.toList(IterableUtils.chainedIterable(update, state)));

// Iterate over the chained result to merge the elements with each other
// Collect to a set to ensure deduplication
return chainedResult.stream()
.map(e -> {
Element result = e;
// Set up the aggregator for this group based on the schema
ElementAggregator aggregator = new ElementAggregator();
if (schema != null) {
aggregator = schema.getElement(e.getGroup()).getIngestAggregator();
}

@Override
public Element next() {
// When requested do element aggregation on for the current element if required
Element current = chainedMergeIterator.next();
Element result = current;

// Set up the aggregator for this group based on the schema
ElementAggregator aggregator = new ElementAggregator();
if (schema != null) {
aggregator = schema.getElement(current.getGroup()).getIngestAggregator();
// Compare the current element with all others to do a full merge
for (final Element inner : chainedResult) {
// No merge required if not in same group
if (!e.getGroup().equals(inner.getGroup()) || e.equals(inner)) {
continue;
}

// Compare the current element with all others to do a full merge
for (final Element inner : chainedMerge) {
// No merge required if not in same group
if (!current.getGroup().equals(inner.getGroup())) {
continue;
}

if ((current instanceof Entity)
&& (inner instanceof Entity)
&& ((Entity) current).getVertex().equals(((Entity) inner).getVertex())) {
result = aggregator.apply(inner, result);

}
if ((e instanceof Entity)
&& (inner instanceof Entity)
&& ((Entity) e).getVertex().equals(((Entity) inner).getVertex())) {
result = aggregator.apply(inner.shallowClone(), result).shallowClone();
}

if ((current instanceof Edge)
&& (inner instanceof Edge)
&& ((Edge) current).getSource().equals(((Edge) inner).getSource())
&& ((Edge) current).getDestination().equals(((Edge) inner).getDestination())) {
result = aggregator.apply(inner, result);
}
if ((e instanceof Edge)
&& (inner instanceof Edge)
&& ((Edge) e).getSource().equals(((Edge) inner).getSource())
&& ((Edge) e).getDestination().equals(((Edge) inner).getDestination())) {
result = aggregator.apply(inner.shallowClone(), result);
}
return result;
}
};

// Use stream to dedupe the merged result
return () -> StreamSupport.stream(mergeIterable.spliterator(), false)
.distinct()
.iterator();
return result;
})
.collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package uk.gov.gchq.gaffer.federated.simple.operation.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
Expand All @@ -41,6 +44,7 @@
* if provided operation has output so that it is merged.
*/
public class FederatedOperationHandler<P extends Operation> implements OperationHandler<P> {
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedOperationHandler.class);

/**
* The operation option for the Graph IDs that an operation should be
Expand All @@ -59,17 +63,26 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
*/
public static final String OPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";

/**
* A boolean option specifying whether the whole operation chain is forwarded to the sub graph.
*/
public static final String OPT_FORWARD_CHAIN = "federated.forwardChain";

@Override
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {

LOGGER.debug("Running operation: {}", operation);
// Check inside operation chains in case there are operations that don't require running on sub graphs
if (operation instanceof OperationChain) {
Set<Class<? extends Operation>> storeSpecificOps = ((FederatedStore) store).getStoreSpecificOperations();
List<Class<? extends Operation>> chainOps = ((OperationChain<?>) operation).flatten().stream()
.map(Operation::getClass)
.collect(Collectors.toList());
// If all the operations in the chain can be handled by the store then execute them
if (storeSpecificOps.containsAll(chainOps)) {

// If all the operations in the chain can be handled by the store then execute them.
// Or if told not to forward the whole chain process each operation individually.
if (storeSpecificOps.containsAll(chainOps) ||
(!Boolean.parseBoolean(operation.getOption(OPT_FORWARD_CHAIN, "true")))) {
// Use default handler
return new OperationChainHandler<>(store.getOperationChainValidator(), store.getOperationChainOptimisers())
.doOperation((OperationChain<Object>) operation, context, store);
}
Expand All @@ -79,7 +92,7 @@ public Object doOperation(final P operation, final Context context, final Store
if (!Collections.disjoint(storeSpecificOps, chainOps)) {
throw new OperationException(
"Chain contains standard Operations alongside federated store specific Operations."
+ " Please submit each type separately.");
+ " Please submit each type separately or set: '" + OPT_FORWARD_CHAIN + "' to: 'false'.");
}
}

Expand Down Expand Up @@ -129,6 +142,7 @@ protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore stor
try {
// Get the corresponding graph serialisable
for (final String id : graphIds) {
LOGGER.debug("Will execute on Graph: {}", id);
graphsToExecute.add(store.getGraph(id));
}
} catch (final CacheOperationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ void shouldFederateElementsByAggregation() throws StoreException, OperationExcep
final String graphId1 = "graph1";
final String graphId2 = "graph2";

final Graph graph1 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId1, StoreType.MAP);
final Graph graph2 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId2, StoreType.MAP);
final Graph graph1 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId1, StoreType.ACCUMULO);
final Graph graph2 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId2, StoreType.ACCUMULO);

final String group = "person";
final String vertex = "1";
Expand Down Expand Up @@ -130,7 +130,7 @@ void shouldFederateElementsByAggregation() throws StoreException, OperationExcep
}

@Test
void shouldPreventMixOfFederatedAndCoreOperationsInChain() throws StoreException {
void shouldPreventMixOfFederatedAndCoreOperationsInChainByDefault() throws StoreException {
// Given
FederatedStore federatedStore = new FederatedStore();
federatedStore.initialise("federated", null, new StoreProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils;
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils.StoreType;
import uk.gov.gchq.gaffer.graph.Graph;
import uk.gov.gchq.gaffer.graph.GraphConfig;
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
import uk.gov.gchq.gaffer.store.StoreException;
import uk.gov.gchq.gaffer.store.StoreProperties;
Expand Down Expand Up @@ -64,17 +65,35 @@ void shouldNotInitialiseWithSchema() {
}

@Test
void shouldSetDefaultGraphIds() throws StoreException {
void shouldNotSetDefaultGraphIdsIfGraphDoesNotExist() throws StoreException {
final String graphId = "federated";
final String graphId1 = "graph1";
final String graphId2 = "graph2";
FederatedStoreProperties properties = new FederatedStoreProperties();
// Set the defaults to graphs not available to the store
properties.set(PROP_DEFAULT_GRAPH_IDS, graphId1 + "," + graphId2);

FederatedStore store = new FederatedStore();
store.initialise(graphId, null, properties);

assertThat(store.getDefaultGraphIds()).containsExactlyInAnyOrder(graphId1, graphId2);
assertThat(store.getDefaultGraphIds()).isEmpty();
}

@Test
void shouldSetDefaultGraphIdsIfGraphsExist() throws StoreException {
    // Given: a federated store whose properties name two default graphs
    final String federatedId = "federated";
    final String firstId = "graph1";
    final String secondId = "graph2";
    final FederatedStore federated = new FederatedStore();
    final FederatedStoreProperties storeProps = new FederatedStoreProperties();
    storeProps.set(PROP_DEFAULT_GRAPH_IDS, firstId + "," + secondId);
    federated.initialise(federatedId, null, storeProps);

    // When: both of the named graphs are added to the store
    final GraphSerialisable firstGraph =
            new GraphSerialisable(new GraphConfig(firstId), new Schema(), new StoreProperties());
    federated.addGraph(firstGraph);
    final GraphSerialisable secondGraph =
            new GraphSerialisable(new GraphConfig(secondId), new Schema(), new StoreProperties());
    federated.addGraph(secondGraph);

    // Then: the defaults are reported in their configured order
    final List<String> reportedDefaults = federated.getDefaultGraphIds();
    assertThat(reportedDefaults).containsExactly(firstId, secondId);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,25 @@
"property.string": {
"class": "java.lang.String",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Max"
}
},
"property.integer": {
"class": "java.lang.Integer",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
}
},
"property.double": {
"class": "java.lang.Double",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
}
},
"property.float": {
"class": "java.lang.Float",
"aggregateFunction": {
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
}
}
}
Expand Down
Loading