1+ /*
2+ * Copyright 2017, Yahoo Inc.
3+ * Licensed under the terms of the Apache License, Version 2.0.
4+ * See the LICENSE file associated with the project for terms.
5+ */
16package com .yahoo .bullet .pubsub ;
27
38import com .yahoo .bullet .BulletConfig ;
49
5- import java .io .Serializable ;
610import java .lang .reflect .Constructor ;
711import java .util .List ;
12+ import java .util .Objects ;
813
914/**
1015 * Notation: Partition is a unit of parallelism in the Pub/Sub queue.
1116 *
1217 * Implementations of PubSub should take in a {@link BulletConfig} and use the information to wire up and return
1318 * Publishers and Subscribers.
1419 */
15- public abstract class PubSub implements Serializable {
20+ public abstract class PubSub {
1621 /**
1722 * The context determines how the {@link Publisher} and {@link Subscriber} returned by PubSub behave. For example,
1823 * If the Context is {@link Context#QUERY_SUBMISSION}:
@@ -28,49 +33,60 @@ public enum Context {
2833 }
2934
3035 protected Context context ;
36+ protected BulletConfig config ;
3137
3238 /**
3339 * Instantiate a PubSub using parameters from {@link BulletConfig}.
3440 *
3541 * @param config The {@link BulletConfig} containing all required PubSub parameters.
42+ * @throws PubSubException if the context name is not present or cannot be parsed.
3643 */
37- public PubSub (BulletConfig config ) {
38- context = Context .valueOf (config .get (BulletConfig .PUBSUB_CONTEXT_NAME ).toString ());
44+ public PubSub (BulletConfig config ) throws PubSubException {
45+ this .config = config ;
46+ try {
47+ this .context = Context .valueOf (getRequiredConfig (String .class , BulletConfig .PUBSUB_CONTEXT_NAME ));
48+ } catch (RuntimeException e ) {
49+ throw new PubSubException ("Cannot create PubSub" , e );
50+ }
3951 }
4052
4153 /**
4254 * Get a {@link Publisher} instance wired to write to all allocated partitions in the appropriate queue (See
4355 * {@link PubSub#context}).
4456 *
4557 * @return {@link Publisher} wired as required.
58+ * @throws PubSubException if the Publisher could not be created.
4659 */
47- public abstract Publisher getPublisher ();
60+ public abstract Publisher getPublisher () throws PubSubException ;
4861
4962 /**
5063 * Get a list of n {@link Publisher} instances with the allocated partitions in the appropriate queue
5164 * (See {@link PubSub#context}) split as evenly as possible among them.
5265 *
5366 * @param n The number of Publishers requested.
5467 * @return The {@link List} of n Publishers wired as required.
68+ * @throws PubSubException if Publishers could not be created.
5569 */
56- public abstract List <Publisher > getPublishers (int n );
70+ public abstract List <Publisher > getPublishers (int n ) throws PubSubException ;
5771
5872 /**
5973 * Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See
6074 * {@link PubSub#context}).
6175 *
6276 * @return {@link Subscriber} wired as required.
77+ * @throws PubSubException if the Subscriber could not be created.
6378 */
64- public abstract Subscriber getSubscriber ();
79+ public abstract Subscriber getSubscriber () throws PubSubException ;
6580
6681 /**
6782 * Get a list of n {@link Subscriber} instances with allocated partitions from the appropriate queue
6883 * (See {@link PubSub#context}) split as evenly as possible among them.
6984 *
7085 * @param n The number of Subscribers requested.
7186 * @return The {@link List} of n Subscribers wired as required.
87+ * @throws PubSubException if Subscribers could not be created.
7288 */
73- public abstract List <Subscriber > getSubscribers (int n );
89+ public abstract List <Subscriber > getSubscribers (int n ) throws PubSubException ;
7490
7591 /**
7692 * Create a PubSub instance using the class specified in the config file.
@@ -89,4 +105,21 @@ public static PubSub from(BulletConfig config) throws PubSubException {
89105 throw new PubSubException ("Cannot create PubSub instance." , e );
90106 }
91107 }
108+
109+ /**
110+ * A method to get a required configuration of a particular type.
111+ *
112+ * @param name The name of the required configuration.
113+ * @param tClass The class of the required configuration.
114+ * @param <T> The type to cast the configuration to. Inferred from tClass.
115+ * @return The extracted configuration of type T.
116+ * @throws PubSubException if the configuration is missing or cannot be cast to type T.
117+ */
118+ public <T > T getRequiredConfig (Class <T > tClass , String name ) throws PubSubException {
119+ try {
120+ return (T ) Objects .requireNonNull (config .get (name ));
121+ } catch (Exception e ) {
122+ throw PubSubException .forArgument (name , e );
123+ }
124+ }
92125}
0 commit comments