Skip to content

Commit cfd1418

Browse files
committed
Bug 1635893 Implement DecryptAetIdentifiers
Supports Account Ecosystem Telemetry. This differs somewhat from the design in the open [draft docs PR](#1264), so those docs will be updated before deploying this.
1 parent 58d06d6 commit cfd1418

File tree

11 files changed

+686
-8
lines changed

11 files changed

+686
-8
lines changed

ingestion-beam/src/main/java/com/mozilla/telemetry/Decoder.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.mozilla.telemetry;
22

3+
import com.mozilla.telemetry.aet.DecryptAetIdentifiers;
34
import com.mozilla.telemetry.decoder.AddMetadata;
45
import com.mozilla.telemetry.decoder.DecoderOptions;
56
import com.mozilla.telemetry.decoder.DecryptPioneerPayloads;
@@ -55,7 +56,9 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
5556
final List<PCollection<PubsubMessage>> failureCollections = new ArrayList<>();
5657

5758
// We wrap pipeline in Optional for more convenience in chaining together transforms.
58-
Optional.of(pipeline) //
59+
Optional.of(pipeline)
60+
61+
// Input
5962
.map(p -> p //
6063
.apply(options.getInputType().read(options)) //
6164
// We apply ParseProxy and GeoCityLookup and GeoIspLookup first so that IP
@@ -64,12 +67,24 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
6467
.apply(ParseProxy.of()) //
6568
.apply(GeoIspLookup.of(options.getGeoIspDatabase())) //
6669
.apply(GeoCityLookup.of(options.getGeoCityDatabase(), options.getGeoCityFilter())) //
67-
.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections) //
6870
.apply(DecompressPayload.enabled(options.getDecompressInputPayloads())))
71+
72+
// Special case: decryption of Account Ecosystem Telemetry identifiers
73+
.map(p -> options.getAetEnabled() ? p
74+
.apply(DecryptAetIdentifiers.of(options.getAetMetadataLocation(),
75+
options.getAetKmsEnabled())) //
76+
.failuresTo(failureCollections) : p)
77+
78+
// URI Parsing
79+
.map(p -> p.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections))
80+
81+
// Special case: decryption of Pioneer payloads
6982
.map(p -> options.getPioneerEnabled() ? p
7083
.apply(DecryptPioneerPayloads.of(options.getPioneerMetadataLocation(),
71-
options.getPioneerKmsEnabled(), options.getPioneerDecompressPayload()))
84+
options.getPioneerKmsEnabled(), options.getPioneerDecompressPayload())) //
7285
.failuresTo(failureCollections) : p)
86+
87+
// Main output
7388
.map(p -> p //
7489
// See discussion in https://github.com/mozilla/gcp-ingestion/issues/776
7590
.apply("LimitPayloadSize", LimitPayloadSize.toMB(8)).failuresTo(failureCollections) //
@@ -78,12 +93,11 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
7893
.apply(ParseUserAgent.of()) //
7994
.apply(NormalizeAttributes.of()) //
8095
.apply("AddMetadata", AddMetadata.of()).failuresTo(failureCollections) //
81-
.apply(Deduplicate.removeDuplicates(options.getParsedRedisUri()))
82-
.sendDuplicateMetadataToErrors().failuresTo(failureCollections)) //
83-
.map(p -> p //
96+
.apply(Deduplicate.removeDuplicates(options.getParsedRedisUri())) //
97+
.sendDuplicateMetadataToErrors().failuresTo(failureCollections) //
8498
.apply(options.getOutputType().write(options)).failuresTo(failureCollections));
8599

