Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.mozilla.telemetry.decoder.ParseProxy;
import com.mozilla.telemetry.decoder.ParseUri;
import com.mozilla.telemetry.decoder.ParseUserAgent;
import com.mozilla.telemetry.decoder.RerouteDocuments;
import com.mozilla.telemetry.transforms.DecompressPayload;
import com.mozilla.telemetry.transforms.LimitPayloadSize;
import com.mozilla.telemetry.transforms.NormalizeAttributes;
Expand Down Expand Up @@ -76,7 +77,9 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
.failuresTo(failureCollections) : p)

// URI Parsing
.map(p -> p.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections))
.map(p -> p //
.apply("ParseUri", ParseUri.of()).failuresTo(failureCollections)
.apply("RerouteDocuments", RerouteDocuments.of()))

// Special case: decryption of Pioneer payloads
.map(p -> options.getPioneerEnabled() ? p
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.mozilla.telemetry.decoder;

import com.mozilla.telemetry.ingestion.core.Constant.Attribute;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* A temporary shim that allows us to isolate a few document types to their own
* datasets in BigQuery by rewriting the document_namespace attribute.
*
* <p>See https://bugzilla.mozilla.org/show_bug.cgi?id=1654558
*/
public class RerouteDocuments {

private static final String XFOCSP_ERROR_REPORT = "xfocsp-error-report";

/**
* Return a transform that rewrites document_namespace for certain document types.
*/
public static PTransform<PCollection<? extends PubsubMessage>, PCollection<PubsubMessage>> of() {
return MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via((PubsubMessage m) -> {
if (ParseUri.TELEMETRY.equals(m.getAttribute(Attribute.DOCUMENT_NAMESPACE))
&& XFOCSP_ERROR_REPORT.equals(m.getAttribute(Attribute.DOCUMENT_TYPE))) {
Map<String, String> attributes = new HashMap<>(m.getAttributeMap());
attributes.put(Attribute.DOCUMENT_NAMESPACE, XFOCSP_ERROR_REPORT);
return new PubsubMessage(m.getPayload(), attributes);
} else {
return m;
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.mozilla.telemetry.decoder;

import com.mozilla.telemetry.options.InputFileFormat;
import com.mozilla.telemetry.options.OutputFileFormat;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class RerouteDocumentsTest {

@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testOutput() {
final List<String> validInput = Arrays.asList(//
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
+ ",\"document_type\": \"xfocsp-error-report\"},\"payload\":\"\"}",
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
+ ",\"document_type\": \"main\"},\"payload\":\"\"}");

final List<String> expected = Arrays.asList(//
"{\"attributeMap\":{\"document_namespace\":\"xfocsp-error-report\"" //
+ ",\"document_type\":\"xfocsp-error-report\"},\"payload\":\"\"}",
"{\"attributeMap\":{\"document_namespace\":\"telemetry\"" //
+ ",\"document_type\":\"main\"},\"payload\":\"\"}");

PCollection<PubsubMessage> result = pipeline.apply(Create.of(validInput)) //
.apply("DecodeJsonInput", InputFileFormat.json.decode()) //
.apply("RerouteDocuments", RerouteDocuments.of());

PCollection<String> output = result //
.apply("EncodeJsonOutput", OutputFileFormat.json.encode());
PAssert.that(output).containsInAnyOrder(expected);

pipeline.run();
}
}