Skip to content

Commit b0d73cb

Browse files
committed
Refactor to use MapElements and introduce IllegalAetUriException
1 parent e076fa4 commit b0d73cb

File tree

1 file changed

+39
-15
lines changed

1 file changed

+39
-15
lines changed

ingestion-beam/src/main/java/com/mozilla/telemetry/aet/DecryptAetIdentifiers.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.IOException;
2121
import java.io.UncheckedIOException;
2222
import java.security.PrivateKey;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.Map;
2625
import java.util.Map.Entry;
@@ -29,7 +28,7 @@
2928
import org.apache.beam.sdk.metrics.Counter;
3029
import org.apache.beam.sdk.metrics.Metrics;
3130
import org.apache.beam.sdk.options.ValueProvider;
32-
import org.apache.beam.sdk.transforms.FlatMapElements;
31+
import org.apache.beam.sdk.transforms.MapElements;
3332
import org.apache.beam.sdk.transforms.PTransform;
3433
import org.apache.beam.sdk.transforms.ProcessFunction;
3534
import org.apache.beam.sdk.transforms.WithFailures;
@@ -57,8 +56,18 @@ public class DecryptAetIdentifiers extends
5756
private static final Pattern STRUCTURED_URI_PATTERN = Pattern
5857
.compile("/submit/[^/]+/account-ecosystem.*");
5958

60-
private final Counter nonAetDocType = Metrics.counter(DecryptAetIdentifiers.class,
61-
"non_aet_doctype");
59+
private final Counter unparsableAetPayload = Metrics.counter(DecryptAetIdentifiers.class,
60+
"unparsable_aet_payload");
61+
private final Counter illegalAetUri = Metrics.counter(DecryptAetIdentifiers.class,
62+
"illegal_aet_uri");
63+
private final Counter illegalAetPayload = Metrics.counter(DecryptAetIdentifiers.class,
64+
"illegal_aet_payload");
65+
private final Counter successTelemetry = Metrics.counter(DecryptAetIdentifiers.class,
66+
"success_telemetry");
67+
private final Counter successGlean = Metrics.counter(DecryptAetIdentifiers.class,
68+
"success_glean");
69+
private final Counter successStructured = Metrics.counter(DecryptAetIdentifiers.class,
70+
"success_structured");
6271

6372
public static DecryptAetIdentifiers of(ValueProvider<String> metadataLocation,
6473
ValueProvider<Boolean> kmsEnabled) {
@@ -76,6 +85,9 @@ private DecryptAetIdentifiers(ValueProvider<String> metadataLocation,
7685
*/
7786
abstract static class DecryptAetPayloadException extends RuntimeException {
7887

88+
private DecryptAetPayloadException() {
89+
}
90+
7991
private DecryptAetPayloadException(Throwable cause) {
8092
super(cause);
8193
}
@@ -88,6 +100,12 @@ public UnparsableAetPayloadException(Exception cause) {
88100
}
89101
}
90102

103+
public static class IllegalAetUriException extends DecryptAetPayloadException {
104+
105+
public IllegalAetUriException() {
106+
}
107+
}
108+
91109
public static class IllegalAetPayloadException extends DecryptAetPayloadException {
92110

93111
public IllegalAetPayloadException(Exception cause) {
@@ -143,7 +161,7 @@ static void sanitizeJsonNode(JsonNode node) {
143161
@Override
144162
public Result<PCollection<PubsubMessage>, PubsubMessage> expand(
145163
PCollection<PubsubMessage> messages) {
146-
return messages.apply(FlatMapElements.into(TypeDescriptor.of(PubsubMessage.class)) //
164+
return messages.apply(MapElements.into(TypeDescriptor.of(PubsubMessage.class)) //
147165
.via(new Fn()) //
148166
.exceptionsInto(TypeDescriptor.of(PubsubMessage.class)) //
149167
.exceptionsVia((WithFailures.ExceptionElement<PubsubMessage> ee) -> {
@@ -198,11 +216,10 @@ public static JsonNode decrypt(KeyStore keyStore, JsonNode anonIdNode)
198216
}
199217
}
200218

201-
private class Fn implements ProcessFunction<PubsubMessage, Iterable<PubsubMessage>> {
219+
private class Fn implements ProcessFunction<PubsubMessage, PubsubMessage> {
202220

203221
private void processDesktopTelemetryPayload(ObjectNode json) throws IOException, JoseException {
204222
validator.validate(telemetryAetSchema, json);
205-
System.out.println("Go telemetry");
206223
ObjectNode payload = (ObjectNode) json.path(FieldName.PAYLOAD);
207224
JsonNode anonIdNode = payload.remove("ecosystemAnonId");
208225
if (anonIdNode != null) {
@@ -228,7 +245,7 @@ private void processStructuredIngestionPayload(ObjectNode json)
228245
}
229246

230247
@Override
231-
public Iterable<PubsubMessage> apply(PubsubMessage message) {
248+
public PubsubMessage apply(PubsubMessage message) {
232249
message = PubsubConstraints.ensureNonNull(message);
233250
String uri = Strings.nullToEmpty(message.getAttribute(Attribute.URI));
234251
String[] uriParts = uri.split("/");
@@ -257,32 +274,39 @@ public Iterable<PubsubMessage> apply(PubsubMessage message) {
257274
try {
258275
json = Json.readObjectNode(message.getPayload());
259276
} catch (IOException e) {
277+
unparsableAetPayload.inc();
260278
throw new UnparsableAetPayloadException(e);
261279
}
262280

263-
// Note that we must work with the raw URI because ParseUri can raise errors that would
264-
// route payloads containing anon_id values to error output; this transform must be placed
265-
// before ParseUri so that we can handle redacting anon_id values.
281+
// This transform comes early in the Decoder job before ParseUri, so we must work with
282+
// the raw URI rather than parsed namespace and doctype.
283+
// In particular, ParseUri is the first transform that can send messages to error output;
284+
// we risk spilling sensitive information if AET payloads get sent to an output before anon_id
285+
// values are removed, so we must have this transform come before an error step so that we
286+
// can handle sanitizing/dropping the payload before emitting errors.
266287
byte[] normalizedPayload;
267288
try {
268289
if (TELEMETRY_URI_PATTERN.matcher(uri).matches()) {
269290
processDesktopTelemetryPayload(json);
291+
successTelemetry.inc();
270292
} else if (false) {
271293
// Placeholder condition for handling AET payloads coming from Glean-enabled applications;
272294
// design is evolving in https://bugzilla.mozilla.org/show_bug.cgi?id=1634468
295+
successGlean.inc();
273296
} else if (STRUCTURED_URI_PATTERN.matcher(uri).matches()) {
274297
processStructuredIngestionPayload(json);
298+
successStructured.inc();
275299
} else {
276-
nonAetDocType.inc();
277-
return Collections.singletonList(message);
300+
illegalAetUri.inc();
301+
throw new IllegalAetUriException();
278302
}
279303
normalizedPayload = Json.asBytes(json);
280304
} catch (IOException | JoseException | ValidationException | KeyNotFoundException e) {
305+
illegalAetPayload.inc();
281306
throw new IllegalAetPayloadException(e);
282307
}
283308

284-
return Collections
285-
.singletonList(new PubsubMessage(normalizedPayload, message.getAttributeMap()));
309+
return new PubsubMessage(normalizedPayload, message.getAttributeMap());
286310
}
287311

288312
}

0 commit comments

Comments
 (0)