Skip to content

Commit 3ff0bd1

Browse files
authored
PubSubMessageSerDe for query pointcuts (#99)
1 parent 423cced commit 3ff0bd1

22 files changed

+442
-126
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>com.yahoo.bullet</groupId>
66
<artifactId>bullet-core</artifactId>
7-
<version>1.3.3-SNAPSHOT</version>
7+
<version>1.4.0-SNAPSHOT</version>
88
<packaging>jar</packaging>
99
<name>bullet-core</name>
1010
<description>

src/main/java/com/yahoo/bullet/common/BulletConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class BulletConfig extends Config {
6363

6464
public static final String PUBSUB_CONTEXT_NAME = "bullet.pubsub.context.name";
6565
public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name";
66+
public static final String PUBSUB_MESSAGE_SERDE_CLASS_NAME = "bullet.pubsub.message.serde.class.name";
6667

6768
public static final String STORAGE_CLASS_NAME = "bullet.storage.class.name";
6869

@@ -142,6 +143,7 @@ public class BulletConfig extends Config {
142143

143144
public static final String DEFAULT_PUBSUB_CONTEXT_NAME = Context.QUERY_PROCESSING.name();
144145
public static final String DEFAULT_PUBSUB_CLASS_NAME = "com.yahoo.bullet.pubsub.MockPubSub";
146+
public static final String DEFAULT_PUBSUB_MESSAGE_SERDE_CLASS_NAME = "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe";
145147

146148
public static final String DEFAULT_RECORD_PROVIDER_CLASS_NAME = "com.yahoo.bullet.record.avro.TypedAvroBulletRecordProvider";
147149

@@ -278,6 +280,9 @@ public class BulletConfig extends Config {
278280
VALIDATOR.define(PUBSUB_CLASS_NAME)
279281
.defaultTo(DEFAULT_PUBSUB_CLASS_NAME)
280282
.checkIf(Validator::isClassName);
283+
VALIDATOR.define(PUBSUB_MESSAGE_SERDE_CLASS_NAME)
284+
.defaultTo(DEFAULT_PUBSUB_MESSAGE_SERDE_CLASS_NAME)
285+
.checkIf(Validator::isClassName);
281286

282287
VALIDATOR.define(RECORD_PROVIDER_CLASS_NAME)
283288
.defaultTo(DEFAULT_RECORD_PROVIDER_CLASS_NAME)
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2021 Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub;
7+
8+
import com.yahoo.bullet.common.BulletConfig;
9+
10+
public class IdentityPubSubMessageSerDe extends PubSubMessageSerDe {
11+
private static final long serialVersionUID = -1709000962888195381L;
12+
13+
/**
14+
* Constructor.
15+
*
16+
* @param config The {@link BulletConfig} to configure this class.
17+
*/
18+
public IdentityPubSubMessageSerDe(BulletConfig config) {
19+
super(config);
20+
}
21+
22+
@Override
23+
public PubSubMessage toMessage(PubSubMessage message) {
24+
return message;
25+
}
26+
27+
@Override
28+
public PubSubMessage fromMessage(PubSubMessage message) {
29+
return message;
30+
}
31+
}

src/main/java/com/yahoo/bullet/pubsub/Metadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Metadata copy() {
5858
}
5959

6060
/**
61-
* Set a serializable content for this metadata.
61+
* Set a {@link Serializable} content for this metadata.
6262
*
6363
* @param content The content for this metadata.
6464
*/

src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java

Lines changed: 75 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
*/
66
package com.yahoo.bullet.pubsub;
77

8+
import com.google.gson.Gson;
9+
import com.yahoo.bullet.common.SerializerDeserializer;
810
import com.yahoo.bullet.pubsub.Metadata.Signal;
11+
import com.yahoo.bullet.query.Query;
912
import com.yahoo.bullet.result.JSONFormatter;
1013
import lombok.Getter;
1114
import lombok.Setter;
1215

1316
import java.io.Serializable;
1417
import java.nio.charset.Charset;
1518
import java.nio.charset.StandardCharsets;
19+
import java.util.Base64;
1620
import java.util.Objects;
1721

1822
/**
@@ -21,11 +25,12 @@
2125
*/
2226
@Getter
2327
public class PubSubMessage implements Serializable, JSONFormatter {
24-
private static final long serialVersionUID = -5068189058170874687L;
2528
public static final Charset CHARSET = StandardCharsets.UTF_8;
29+
private static final long serialVersionUID = 5096747716667851530L;
2630

2731
private String id;
28-
private byte[] content;
32+
// Serializable is enforced through the constructors, getter and setter. The field is an Object so that GSON can reify an instance.
33+
private Object content;
2934
@Setter
3035
private Metadata metadata;
3136

@@ -52,50 +57,29 @@ public PubSubMessage(String id, Signal signal) {
5257
* @param id The ID associated with the message.
5358
* @param content The content of the message.
5459
*/
55-
public PubSubMessage(String id, byte[] content) {
60+
public PubSubMessage(String id, Serializable content) {
5661
this(id, content, (Metadata) null);
5762
}
5863

59-
/**
60-
* Constructor for a message having only content as a String.
61-
*
62-
* @param id The ID associated with the message.
63-
* @param content The content of the message as a String.
64-
*/
65-
public PubSubMessage(String id, String content) {
66-
this(id, content, null);
67-
}
68-
6964
/**
7065
* Constructor for a message having content and a {@link Metadata.Signal}.
7166
*
7267
* @param id The ID associated with the message.
7368
* @param content The content of the message.
7469
* @param signal The Signal to be sent with the message.
7570
*/
76-
public PubSubMessage(String id, byte[] content, Signal signal) {
71+
public PubSubMessage(String id, Serializable content, Signal signal) {
7772
this(id, content, new Metadata(signal, null));
7873
}
7974

80-
/**
81-
* Constructor for a message having content as a String and {@link Metadata}.
82-
*
83-
* @param id The ID associated with the message.
84-
* @param content The content of the message as a String.
85-
* @param metadata The Metadata associated with the message.
86-
*/
87-
public PubSubMessage(String id, String content, Metadata metadata) {
88-
this(id, content == null ? null : content.getBytes(CHARSET), metadata);
89-
}
90-
9175
/**
9276
* Constructor for a message having content and {@link Metadata}.
9377
*
9478
* @param id The ID associated with the message.
9579
* @param content The content of the message.
9680
* @param metadata The Metadata associated with the message.
9781
*/
98-
public PubSubMessage(String id, byte[] content, Metadata metadata) {
82+
public PubSubMessage(String id, Serializable content, Metadata metadata) {
9983
this.id = Objects.requireNonNull(id, "ID cannot be null");
10084
this.content = content;
10185
this.metadata = metadata;
@@ -138,14 +122,52 @@ public boolean hasSignal() {
138122
return hasMetadata() && metadata.hasSignal();
139123
}
140124

125+
/**
126+
* Returns the {@link Serializable} content stored in the message.
127+
*
128+
* @return The content stored.
129+
*/
130+
public Serializable getContent() {
131+
return (Serializable) content;
132+
}
133+
134+
/**
135+
* Returns the content stored in the message as a byte[]. You should use this to read the byte[] back from the
136+
* message if you provided it originally to the message as a byte[].
137+
*
138+
* @return The content stored as a byte[].
139+
*/
140+
public byte[] getContentAsByteArray() {
141+
return (byte[]) content;
142+
}
143+
141144
/**
142145
* Returns the content stored in the message as a String. You should use this to read the String back from the
143146
* message if you provided it originally to the message as a String.
144147
*
145-
* @return The content stored as a String using the {@link PubSubMessage#CHARSET}.
148+
* @return The content stored as a String.
146149
*/
147150
public String getContentAsString() {
148-
return content == null ? null : new String(content, CHARSET);
151+
return (String) content;
152+
}
153+
154+
/**
155+
* Returns the content stored in the message as a {@link Query}. You should use this to read the {@link Query} back
156+
* if you originally provided it to the message as a {@link Serializable}.
157+
*
158+
* @return The content stored as a {@link Query}.
159+
*/
160+
public Query getContentAsQuery() {
161+
return (Query) content;
162+
}
163+
164+
/**
165+
* Set a {@link Serializable} content for this message.
166+
*
167+
* @param content The content for this message.
168+
*/
169+
public void setContent(Serializable content) {
170+
this.content = content;
149171
}
150172

151173
@Override
@@ -169,16 +191,38 @@ public String toString() {
169191

170192
@Override
171193
public String asJSON() {
172-
return JSONFormatter.asJSON(this);
194+
String data = Base64.getEncoder().encodeToString(SerializerDeserializer.toBytes((Serializable) content));
195+
PubSubMessage message = new PubSubMessage(id, data, metadata);
196+
return JSONFormatter.asJSON(message);
173197
}
174198

175199
/**
176-
* Converts a json representation back to an instance.
200+
* Converts a JSON representation back to an instance. This is the inverse of {@link #asJSON()}.
177201
*
178202
* @param json The string representation of the JSON.
179203
* @return An instance of this class.
180204
*/
181205
public static PubSubMessage fromJSON(String json) {
182-
return JSONFormatter.fromJSON(json, PubSubMessage.class);
206+
return fromJSON(json, GSON);
207+
}
208+
209+
/**
210+
* Converts a JSON representation back to an instance using a specific {@link Gson} converter.
211+
* Is the inverse of {@link #asJSON()}.
212+
*
213+
* @param json The string representation of the JSON.
214+
* @param gson The {@link Gson} converter to use.
215+
* @return An instance of this class.
216+
*/
217+
public static PubSubMessage fromJSON(String json, Gson gson) {
218+
return fromJSON(gson.fromJson(json, PubSubMessage.class));
219+
}
220+
221+
private static PubSubMessage fromJSON(PubSubMessage message) {
222+
if (message == null || message.getContent() == null) {
223+
return message;
224+
}
225+
message.content = SerializerDeserializer.fromBytes(Base64.getDecoder().decode(message.getContentAsString()));
226+
return message;
183227
}
184228
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2021 Yahoo Inc.
3+
* Licensed under the terms of the Apache License, Version 2.0.
4+
* See the LICENSE file associated with the project for terms.
5+
*/
6+
package com.yahoo.bullet.pubsub;
7+
8+
import com.yahoo.bullet.common.BulletConfig;
9+
import com.yahoo.bullet.query.Query;
10+
11+
import java.io.Serializable;
12+
13+
/**
14+
* This allows you to hook into the {@link PubSubMessage} serialization and deserialization - for instance, if you want
15+
* to customize what is done to the message before it is sent to or read in the backend. You can implement a custom
16+
* version of this to change what the {@link PubSubMessage} contains (see {@link #toMessage(PubSubMessage)}) and invert
17+
* it when it is read back (see {@link #fromMessage(PubSubMessage)}).
18+
*
19+
* Please note that this is not intended to be applied to messages read from the backend or to signals sent to the
20+
* backend. You can, for example, use this if you want to customize what is stored in storage or to lazily create a
21+
* {@link Query} in the backend.
22+
*/
23+
public abstract class PubSubMessageSerDe implements Serializable {
24+
private static final long serialVersionUID = 5352288558800960763L;
25+
26+
protected BulletConfig config;
27+
28+
/**
29+
* Constructor.
30+
*
31+
* @param config The {@link BulletConfig} to configure this class.
32+
*/
33+
public PubSubMessageSerDe(BulletConfig config) {
34+
this.config = config;
35+
}
36+
37+
/**
38+
* A helper to submit a query by converting it to a standard {@link PubSubMessage} and running it through
39+
* {@link #toMessage(PubSubMessage)}.
40+
*
41+
* @param id The ID for the query.
42+
* @param query The {@link Query} object to create a {@link PubSubMessage}.
43+
* @param queryString The BQL for the query.
44+
* @return A converted {@link PubSubMessage}.
45+
*/
46+
public PubSubMessage toMessage(String id, Query query, String queryString) {
47+
return toMessage(new PubSubMessage(id, query, new Metadata(null, queryString)));
48+
}
49+
50+
/**
51+
* Takes a {@link PubSubMessage} and returns the new format of the message. Note that it is allowed to modify the
52+
* original message. See {@link #fromMessage(PubSubMessage)} for the inverse.
53+
*
54+
* @param message The {@link PubSubMessage} to convert.
55+
* @return A converted {@link PubSubMessage}.
56+
*/
57+
public abstract PubSubMessage toMessage(PubSubMessage message);
58+
59+
/**
60+
* Takes a converted {@link PubSubMessage} and returns the original format of the message. Note that it is allowed
61+
* to modify the original message. See {@link #toMessage(PubSubMessage)} for the inverse.
62+
*
63+
* @param message The {@link PubSubMessage} to revert.
64+
* @return A reverted {@link PubSubMessage}.
65+
*/
66+
public abstract PubSubMessage fromMessage(PubSubMessage message);
67+
68+
/**
69+
* Create a {@link PubSubMessageSerDe} instance using the class specified in the config file.
70+
*
71+
* @param config The non-null {@link BulletConfig} containing the class name and its settings.
72+
* @return An instance of the specified class initialized with settings from the input file and defaults.
73+
*/
74+
public static PubSubMessageSerDe from(BulletConfig config) {
75+
try {
76+
return config.loadConfiguredClass(BulletConfig.PUBSUB_MESSAGE_SERDE_CLASS_NAME);
77+
} catch (RuntimeException e) {
78+
throw new RuntimeException("Cannot create PubSubMessageSerDe instance.", e.getCause());
79+
}
80+
}
81+
}

src/main/java/com/yahoo/bullet/pubsub/Publisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package com.yahoo.bullet.pubsub;
77

8+
import java.io.Serializable;
9+
810
public interface Publisher extends AutoCloseable {
911
/**
1012
* Send a message with an ID and content.
@@ -14,7 +16,7 @@ public interface Publisher extends AutoCloseable {
1416
* @return The sent {@link PubSubMessage}.
1517
* @throws PubSubException if the messaging system throws an error.
1618
*/
17-
default PubSubMessage send(String id, byte[] content) throws PubSubException {
19+
default PubSubMessage send(String id, Serializable content) throws PubSubException {
1820
return send(new PubSubMessage(id, content));
1921
}
2022

src/main/java/com/yahoo/bullet/pubsub/rest/RESTPubSub.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import com.yahoo.bullet.pubsub.Subscriber;
1313
import lombok.extern.slf4j.Slf4j;
1414
import org.apache.http.impl.client.HttpClients;
15+
16+
import java.nio.charset.Charset;
17+
import java.nio.charset.StandardCharsets;
1518
import java.util.Collections;
1619
import java.util.List;
1720
import java.util.stream.Collectors;
@@ -21,7 +24,7 @@
2124
public class RESTPubSub extends PubSub {
2225
public static final int OK_200 = 200;
2326
public static final int NO_CONTENT_204 = 204;
24-
public static final String UTF_8 = "UTF-8";
27+
public static final Charset UTF_8 = StandardCharsets.UTF_8;
2528

2629
/**
2730
* Create a RESTPubSub from a {@link BulletConfig}.

src/main/java/com/yahoo/bullet/pubsub/rest/RESTSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public List<PubSubMessage> getMessages() {
7272
HttpEntity httpEntity = response.getEntity();
7373
String message = EntityUtils.toString(httpEntity, RESTPubSub.UTF_8);
7474
log.debug("Received message from url: {}. Message was {}", url, message);
75-
messages.add(GSON.fromJson(message, PubSubMessage.class));
75+
messages.add(PubSubMessage.fromJSON(message, GSON));
7676
EntityUtils.consume(httpEntity);
7777
} else if (statusCode != RESTPubSub.NO_CONTENT_204) {
7878
// NO_CONTENT_204 indicates there are no new messages - anything else indicates a problem

src/main/resources/bullet_defaults.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,5 @@ bullet.query.partitioner.equality.delimiter: "|"
249249
bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.rest.RESTPubSub"
250250
# The current context. This can be QUERY_PROCESSING or QUERY_SUBMISSION. The PubSub implementation should use this to generate appropriate Publishers and Subscribers.
251251
bullet.pubsub.context.name: "QUERY_PROCESSING"
252+
# The class to use for converting and reading PubSubMessage queries sent to the backend.
253+
bullet.pubsub.message.serde.class.name: "com.yahoo.bullet.pubsub.IdentityPubSubMessageSerDe"

0 commit comments

Comments
 (0)