Skip to content

Commit c6c5382

Browse files
authored
Bug 1654558 Implement RerouteDocuments for xfocsp-error-report (#1323)
Relies on mozilla-services/mozilla-pipeline-schemas#581
1 parent 8782555 commit c6c5382

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

ingestion-beam/src/main/java/com/mozilla/telemetry/Decoder.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.mozilla.telemetry.decoder.ParseProxy;
1212
import com.mozilla.telemetry.decoder.ParseUri;
1313
import com.mozilla.telemetry.decoder.ParseUserAgent;
14+
import com.mozilla.telemetry.decoder.RerouteDocuments;
1415
import com.mozilla.telemetry.transforms.DecompressPayload;
1516
import com.mozilla.telemetry.transforms.LimitPayloadSize;
1617
import com.mozilla.telemetry.transforms.NormalizeAttributes;
@@ -76,7 +77,9 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
7677
.failuresTo(failureCollections) : p)
7778

7879
// URI Parsing
79-
.map(p -> p.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections))
80+
.map(p -> p //
81+
.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections)
82+
.apply("RerouteDocuments", RerouteDocuments.of()))
8083

8184
// Special case: decryption of Pioneer payloads
8285
.map(p -> options.getPioneerEnabled() ? p
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.mozilla.telemetry.decoder;
2+
3+
import com.mozilla.telemetry.ingestion.core.Constant.Attribute;
4+
import java.util.HashMap;
5+
import java.util.Map;
6+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
7+
import org.apache.beam.sdk.transforms.MapElements;
8+
import org.apache.beam.sdk.transforms.PTransform;
9+
import org.apache.beam.sdk.values.PCollection;
10+
import org.apache.beam.sdk.values.TypeDescriptor;
11+
12+
/**
13+
* A temporary shim that allows us to isolate a few document types to their own
14+
* datasets in BigQuery by rewriting the document_namespace attribute.
15+
*
16+
* <p>See https://bugzilla.mozilla.org/show_bug.cgi?id=1654558
17+
*/
18+
public class RerouteDocuments {
19+
20+
private static final String XFOCSP_ERROR_REPORT = "xfocsp-error-report";
21+
22+
/**
23+
* Return a transform that rewrites document_namespace for certain document types.
24+
*/
25+
public static PTransform<PCollection<? extends PubsubMessage>, PCollection<PubsubMessage>> of() {
26+
return MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via((PubsubMessage m) -> {
27+
if (ParseUri.TELEMETRY.equals(m.getAttribute(Attribute.DOCUMENT_NAMESPACE))
28+
&& XFOCSP_ERROR_REPORT.equals(m.getAttribute(Attribute.DOCUMENT_TYPE))) {
29+
Map<String, String> attributes = new HashMap<>(m.getAttributeMap());
30+
attributes.put(Attribute.DOCUMENT_NAMESPACE, XFOCSP_ERROR_REPORT);
31+
return new PubsubMessage(m.getPayload(), attributes);
32+
} else {
33+
return m;
34+
}
35+
});
36+
}
37+
38+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.mozilla.telemetry.decoder;
2+
3+
import com.mozilla.telemetry.options.InputFileFormat;
4+
import com.mozilla.telemetry.options.OutputFileFormat;
5+
import java.util.Arrays;
6+
import java.util.List;
7+
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
8+
import org.apache.beam.sdk.testing.PAssert;
9+
import org.apache.beam.sdk.testing.TestPipeline;
10+
import org.apache.beam.sdk.transforms.Create;
11+
import org.apache.beam.sdk.values.PCollection;
12+
import org.junit.Rule;
13+
import org.junit.Test;
14+
15+
public class RerouteDocumentsTest {
16+
17+
@Rule
18+
public final transient TestPipeline pipeline = TestPipeline.create();
19+
20+
@Test
21+
public void testOutput() {
22+
final List<String> validInput = Arrays.asList(//
23+
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
24+
+ ",\"document_type\": \"xfocsp-error-report\"},\"payload\":\"\"}",
25+
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
26+
+ ",\"document_type\": \"main\"},\"payload\":\"\"}");
27+
28+
final List<String> expected = Arrays.asList(//
29+
"{\"attributeMap\":{\"document_namespace\":\"xfocsp-error-report\"" //
30+
+ ",\"document_type\":\"xfocsp-error-report\"},\"payload\":\"\"}",
31+
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
32+
+ ",\"document_type\":\"main\"},\"payload\":\"\"}");
33+
34+
PCollection<PubsubMessage> result = pipeline.apply(Create.of(validInput)) //
35+
.apply("DecodeJsonInput", InputFileFormat.json.decode()) //
36+
.apply("RerouteDocuments", RerouteDocuments.of());
37+
38+
PCollection<String> output = result //
39+
.apply("EncodeJsonOutput", OutputFileFormat.json.encode());
40+
PAssert.that(output).containsInAnyOrder(expected);
41+
42+
pipeline.run();
43+
}
44+
}

0 commit comments

Comments
 (0)