Skip to content

Commit d55cf66

Browse files
authored
Query Partitioner (#60)
1 parent c1d2206 commit d55cf66

35 files changed

+1091
-88
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>com.yahoo.bullet</groupId>
66
<artifactId>bullet-core</artifactId>
7-
<version>0.5.3-SNAPSHOT</version>
7+
<version>0.6.0-SNAPSHOT</version>
88
<packaging>jar</packaging>
99
<name>bullet-core</name>
1010

@@ -36,7 +36,7 @@
3636
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3737
<maven.compiler.source>1.8</maven.compiler.source>
3838
<maven.compiler.target>1.8</maven.compiler.target>
39-
<bullet.record.version>0.2.1</bullet.record.version>
39+
<bullet.record.version>0.2.2</bullet.record.version>
4040
<sketches.version>0.9.1</sketches.version>
4141
</properties>
4242

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.commons.lang3.tuple.ImmutablePair;
1414
import org.apache.commons.lang3.tuple.Pair;
1515

16+
import java.lang.reflect.Constructor;
1617
import java.util.ArrayList;
1718
import java.util.Collections;
1819
import java.util.HashMap;
@@ -65,6 +66,11 @@ public class BulletConfig extends Config {
6566

6667
public static final String RECORD_PROVIDER_CLASS_NAME = "bullet.record.provider.class.name";
6768

69+
public static final String QUERY_PARTITIONER_ENABLE = "bullet.query.partitioner.enable";
70+
public static final String QUERY_PARTITIONER_CLASS_NAME = "bullet.query.partitioner.class.name";
71+
public static final String EQUALITY_PARTITIONER_FIELDS = "bullet.query.partitioner.equality.fields";
72+
public static final String EQUALITY_PARTITIONER_DELIMITER = "bullet.query.partitioner.equality.delimiter";
73+
6874
// Defaults
6975
public static final long DEFAULT_QUERY_DURATION = (long) Double.POSITIVE_INFINITY;
7076
public static final long DEFAULT_QUERY_MAX_DURATION = (long) Double.POSITIVE_INFINITY;
@@ -134,9 +140,15 @@ public class BulletConfig extends Config {
134140

135141
public static final String DEFAULT_RECORD_PROVIDER_CLASS_NAME = "com.yahoo.bullet.record.AvroBulletRecordProvider";
136142

143+
public static final boolean DEFAULT_QUERY_PARTITIONER_ENABLE = false;
144+
public static final String DEFAULT_QUERY_PARTITIONER_CLASS_NAME = "com.yahoo.bullet.querying.partitioning.SimpleEqualityPartitioner";
145+
public static final String DEFAULT_EQUALITY_PARTITIONER_DELIMITER = "|";
146+
public static final int MAXIMUM_EQUALITY_FIELDS = 10;
147+
137148
// Validator definitions for the configs in this class.
138149
// This can be static since VALIDATOR itself does not change for different values for fields in the BulletConfig.
139150
private static final Validator VALIDATOR = new Validator();
151+
140152
static {
141153
VALIDATOR.define(QUERY_DEFAULT_DURATION)
142154
.defaultTo(DEFAULT_QUERY_DURATION)
@@ -258,29 +270,50 @@ public class BulletConfig extends Config {
258270
.checkIf(Validator.isIn(Context.QUERY_PROCESSING.name(), Context.QUERY_SUBMISSION.name()));
259271
VALIDATOR.define(PUBSUB_CLASS_NAME)
260272
.defaultTo(DEFAULT_PUBSUB_CLASS_NAME)
261-
.checkIf(Validator::isString);
273+
.checkIf(Validator::isClassName);
262274

263275
VALIDATOR.define(RECORD_PROVIDER_CLASS_NAME)
264-
.defaultTo(DEFAULT_RECORD_PROVIDER_CLASS_NAME)
265-
.checkIf(Validator::isString);
276+
.defaultTo(DEFAULT_RECORD_PROVIDER_CLASS_NAME)
277+
.checkIf(Validator::isClassName);
278+
279+
VALIDATOR.define(QUERY_PARTITIONER_ENABLE)
280+
.defaultTo(DEFAULT_QUERY_PARTITIONER_ENABLE)
281+
.checkIf(Validator::isBoolean);
282+
VALIDATOR.define(QUERY_PARTITIONER_CLASS_NAME)
283+
.defaultTo(DEFAULT_QUERY_PARTITIONER_CLASS_NAME)
284+
.checkIf(Validator::isClassName);
285+
VALIDATOR.define(EQUALITY_PARTITIONER_FIELDS)
286+
.checkIf(Validator.isListOfType(String.class))
287+
.checkIf(Validator.hasMaximumListSize(MAXIMUM_EQUALITY_FIELDS))
288+
.unless(Validator::isNull)
289+
.orFail();
290+
VALIDATOR.define(EQUALITY_PARTITIONER_DELIMITER)
291+
.defaultTo(DEFAULT_EQUALITY_PARTITIONER_DELIMITER)
292+
.checkIf(Validator::isString);
293+
266294

267295
VALIDATOR.relate("Max should be >= default", QUERY_MAX_DURATION, QUERY_DEFAULT_DURATION)
268296
.checkIf(Validator::isGreaterOrEqual);
269297
VALIDATOR.relate("Max should be >= default", AGGREGATION_MAX_SIZE, AGGREGATION_DEFAULT_SIZE)
270-
.checkIf(Validator::isGreaterOrEqual);
298+
.checkIf(Validator::isGreaterOrEqual);
271299
VALIDATOR.relate("Raw max should be <= Aggregation max", AGGREGATION_MAX_SIZE, RAW_AGGREGATION_MAX_SIZE)
272-
.checkIf(Validator::isGreaterOrEqual);
300+
.checkIf(Validator::isGreaterOrEqual);
273301
VALIDATOR.relate("Group max should be <= Aggregation max", AGGREGATION_MAX_SIZE, GROUP_AGGREGATION_MAX_SIZE)
274-
.checkIf(Validator::isGreaterOrEqual);
302+
.checkIf(Validator::isGreaterOrEqual);
275303
VALIDATOR.relate("Distribution points should be <= Aggregation max", AGGREGATION_MAX_SIZE, DISTRIBUTION_AGGREGATION_MAX_POINTS)
276-
.checkIf(Validator::isGreaterOrEqual);
304+
.checkIf(Validator::isGreaterOrEqual);
277305
VALIDATOR.relate("Max duration should be >= min window emit interval", QUERY_MAX_DURATION, WINDOW_MIN_EMIT_EVERY)
278-
.checkIf(Validator::isGreaterOrEqual);
306+
.checkIf(Validator::isGreaterOrEqual);
279307
VALIDATOR.relate("If metadata is enabled, keys should be defined", RESULT_METADATA_ENABLE, RESULT_METADATA_METRICS)
280308
.checkIf(BulletConfig::isMetadataConfigured);
281309
VALIDATOR.relate("If metadata is disabled, keys should not be defined", RESULT_METADATA_ENABLE, RESULT_METADATA_METRICS)
282310
.checkIf(BulletConfig::isMetadataNecessary)
283311
.orElseUse(false, Collections.emptyMap());
312+
313+
VALIDATOR.evaluate("If the equality partitioner is used, the partitioner fields should be defined",
314+
QUERY_PARTITIONER_ENABLE, QUERY_PARTITIONER_CLASS_NAME, EQUALITY_PARTITIONER_FIELDS)
315+
.checkIf(BulletConfig::areEqualityPartitionerFieldsDefined)
316+
.orFail();
284317
}
285318

286319
// Members
@@ -350,6 +383,28 @@ public void merge(Config other) {
350383
validate();
351384
}
352385

386+
/**
387+
* This method loads a given class name (stored in this config) with the class name key and creates an instance of
388+
* it by using a constructor that has a single argument for a {@link BulletConfig}. It then passes in this config
389+
* and returns the constructed instance.
390+
*
391+
* @param classNameKey The name of the key which stores the class name to load in this config.
392+
* @param <S> The type of the class.
393+
* @return A created instance of this class.
394+
* @throws RuntimeException if there were issues creating an instance. It wraps the real exception.
395+
*/
396+
@SuppressWarnings("unchecked")
397+
public <S> S loadConfiguredClass(String classNameKey) {
398+
try {
399+
String name = (String) this.get(classNameKey);
400+
Class<? extends S> className = (Class<? extends S>) Class.forName(name);
401+
Constructor<? extends S> constructor = className.getConstructor(BulletConfig.class);
402+
return constructor.newInstance(this);
403+
} catch (Exception e) {
404+
throw new RuntimeException(e);
405+
}
406+
}
407+
353408
@SuppressWarnings("unchecked")
354409
private static Object mapifyMetadata(Object metadata) {
355410
List<Map> entries = (List<Map>) metadata;
@@ -421,4 +476,18 @@ private static List<Map<String, String>> makeMetadata(Pair<Concept, String>... e
421476
}
422477
return metadataList;
423478
}
479+
480+
@SuppressWarnings("unchecked")
481+
private static boolean areEqualityPartitionerFieldsDefined(List<Object> fields) {
482+
boolean enabled = (Boolean) fields.get(0);
483+
if (!enabled) {
484+
return true;
485+
}
486+
String className = fields.get(1).toString();
487+
if (!DEFAULT_QUERY_PARTITIONER_CLASS_NAME.equals(className)) {
488+
return true;
489+
}
490+
List<String> partitionFields = ((List<String>) fields.get(2));
491+
return partitionFields != null && !partitionFields.isEmpty();
492+
}
424493
}

src/main/java/com/yahoo/bullet/common/Validator.java

Lines changed: 172 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.function.Predicate;
2323
import java.util.stream.Collectors;
2424

25+
import static java.util.Arrays.asList;
26+
2527
/**
2628
* This class validates instances of {@link BulletConfig}. Use {@link Validator.Entry} to define
2729
* fields and {@link Validator.Relationship} to define relationships between them.
@@ -414,7 +416,7 @@ public State evaluate(String description, String... keys) {
414416
if (!missingKeys.isEmpty()) {
415417
throw new NullPointerException("You must evaluate entries for " + missingKeys.stream().collect(Collectors.joining(COMMA)));
416418
}
417-
State state = new State(description, Arrays.asList(keys));
419+
State state = new State(description, asList(keys));
418420
states.add(state);
419421
return state;
420422
}
@@ -505,6 +507,36 @@ public static boolean isNotNull(Object value) {
505507
return value != null;
506508
}
507509

510+
/**
511+
* Checks to see if the value is null or not.
512+
*
513+
* @param value The object to check.
514+
* @return A boolean denoting if the value was null.
515+
*/
516+
public static boolean isNull(Object value) {
517+
return value == null;
518+
}
519+
520+
/**
521+
* Checks to see if the value is true or not.
522+
*
523+
* @param value The object to check.
524+
* @return A boolean denoting if the value was true.
525+
*/
526+
public static boolean isTrue(Object value) {
527+
return isBoolean(value) && ((Boolean) value);
528+
}
529+
530+
/**
531+
* Checks to see if the value is false or not.
532+
*
533+
* @param value The object to check.
534+
* @return A boolean denoting if the value was false.
535+
*/
536+
public static boolean isFalse(Object value) {
537+
return isBoolean(value) && !((Boolean) value);
538+
}
539+
508540
/**
509541
* Checks to see if the value is of the provided type or not.
510542
*
@@ -630,6 +662,21 @@ public static boolean isNonEmptyList(Object value) {
630662
return isType(value, List.class) && !((List) value).isEmpty();
631663
}
632664

665+
/**
666+
* Checks to see if the given object refers to a class name that can be loaded.
667+
*
668+
* @param value The object to check if it is a class name.
669+
* @return A boolean denoting whether the given value was the name of a class.
670+
*/
671+
public static boolean isClassName(Object value) {
672+
try {
673+
Class.forName((String) value);
674+
} catch (Exception e) {
675+
return false;
676+
}
677+
return true;
678+
}
679+
633680
// Unary Predicate Generators
634681

635682
/**
@@ -642,10 +689,32 @@ public static boolean isNonEmptyList(Object value) {
642689
@SuppressWarnings("unchecked")
643690
public static <T> Predicate<Object> isIn(T... values) {
644691
Objects.requireNonNull(values);
645-
Set<T> set = new HashSet<>(Arrays.asList(values));
692+
Set<T> set = new HashSet<>(asList(values));
646693
return set::contains;
647694
}
648695

696+
/**
697+
* Creates a {@link Predicate} that checks to see if the given object is a {@link List} and has at least n items
698+
* in it. Note that the object must be a List even if n is 0.
699+
*
700+
* @param n The minimum number of items that can be in the List.
701+
* @return A boolean denoting if the {@link List} has a size of at least the given parameter.
702+
*/
703+
public static Predicate<Object> hasMinimumListSize(int n) {
704+
return o -> isList(o) && ((List) o).size() >= n;
705+
}
706+
707+
/**
708+
* Creates a {@link Predicate} that checks to see if the given object is a {@link List} and has at most n items
709+
* in it. Note that the object must be a List even if n is 0.
710+
*
711+
* @param n The maximum number of items that can be in the List.
712+
* @return A boolean denoting if the {@link List} has a size of at most the given parameter.
713+
*/
714+
public static Predicate<Object> hasMaximumListSize(int n) {
715+
return o -> isList(o) && ((List) o).size() <= n;
716+
}
717+
649718
/**
650719
* Creates a {@link Predicate} that checks to see if the given object is a {@link Number} in the proper range.
651720
*
@@ -663,7 +732,66 @@ public static <T extends Number> Predicate<Object> isInRange(T min, T max) {
663732
return o -> isNumber(o) && ((T) o).doubleValue() >= minimum && ((T) o).doubleValue() <= maximum;
664733
}
665734

666-
// Binary Predicates.
735+
/**
736+
* Creates a {@link Predicate} that checks to see if the given object is a non-empty {@link List} of the
737+
* given type.
738+
*
739+
* @param type The class of the contents of the list to check for.
740+
* @param <T> The type of the content in the list.
741+
* @return A Predicate that checks tor see if the value was a non-empty List of the given type.
742+
*/
743+
@SuppressWarnings("unchecked")
744+
public static <T> Predicate<Object> isListOfType(Class<T> type) {
745+
return value -> {
746+
if (!isNonEmptyList(value)) {
747+
return false;
748+
}
749+
List list = (List) value;
750+
return list.stream().allMatch(i -> isType(i, type));
751+
};
752+
}
753+
754+
/**
755+
* Creates a {@link Predicate} that is true if and only if all the provided predicates are true and false otherwise.
756+
*
757+
* @param predicates The predicates to be ANDed.
758+
* @return A predicate that is the AND of all the given predicates.
759+
*/
760+
@SafeVarargs
761+
public static Predicate<Object> and(Predicate<Object>... predicates) {
762+
Predicate<Object> anded = UNARY_IDENTITY;
763+
for (Predicate<Object> predicate : predicates) {
764+
anded = anded.and(predicate);
765+
}
766+
return anded;
767+
}
768+
769+
/**
770+
* Creates a {@link Predicate} that is true if any of the provided predicates are true and false otherwise.
771+
*
772+
* @param predicates The predicates to be ORed.
773+
* @return A predicate that is the OR of all the given predicates.
774+
*/
775+
@SafeVarargs
776+
public static Predicate<Object> or(Predicate<Object>... predicates) {
777+
Predicate<Object> ored = not(UNARY_IDENTITY);
778+
for (Predicate<Object> predicate : predicates) {
779+
ored = ored.or(predicate);
780+
}
781+
return ored;
782+
}
783+
784+
/**
785+
* Creates a {@link Predicate} that is true if the given predicate is false and false if the given predicate is true.
786+
*
787+
* @param predicate The predicates to be negated.
788+
* @return A predicate that is the NOT of the given predicate.
789+
*/
790+
public static Predicate<Object> not(Predicate<Object> predicate) {
791+
return predicate.negate();
792+
}
793+
794+
// Binary Predicates
667795

668796
/**
669797
* Checks to see if the first numeric object is greater than or equal to the second numeric object.
@@ -676,7 +804,20 @@ public static boolean isGreaterOrEqual(Object first, Object second) {
676804
return ((Number) first).doubleValue() >= ((Number) second).doubleValue();
677805
}
678806

679-
// Binary Predicate makers.
807+
/**
808+
* Checks to see if the first boolean object implies the second boolean object. In other words, does the first imply
809+
* the second. If first is true and second is false, this check is false. Otherwise, it is true.
810+
*
811+
* @param first The first boolean object.
812+
* @param second The second boolean object.
813+
* @return A boolean denoting whether the second is implied by the first.
814+
*/
815+
public static boolean isImplied(Object first, Object second) {
816+
// first -> second === ~first or second
817+
return !((Boolean) first) || ((Boolean) second);
818+
}
819+
820+
// Binary Predicate Generators.
680821

681822
/**
682823
* Returns a {@link BiPredicate} that checks to see if the first argument is at least the given times
@@ -688,4 +829,31 @@ public static boolean isGreaterOrEqual(Object first, Object second) {
688829
public static BiPredicate<Object, Object> isAtleastNTimes(double n) {
689830
return (greater, smaller) -> ((Number) greater).doubleValue() >= n * ((Number) smaller).doubleValue();
690831
}
832+
833+
/**
834+
* Returns a {@link BiPredicate} that checks to see if the first boolean argument implies the second
835+
* {@link Predicate}. In other words, if the first argument is true and the second argument evaluates to false
836+
* on the given {@link Predicate}, this check is false. Else, it is true.
837+
*
838+
* @param predicate The {@link Predicate} to test the second argument to the returned {@link BiPredicate} with.
839+
* @return The created {@link BiPredicate}.
840+
*/
841+
public static BiPredicate<Object, Object> ifTrueThenCheck(Predicate<Object> predicate) {
842+
// Can use isImplied(bool, predicate.test(object)) but is not lazy anymore
843+
return (bool, object) -> !((Boolean) bool) || predicate.test(object);
844+
}
845+
846+
/**
847+
* Returns a {@link BiPredicate} that checks to see if the first {@link Predicate} implies the second. In other
848+
* words, if the first one evaluates to true on the first object in the BiPredicate and the second evaluates to
849+
* false on the second object, this check is false. Else, it is true. Analogous to
850+
* {@link Validator#isImplied(Object, Object)}.
851+
*
852+
* @param firstTest The first {@link Predicate}.
853+
* @param secondTest The second {@link Predicate}.
854+
* @return The created {@link BiPredicate}.
855+
*/
856+
public static BiPredicate<Object, Object> isImpliedBy(Predicate<Object> firstTest, Predicate<Object> secondTest) {
857+
return (first, second) -> !(firstTest.test(first)) || secondTest.test(second);
858+
}
691859
}

0 commit comments

Comments
 (0)