18
18
19
19
import com .google .common .collect .Sets ;
20
20
import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
21
+
21
22
import org .apache .accumulo .core .client .AccumuloSecurityException ;
22
23
import org .apache .accumulo .core .client .BatchWriter ;
23
24
import org .apache .accumulo .core .client .ClientConfiguration ;
42
43
import uk .gov .gchq .gaffer .accumulostore .key .exception .AccumuloElementConversionException ;
43
44
import uk .gov .gchq .gaffer .accumulostore .key .exception .IteratorSettingException ;
44
45
import uk .gov .gchq .gaffer .accumulostore .operation .handler .AddElementsHandler ;
46
+ import uk .gov .gchq .gaffer .accumulostore .operation .handler .DeleteElementsHandler ;
45
47
import uk .gov .gchq .gaffer .accumulostore .operation .handler .GenerateSplitPointsFromSampleHandler ;
46
48
import uk .gov .gchq .gaffer .accumulostore .operation .handler .GetAdjacentIdsHandler ;
47
49
import uk .gov .gchq .gaffer .accumulostore .operation .handler .GetAllElementsHandler ;
105
107
import uk .gov .gchq .koryphe .iterable .ChainedIterable ;
106
108
107
109
import java .nio .charset .StandardCharsets ;
110
+ import java .util .Arrays ;
108
111
import java .util .Collection ;
109
112
import java .util .Collections ;
110
113
import java .util .List ;
140
143
*/
141
144
public class AccumuloStore extends Store {
142
145
146
+ private static final String MUTATION_ERROR = "Failed to create an accumulo key mutation" ;
143
147
public static final Set <StoreTrait > TRAITS = Collections .unmodifiableSet (Sets .newHashSet (
144
148
ORDERED ,
145
149
VISIBILITY ,
@@ -415,8 +419,7 @@ protected OutputOperationHandler<GetTraits, Set<StoreTrait>> getGetTraitsHandler
415
419
416
420
@ Override
417
421
protected OperationHandler <? extends DeleteElements > getDeleteElementsHandler () {
418
- // TODO: implement accumulo delete handler logic
419
- return null ;
422
+ return new DeleteElementsHandler ();
420
423
}
421
424
422
425
/**
@@ -462,7 +465,7 @@ protected void insertGraphElements(final Iterable<? extends Element> elements) t
462
465
try {
463
466
writer .addMutation (m );
464
467
} catch (final MutationsRejectedException e ) {
465
- LOGGER .error ("Failed to create an accumulo key mutation" );
468
+ LOGGER .error (MUTATION_ERROR );
466
469
continue ;
467
470
}
468
471
// If the GraphElement is a Vertex then there will only be 1 key,
@@ -478,7 +481,7 @@ protected void insertGraphElements(final Iterable<? extends Element> elements) t
478
481
try {
479
482
writer .addMutation (m2 );
480
483
} catch (final MutationsRejectedException e ) {
481
- LOGGER .error ("Failed to create an accumulo key mutation" );
484
+ LOGGER .error (MUTATION_ERROR );
482
485
}
483
486
}
484
487
}
@@ -492,6 +495,53 @@ protected void insertGraphElements(final Iterable<? extends Element> elements) t
492
495
}
493
496
}
494
497
498
+ /**
499
+ * Method to delete {@link Element}s from Accumulo.
500
+ *
501
+ * @param elements The elements to be deleted.
502
+ * @throws StoreException If there is a failure to delete elements.
503
+ */
504
+ public void deleteElements (final Iterable <? extends Element > elements ) throws StoreException {
505
+ deleteGraphElements (elements );
506
+ }
507
+
508
+ protected void deleteGraphElements (final Iterable <? extends Element > elements ) throws StoreException {
509
+ // Create BatchWriter
510
+ // Loop through elements, convert to mutations, and add to BatchWriter
511
+ // The BatchWriter takes care of batching them up, sending them without
512
+ // too high a latency, etc.
513
+ if (elements == null ) {
514
+ throw new GafferRuntimeException ("Could not find any elements to delete from graph." , Status .BAD_REQUEST );
515
+ }
516
+
517
+ try (BatchWriter writer = TableUtils .createBatchWriter (this )) {
518
+ for (final Element element : elements ) {
519
+ final Pair <Key , Key > keys ;
520
+ try {
521
+ keys = getKeyPackage ().getKeyConverter ().getKeysFromElement (element );
522
+ } catch (final AccumuloElementConversionException e ) {
523
+ LOGGER .error (FAILED_TO_CREATE_AN_ACCUMULO_FROM_ELEMENT_OF_TYPE_WHEN_TRYING_TO_INSERT_ELEMENTS , "key" , element .getGroup ());
524
+ continue ;
525
+ }
526
+
527
+ for (final Key key : Arrays .asList (keys .getFirst (), keys .getSecond ())) {
528
+ if (nonNull (key )) {
529
+ final Mutation m = new Mutation (key .getRow ());
530
+ m .putDelete (key .getColumnFamily (), key .getColumnQualifier (), key .getTimestamp ());
531
+
532
+ try {
533
+ writer .addMutation (m );
534
+ } catch (final MutationsRejectedException e ) {
535
+ LOGGER .error (MUTATION_ERROR );
536
+ }
537
+ }
538
+ }
539
+ }
540
+ } catch (final MutationsRejectedException e ) {
541
+ LOGGER .warn ("Accumulo batch writer failed to close" , e );
542
+ }
543
+ }
544
+
495
545
/**
496
546
* Gets the {@link uk.gov.gchq.gaffer.accumulostore.key.AccumuloKeyPackage} in use by
497
547
* this AccumuloStore.
0 commit comments