Skip to content

Commit 4007ee3

Browse files
authored
Gh-3295: Fix issue with Accumulo iterators when smart merging Federated POC (#3296)
2 parents 4ce1ec5 + d770cff commit 4007ee3

File tree

6 files changed

+93
-64
lines changed

6 files changed

+93
-64
lines changed

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/FederatedStore.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package uk.gov.gchq.gaffer.federated.simple;
1818

19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
1922
import uk.gov.gchq.gaffer.cache.Cache;
2023
import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
2124
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
@@ -63,6 +66,7 @@
6366
import java.util.LinkedList;
6467
import java.util.List;
6568
import java.util.Map;
69+
import java.util.Objects;
6670
import java.util.Set;
6771
import java.util.stream.Collectors;
6872
import java.util.stream.Stream;
@@ -76,6 +80,7 @@
7680
* to sub graphs then merge the result.
7781
*/
7882
public class FederatedStore extends Store {
83+
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedStore.class);
7984
private static final String DEFAULT_CACHE_CLASS_FALLBACK = "uk.gov.gchq.gaffer.cache.impl.HashMapCacheService";
8085

8186
// Default graph IDs to execute on
@@ -161,7 +166,15 @@ public Iterable<GraphSerialisable> getAllGraphs() {
161166
* @return The default list.
162167
*/
163168
public List<String> getDefaultGraphIds() {
164-
return defaultGraphIds;
169+
// Only return the graphs that actually exist
170+
return defaultGraphIds.stream().filter(id -> {
171+
try {
172+
return Objects.nonNull(getGraph(id));
173+
} catch (final IllegalArgumentException | CacheOperationException e) {
174+
LOGGER.warn("Default Graph was not found: {}", id);
175+
return false;
176+
}
177+
}).collect(Collectors.toList());
165178
}
166179

167180
/**

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/merge/operator/ElementAggregateOperator.java

+33-50
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import uk.gov.gchq.gaffer.data.element.function.ElementAggregator;
2525
import uk.gov.gchq.gaffer.store.schema.Schema;
2626

27-
import java.util.Iterator;
27+
import java.util.HashSet;
28+
import java.util.Set;
2829
import java.util.function.BinaryOperator;
29-
import java.util.stream.StreamSupport;
30+
import java.util.stream.Collectors;
3031

3132
/**
3233
* Operator for aggregating two iterables of {@link Element}s together, this
@@ -50,60 +51,42 @@ public void setSchema(final Schema schema) {
5051
@Override
5152
public Iterable<Element> apply(final Iterable<Element> update, final Iterable<Element> state) {
5253
// Just append the state and update so we can loop over it to do accurate merging
53-
Iterable<Element> chainedMerge = IterableUtils.chainedIterable(update, state);
54-
55-
// Custom merge iterable for lazy processing
56-
Iterable<Element> mergeIterable = () ->
57-
new Iterator<Element>() {
58-
// An iterator over the chained state and update iterables gives an accurate hasNext
59-
Iterator<Element> chainedMergeIterator = chainedMerge.iterator();
60-
61-
@Override
62-
public boolean hasNext() {
63-
return chainedMergeIterator.hasNext();
54+
// We can't use the original iterators directly in case they close or become exhausted so save to a Set first.
55+
Set<Element> chainedResult = new HashSet<>(IterableUtils.toList(IterableUtils.chainedIterable(update, state)));
56+
57+
// Iterate over the chained result to merge the elements with each other
58+
// Collect to a set to ensure deduplication
59+
return chainedResult.stream()
60+
.map(e -> {
61+
Element result = e;
62+
// Set up the aggregator for this group based on the schema
63+
ElementAggregator aggregator = new ElementAggregator();
64+
if (schema != null) {
65+
aggregator = schema.getElement(e.getGroup()).getIngestAggregator();
6466
}
65-
66-
@Override
67-
public Element next() {
68-
// When requested do element aggregation on for the current element if required
69-
Element current = chainedMergeIterator.next();
70-
Element result = current;
71-
72-
// Set up the aggregator for this group based on the schema
73-
ElementAggregator aggregator = new ElementAggregator();
74-
if (schema != null) {
75-
aggregator = schema.getElement(current.getGroup()).getIngestAggregator();
67+
// Compare the current element with all others to do a full merge
68+
for (final Element inner : chainedResult) {
69+
// No merge required if not in same group
70+
if (!e.getGroup().equals(inner.getGroup()) || e.equals(inner)) {
71+
continue;
7672
}
7773

78-
// Compare the current element with all others to do a full merge
79-
for (final Element inner : chainedMerge) {
80-
// No merge required if not in same group
81-
if (!current.getGroup().equals(inner.getGroup())) {
82-
continue;
83-
}
84-
85-
if ((current instanceof Entity)
86-
&& (inner instanceof Entity)
87-
&& ((Entity) current).getVertex().equals(((Entity) inner).getVertex())) {
88-
result = aggregator.apply(inner, result);
89-
90-
}
74+
if ((e instanceof Entity)
75+
&& (inner instanceof Entity)
76+
&& ((Entity) e).getVertex().equals(((Entity) inner).getVertex())) {
77+
result = aggregator.apply(inner.shallowClone(), result).shallowClone();
78+
}
9179

92-
if ((current instanceof Edge)
93-
&& (inner instanceof Edge)
94-
&& ((Edge) current).getSource().equals(((Edge) inner).getSource())
95-
&& ((Edge) current).getDestination().equals(((Edge) inner).getDestination())) {
96-
result = aggregator.apply(inner, result);
97-
}
80+
if ((e instanceof Edge)
81+
&& (inner instanceof Edge)
82+
&& ((Edge) e).getSource().equals(((Edge) inner).getSource())
83+
&& ((Edge) e).getDestination().equals(((Edge) inner).getDestination())) {
84+
result = aggregator.apply(inner.shallowClone(), result);
9885
}
99-
return result;
10086
}
101-
};
102-
103-
// Use stream to dedupe the merged result
104-
return () -> StreamSupport.stream(mergeIterable.spliterator(), false)
105-
.distinct()
106-
.iterator();
87+
return result;
88+
})
89+
.collect(Collectors.toSet());
10790
}
10891

10992
}

store-implementation/simple-federated-store/src/main/java/uk/gov/gchq/gaffer/federated/simple/operation/handler/FederatedOperationHandler.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package uk.gov.gchq.gaffer.federated.simple.operation.handler;
1818

19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
1922
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
2023
import uk.gov.gchq.gaffer.federated.simple.FederatedStore;
2124
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
@@ -41,6 +44,7 @@
4144
* if provided operation has output so that it is merged.
4245
*/
4346
public class FederatedOperationHandler<P extends Operation> implements OperationHandler<P> {
47+
private static final Logger LOGGER = LoggerFactory.getLogger(FederatedOperationHandler.class);
4448

4549
/**
4650
* The operation option for the Graph IDs that an operation should be
@@ -59,17 +63,26 @@ public class FederatedOperationHandler<P extends Operation> implements Operation
5963
*/
6064
public static final String OPT_AGGREGATE_ELEMENTS = "federated.aggregateElements";
6165

66+
/**
67+
* A boolean option to specify if to forward the whole operation chain to the sub graph or not.
68+
*/
69+
public static final String OPT_FORWARD_CHAIN = "federated.forwardChain";
70+
6271
@Override
6372
public Object doOperation(final P operation, final Context context, final Store store) throws OperationException {
64-
73+
LOGGER.debug("Running operation: {}", operation);
6574
// Check inside operation chains in case there are operations that don't require running on sub graphs
6675
if (operation instanceof OperationChain) {
6776
Set<Class<? extends Operation>> storeSpecificOps = ((FederatedStore) store).getStoreSpecificOperations();
6877
List<Class<? extends Operation>> chainOps = ((OperationChain<?>) operation).flatten().stream()
6978
.map(Operation::getClass)
7079
.collect(Collectors.toList());
71-
// If all the operations in the chain can be handled by the store then execute them
72-
if (storeSpecificOps.containsAll(chainOps)) {
80+
81+
// If all the operations in the chain can be handled by the store then execute them.
82+
// Or if told not to forward the whole chain process each operation individually.
83+
if (storeSpecificOps.containsAll(chainOps) ||
84+
(!Boolean.parseBoolean(operation.getOption(OPT_FORWARD_CHAIN, "true")))) {
85+
// Use default handler
7386
return new OperationChainHandler<>(store.getOperationChainValidator(), store.getOperationChainOptimisers())
7487
.doOperation((OperationChain<Object>) operation, context, store);
7588
}
@@ -79,7 +92,7 @@ public Object doOperation(final P operation, final Context context, final Store
7992
if (!Collections.disjoint(storeSpecificOps, chainOps)) {
8093
throw new OperationException(
8194
"Chain contains standard Operations alongside federated store specific Operations."
82-
+ " Please submit each type separately.");
95+
+ " Please submit each type separately or set: '" + OPT_FORWARD_CHAIN + "' to: 'false'.");
8396
}
8497
}
8598

@@ -129,6 +142,7 @@ protected List<GraphSerialisable> getGraphsToExecuteOn(final FederatedStore stor
129142
try {
130143
// Get the corresponding graph serialisable
131144
for (final String id : graphIds) {
145+
LOGGER.debug("Will execute on Graph: {}", id);
132146
graphsToExecute.add(store.getGraph(id));
133147
}
134148
} catch (final CacheOperationException e) {

store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreIT.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ void shouldFederateElementsByAggregation() throws StoreException, OperationExcep
6565
final String graphId1 = "graph1";
6666
final String graphId2 = "graph2";
6767

68-
final Graph graph1 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId1, StoreType.MAP);
69-
final Graph graph2 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId2, StoreType.MAP);
68+
final Graph graph1 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId1, StoreType.ACCUMULO);
69+
final Graph graph2 = ModernDatasetUtils.getBlankGraphWithModernSchema(this.getClass(), graphId2, StoreType.ACCUMULO);
7070

7171
final String group = "person";
7272
final String vertex = "1";
@@ -130,7 +130,7 @@ void shouldFederateElementsByAggregation() throws StoreException, OperationExcep
130130
}
131131

132132
@Test
133-
void shouldPreventMixOfFederatedAndCoreOperationsInChain() throws StoreException {
133+
void shouldPreventMixOfFederatedAndCoreOperationsInChainByDefault() throws StoreException {
134134
// Given
135135
FederatedStore federatedStore = new FederatedStore();
136136
federatedStore.initialise("federated", null, new StoreProperties());

store-implementation/simple-federated-store/src/test/java/uk/gov/gchq/gaffer/federated/simple/FederatedStoreTest.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils;
2525
import uk.gov.gchq.gaffer.federated.simple.util.ModernDatasetUtils.StoreType;
2626
import uk.gov.gchq.gaffer.graph.Graph;
27+
import uk.gov.gchq.gaffer.graph.GraphConfig;
2728
import uk.gov.gchq.gaffer.graph.GraphSerialisable;
2829
import uk.gov.gchq.gaffer.store.StoreException;
2930
import uk.gov.gchq.gaffer.store.StoreProperties;
@@ -64,17 +65,35 @@ void shouldNotInitialiseWithSchema() {
6465
}
6566

6667
@Test
67-
void shouldSetDefaultGraphIds() throws StoreException {
68+
void shouldNotSetDefaultGraphIdsIfGraphDoesNotExist() throws StoreException {
6869
final String graphId = "federated";
6970
final String graphId1 = "graph1";
7071
final String graphId2 = "graph2";
7172
FederatedStoreProperties properties = new FederatedStoreProperties();
73+
// Set the defaults to graphs not available to the store
7274
properties.set(PROP_DEFAULT_GRAPH_IDS, graphId1 + "," + graphId2);
7375

7476
FederatedStore store = new FederatedStore();
7577
store.initialise(graphId, null, properties);
7678

77-
assertThat(store.getDefaultGraphIds()).containsExactlyInAnyOrder(graphId1, graphId2);
79+
assertThat(store.getDefaultGraphIds()).isEmpty();
80+
}
81+
82+
@Test
83+
void shouldSetDefaultGraphIdsIfGraphsExist() throws StoreException {
84+
final String graphId = "federated";
85+
final String graphId1 = "graph1";
86+
final String graphId2 = "graph2";
87+
final FederatedStore store = new FederatedStore();
88+
final FederatedStoreProperties properties = new FederatedStoreProperties();
89+
90+
// Set the defaults to graphs and make sure to add them
91+
properties.set(PROP_DEFAULT_GRAPH_IDS, graphId1 + "," + graphId2);
92+
store.initialise(graphId, null, properties);
93+
store.addGraph(new GraphSerialisable(new GraphConfig(graphId1), new Schema(), new StoreProperties()));
94+
store.addGraph(new GraphSerialisable(new GraphConfig(graphId2), new Schema(), new StoreProperties()));
95+
96+
assertThat(store.getDefaultGraphIds()).containsExactly(graphId1, graphId2);
7897
}
7998

8099
@Test

store-implementation/simple-federated-store/src/test/resources/modern/schema/schema.json

+4-4
Original file line numberDiff line numberDiff line change
@@ -65,25 +65,25 @@
6565
"property.string": {
6666
"class": "java.lang.String",
6767
"aggregateFunction": {
68-
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
68+
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Max"
6969
}
7070
},
7171
"property.integer": {
7272
"class": "java.lang.Integer",
7373
"aggregateFunction": {
74-
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
74+
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
7575
}
7676
},
7777
"property.double": {
7878
"class": "java.lang.Double",
7979
"aggregateFunction": {
80-
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
80+
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
8181
}
8282
},
8383
"property.float": {
8484
"class": "java.lang.Float",
8585
"aggregateFunction": {
86-
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.First"
86+
"class": "uk.gov.gchq.koryphe.impl.binaryoperator.Sum"
8787
}
8888
}
8989
}

0 commit comments

Comments
 (0)