86-
// Write error output collections.
100+
// Error output
87101
PCollectionList.of(failureCollections) //
88102
.apply("FlattenFailureCollections", Flatten.pCollections()) //
89103
.apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package com.mozilla.telemetry.aet;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.fasterxml.jackson.databind.node.ArrayNode;
5+
import com.fasterxml.jackson.databind.node.ObjectNode;
6+
import com.fasterxml.jackson.databind.node.TextNode;
7+
import com.google.api.client.util.Lists;
8+
import com.google.common.annotations.VisibleForTesting;
9+
import com.google.common.base.Strings;
10+
import com.google.common.io.Resources;
11+
import com.mozilla.telemetry.ingestion.core.Constant.Attribute;
12+
import com.mozilla.telemetry.ingestion.core.Constant.FieldName;
13+
import com.mozilla.telemetry.ingestion.core.schema.JSONSchemaStore;
14+
import com.mozilla.telemetry.transforms.FailureMessage;
15+
import com.mozilla.telemetry.transforms.PubsubConstraints;
16+
import com.mozilla.telemetry.util.Json;
17+
import com.mozilla.telemetry.util.JsonValidator;
18+
import com.mozilla.telemetry.util.KeyStore;
19+
import com.mozilla.telemetry.util.KeyStore.KeyNotFoundException;
20+
import java.io.IOException;
21+
import java.io.UncheckedIOException;
22+
import java.security.PrivateKey;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Map.Entry;
27+
import java.util.regex.Pattern;
28+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
29+
import org.apache.beam.sdk.metrics.Counter;
30+
import org.apache.beam.sdk.metrics.Metrics;
31+
import org.apache.beam.sdk.options.ValueProvider;
32+
import org.apache.beam.sdk.transforms.FlatMapElements;
33+
import org.apache.beam.sdk.transforms.PTransform;
34+
import org.apache.beam.sdk.transforms.ProcessFunction;
35+
import org.apache.beam.sdk.transforms.WithFailures;
36+
import org.apache.beam.sdk.transforms.WithFailures.Result;
37+
import org.apache.beam.sdk.values.PCollection;
38+
import org.apache.beam.sdk.values.TypeDescriptor;
39+
import org.everit.json.schema.Schema;
40+
import org.everit.json.schema.ValidationException;
41+
import org.jose4j.jwe.JsonWebEncryption;
42+
import org.jose4j.jwx.JsonWebStructure;
43+
import org.jose4j.lang.JoseException;
44+
45+
public class DecryptAetIdentifiers extends
46+
PTransform<PCollection<PubsubMessage>, Result<PCollection<PubsubMessage>, PubsubMessage>> {
47+
48+
private final ValueProvider<String> metadataLocation;
49+
private final ValueProvider<Boolean> kmsEnabled;
50+
private transient KeyStore keyStore;
51+
private transient JsonValidator validator;
52+
private transient Schema telemetryAetSchema;
53+
private transient Schema structuredAetSchema;
54+
55+
private static final Pattern TELEMETRY_URI_PATTERN = Pattern
56+
.compile("/submit/telemetry/[^/]+/account-ecosystem.*");
57+
private static final Pattern STRUCTURED_URI_PATTERN = Pattern
58+
.compile("/submit/[^/]+/account-ecosystem.*");
59+
60+
private final Counter nonAetDocType = Metrics.counter(DecryptAetIdentifiers.class, "not_aet_doctype");
61+
62+
public static DecryptAetIdentifiers of(ValueProvider<String> metadataLocation,
63+
ValueProvider<Boolean> kmsEnabled) {
64+
return new DecryptAetIdentifiers(metadataLocation, kmsEnabled);
65+
}
66+
67+
private DecryptAetIdentifiers(ValueProvider<String> metadataLocation,
68+
ValueProvider<Boolean> kmsEnabled) {
69+
this.metadataLocation = metadataLocation;
70+
this.kmsEnabled = kmsEnabled;
71+
}
72+
73+
/**
74+
* Base class for all exceptions thrown by this class.
75+
*/
76+
abstract static class DecryptAetPayloadException extends RuntimeException {
77+
78+
private DecryptAetPayloadException(Throwable cause) {
79+
super(cause);
80+
}
81+
}
82+
83+
public static class UnparsableAetPayloadException extends DecryptAetPayloadException {
84+
85+
public UnparsableAetPayloadException(Exception cause) {
86+
super(cause);
87+
}
88+
}
89+
90+
public static class IllegalAetPayloadException extends DecryptAetPayloadException {
91+
92+
public IllegalAetPayloadException(Exception cause) {
93+
super(cause);
94+
}
95+
}
96+
97+
/**
98+
* Return a redacted value if the given node represents a long string.
99+
*
100+
* <p>Strings longer than 32 characters might be ecosystem_anon_id values, so we want to make
101+
* sure they are not propagated to error output.
102+
*/
103+
private static JsonNode redactedNode(JsonNode node) {
104+
String value = node.textValue();
105+
if (value != null && value.length() > 32) {
106+
return TextNode.valueOf(
107+
String.format("%s<%d characters redacted>", value.substring(0, 4), value.length() - 4));
108+
} else {
109+
return node;
110+
}
111+
}
112+
113+
/**
114+
* Recursively walk a JSON tree, redacting long string values.
115+
*/
116+
@VisibleForTesting
117+
static void sanitizeJsonNode(JsonNode node) {
118+
if (node.isObject()) {
119+
List<Entry<String, JsonNode>> fields = Lists.newArrayList(node.fields());
120+
for (Map.Entry<String, JsonNode> entry : fields) {
121+
String fieldName = entry.getKey();
122+
JsonNode value = entry.getValue();
123+
((ObjectNode) node).set(fieldName, redactedNode(value));
124+
}
125+
}
126+
if (node.isArray()) {
127+
List<JsonNode> elements = Lists.newArrayList(node.elements());
128+
for (int i = 0; i < elements.size(); i++) {
129+
JsonNode value = elements.get(i);
130+
if (value.isTextual() && value.asText().length() > 32) {
131+
((ArrayNode) node).set(i, redactedNode(value));
132+
}
133+
}
134+
}
135+
136+
// Recursively sanitize objects and arrays.
137+
if (node.isContainerNode()) {
138+
node.elements().forEachRemaining(DecryptAetIdentifiers::sanitizeJsonNode);
139+
}
140+
}
141+
142+
@Override
143+
public Result<PCollection<PubsubMessage>, PubsubMessage> expand(
144+
PCollection<PubsubMessage> messages) {
145+
return messages.apply(FlatMapElements.into(TypeDescriptor.of(PubsubMessage.class)) //
146+
.via(new Fn()) //
147+
.exceptionsInto(TypeDescriptor.of(PubsubMessage.class)) //
148+
.exceptionsVia((WithFailures.ExceptionElement<PubsubMessage> ee) -> {
149+
byte[] sanitizedPayload;
150+
try {
151+
throw ee.exception();
152+
} catch (UnparsableAetPayloadException e) {
153+
sanitizedPayload = null;
154+
} catch (IllegalAetPayloadException e) {
155+
try {
156+
ObjectNode json = Json.readObjectNode(ee.element().getPayload());
157+
sanitizeJsonNode(json);
158+
sanitizedPayload = Json.asBytes(json);
159+
} catch (IOException ignore) {
160+
sanitizedPayload = null;
161+
}
162+
}
163+
return FailureMessage.of(DecryptAetPayloadException.class.getSimpleName(),
164+
new PubsubMessage(sanitizedPayload, ee.element().getAttributeMap()), ee.exception());
165+
}));
166+
}
167+
168+
/**
169+
* Decrypt a payload encoded in a compact serialization of JSON Web Encryption (JWE).
170+
*
171+
* <p>The payload may be either a single JWE string or an array of values.
172+
*
173+
* <p>Assumes that the payload contains a "kid" parameter that can be used to look up a matching
174+
* private key.
175+
*/
176+
public static JsonNode decrypt(KeyStore keyStore, JsonNode anonIdNode)
177+
throws JoseException, KeyNotFoundException {
178+
if (anonIdNode.isTextual()) {
179+
String anonId = anonIdNode.textValue();
180+
JsonWebStructure fromCompact = JsonWebEncryption.fromCompactSerialization(anonId);
181+
String keyId = fromCompact.getKeyIdHeaderValue();
182+
PrivateKey key = keyStore.getKeyOrThrow(keyId);
183+
JsonWebEncryption jwe = new JsonWebEncryption();
184+
jwe.setKey(key);
185+
jwe.setContentEncryptionKey(key.getEncoded());
186+
jwe.setCompactSerialization(anonId);
187+
return TextNode.valueOf(jwe.getPlaintextString());
188+
} else if (anonIdNode.isArray()) {
189+
ArrayNode userIds = Json.createArrayNode();
190+
for (JsonNode node : anonIdNode) {
191+
userIds.add(decrypt(keyStore, node));
192+
}
193+
return userIds;
194+
} else {
195+
throw new IllegalArgumentException(
196+
"Argument to decrypt must be a TextNode or ArrayNode, but got " + anonIdNode);
197+
}
198+
}
199+
200+
private class Fn implements ProcessFunction<PubsubMessage, Iterable<PubsubMessage>> {
201+
202+
private void processDesktopTelemetryPayload(ObjectNode json) throws IOException, JoseException {
203+
validator.validate(telemetryAetSchema, json);
204+
System.out.println("Go telemetry");
205+
ObjectNode payload = (ObjectNode) json.path(FieldName.PAYLOAD);
206+
JsonNode anonIdNode = payload.remove("ecosystemAnonId");
207+
if (anonIdNode != null) {
208+
payload.set("ecosystemUserId", decrypt(keyStore, anonIdNode));
209+
}
210+
ArrayNode prevAnonIdsNode = (ArrayNode) payload.remove("previousEcosystemAnonIds");
211+
if (prevAnonIdsNode != null) {
212+
payload.set("previousEcosystemUserIds", decrypt(keyStore, prevAnonIdsNode));
213+
}
214+
}
215+
216+
private void processStructuredIngestionPayload(ObjectNode json)
217+
throws IOException, JoseException {
218+
validator.validate(structuredAetSchema, json);
219+
JsonNode anonIdNode = json.remove("ecosystem_anon_id");
220+
if (anonIdNode != null) {
221+
json.set("ecosystem_user_id", decrypt(keyStore, anonIdNode));
222+
}
223+
ArrayNode prevAnonIdsNode = (ArrayNode) json.remove("previous_ecosystem_anon_ids");
224+
if (prevAnonIdsNode != null) {
225+
json.set("previous_ecosystem_user_ids", decrypt(keyStore, prevAnonIdsNode));
226+
}
227+
}
228+
229+
@Override
230+
public Iterable<PubsubMessage> apply(PubsubMessage message) {
231+
message = PubsubConstraints.ensureNonNull(message);
232+
String uri = Strings.nullToEmpty(message.getAttribute(Attribute.URI));
233+
String[] uriParts = uri.split("/");
234+
235+
if (keyStore == null) {
236+
// If configured resources aren't available, this throws UncheckedIOException;
237+
// this is unretryable so we allow it to bubble up and kill the worker and eventually fail
238+
// the pipeline.
239+
keyStore = KeyStore.of(metadataLocation.get(), kmsEnabled.get());
240+
}
241+
242+
if (validator == null) {
243+
validator = new JsonValidator();
244+
try {
245+
telemetryAetSchema = JSONSchemaStore.readSchema(Resources
246+
.toByteArray(Resources.getResource("account-ecosystem/telemetry.schema.json")));
247+
structuredAetSchema = JSONSchemaStore.readSchema(Resources
248+
.toByteArray(Resources.getResource("account-ecosystem/structured.schema.json")));
249+
} catch (IOException e) {
250+
// We let this problem bubble up and kill the worker.
251+
throw new UncheckedIOException("Unable to load AET JSON schemas", e);
252+
}
253+
}
254+
255+
ObjectNode json;
256+
try {
257+
json = Json.readObjectNode(message.getPayload());
258+
} catch (IOException e) {
259+
throw new UnparsableAetPayloadException(e);
260+
}
261+
262+
// Note that we must work with the raw URI because ParseUri can raise errors that would
263+
// route payloads containing anon_id values to error output; this transform must be placed
264+
// before ParseUri so that we can handle redacting anon_id values.
265+
byte[] normalizedPayload;
266+
try {
267+
if (TELEMETRY_URI_PATTERN.matcher(uri).matches()) {
268+
processDesktopTelemetryPayload(json);
269+
} else if (false) {
270+
// Placeholder condition for handling AET payloads coming from Glean-enabled applications;
271+
// design is evolving in https://bugzilla.mozilla.org/show_bug.cgi?id=1634468
272+
} else if (STRUCTURED_URI_PATTERN.matcher(uri).matches()) {
273+
processStructuredIngestionPayload(json);
274+
} else {
275+
nonAetDocType.inc();
276+
return Collections.singletonList(message);
277+
}
278+
normalizedPayload = Json.asBytes(json);
279+
} catch (IOException | JoseException | ValidationException | KeyNotFoundException e) {
280+
throw new IllegalAetPayloadException(e);
281+
}
282+
283+
return Collections
284+
.singletonList(new PubsubMessage(normalizedPayload, message.getAttributeMap()));
285+
}
286+
287+
}
288+
}

ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/DecoderOptions.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,27 @@ public interface DecoderOptions extends SinkOptions, PipelineOptions {
7272

7373
void setPioneerDecompressPayload(ValueProvider<Boolean> value);
7474

75+
@Description("If set to true, enable decryption of Account Ecosystem Telemetry identifiers.")
76+
@Default.Boolean(false)
77+
Boolean getAetEnabled();
78+
79+
void setAetEnabled(Boolean value);
80+
81+
@Description("Path (local or gs://) to JSON array of metadata entries enumerating encrypted"
82+
+ " private keys, Cloud KMS resource ids for decrypting those keys, and their corresponding"
83+
+ " document namespaces; this must be set if AET is enabled.")
84+
ValueProvider<String> getAetMetadataLocation();
85+
86+
void setAetMetadataLocation(ValueProvider<String> value);
87+
88+
@Description("If set to true, assume that all private keys are encrypted with the associated"
89+
+ " KMS resourceId. Otherwise ignore KMS and assume all private keys are stored in plaintext."
90+
+ " This may be used for debugging.")
91+
@Default.Boolean(true)
92+
ValueProvider<Boolean> getAetKmsEnabled();
93+
94+
void setAetKmsEnabled(ValueProvider<Boolean> value);
95+
7596
/*
7697
* Subinterface and static methods.
7798
*/

0 commit comments

Comments
 (0)