Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand Down Expand Up @@ -88,9 +89,12 @@ public class WordCount {
*/
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
private final Distribution lineLenDist = Metrics.distribution(
ExtractWordsFn.class, "lineLenDistro");

@ProcessElement
public void processElement(ProcessContext c) {
lineLenDist.update(c.element().length());
if (c.element().trim().isEmpty()) {
emptyLines.inc();
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev196-1.22.0</dataflow.version>
<dataflow.version>v1b3-rev198-1.20.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.client.version>1.4.0</datastore.client.version>
<datastore.proto.version>1.3.0</datastore.proto.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import static com.google.common.base.MoreObjects.firstNonNull;

import com.google.api.client.util.ArrayMap;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.auto.value.AutoValue;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
Expand All @@ -28,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricFiltering;
import org.apache.beam.runners.core.metrics.MetricKey;
import org.apache.beam.sdk.metrics.DistributionResult;
Expand Down Expand Up @@ -72,34 +75,6 @@ public DataflowMetrics(DataflowPipelineJob dataflowPipelineJob, DataflowClient d
this.dataflowPipelineJob = dataflowPipelineJob;
}

/**
 * Derives the {@link MetricKey} under which a metric update is indexed.
 *
 * <p>The key combines the step name with a {@link MetricName} made of the update's
 * "namespace" context entry and its metric name. When the job exposes
 * {@code transformStepNames}, the "step" context entry is translated through the inverse of
 * that mapping to the transform's full name; otherwise the "step" entry is used as-is.
 *
 * @param metricUpdate the update reported by the Dataflow service
 * @return a {@link MetricKey} that can be hashed and used to identify a metric.
 */
private MetricKey metricHashKey(
    com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
  String stepName = metricUpdate.getName().getContext().get("step");
  if (dataflowPipelineJob.transformStepNames != null) {
    stepName = dataflowPipelineJob.transformStepNames.inverse().get(stepName).getFullName();
  }

  MetricName metricName =
      MetricName.named(
          metricUpdate.getName().getContext().get("namespace"),
          metricUpdate.getName().getName());
  return MetricKey.create(stepName, metricName);
}

/**
 * Tells whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
 * update, i.e. whether its name context maps "tentative" to "true".
 *
 * @param metricUpdate the update to inspect
 * @return true if update is tentative, false otherwise
 */
private boolean isMetricTentative(
    com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
  if (!metricUpdate.getName().getContext().containsKey("tentative")) {
    // No "tentative" entry at all: not a tentative update.
    return false;
  }
  return Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true");
}

/**
* Take a list of metric updates coming from the Dataflow service, and format it into a
* Metrics API MetricQueryResults instance.
Expand All @@ -109,65 +84,8 @@ private boolean isMetricTentative(
// Formats the metric updates coming from the Dataflow service into a MetricQueryResults
// instance restricted to the given filter.
//
// All indexing and extraction is delegated to DataflowMetricQueryResultsFactory. The inline
// implementation that previously preceded this return statement is gone: it made the final
// return unreachable (a compile error) and dereferenced missing committed/tentative entries
// inside a catch-all.
//
// metricUpdates: the raw updates returned by the service for this job.
// filter: the filter metric keys must match to be part of the result.
private MetricQueryResults populateMetricQueryResults(
    List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
    MetricsFilter filter) {
  return DataflowMetricQueryResultsFactory.create(dataflowPipelineJob, metricUpdates, filter)
      .build();
}

private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) {
Expand Down Expand Up @@ -206,6 +124,195 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
return result;
}

/**
 * Collects metric results, sorted by value type, from pairs of committed/attempted metric
 * updates.
 *
 * <p>A pair is recorded as a distribution when both updates carry a distribution value, as a
 * counter when both carry a scalar value, and is otherwise logged and dropped. Nothing in this
 * class adds gauge results, so {@link #getGaugeResults()} is always empty.
 */
private static class DataflowMetricResultExtractor {
  private final ImmutableList.Builder<MetricResult<Long>> counterResults;
  private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
  private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;

  DataflowMetricResultExtractor() {
    this.counterResults = ImmutableList.builder();
    this.distributionResults = ImmutableList.builder();
    this.gaugeResults = ImmutableList.builder();
  }

  /**
   * Records the result for one metric, given its committed and attempted updates.
   *
   * @param metricKey key naming the metric and the step it belongs to
   * @param committed the committed update, or null if none was seen
   * @param attempted the tentative update, or null if none was seen
   */
  public void addMetricResult(
      MetricKey metricKey,
      @Nullable com.google.api.services.dataflow.model.MetricUpdate committed,
      @Nullable com.google.api.services.dataflow.model.MetricUpdate attempted) {
    if (committed == null || attempted == null) {
      LOG.warn(
          "Unexpected metric {} did not have both a committed ({}) and tentative value ({}).",
          metricKey, committed, attempted);
      return;
    }

    if (committed.getDistribution() != null && attempted.getDistribution() != null) {
      // Both sides carry a distribution value: distribution metric.
      DistributionResult committedValue = getDistributionValue(committed);
      DistributionResult attemptedValue = getDistributionValue(attempted);
      distributionResults.add(
          DataflowMetricResult.create(
              metricKey.metricName(), metricKey.stepName(), committedValue, attemptedValue));
      return;
    }

    if (committed.getScalar() != null && attempted.getScalar() != null) {
      // Both sides carry a scalar value: counter metric.
      Long committedValue = getCounterValue(committed);
      Long attemptedValue = getCounterValue(attempted);
      counterResults.add(
          DataflowMetricResult.create(
              metricKey.metricName(), metricKey.stepName(), committedValue, attemptedValue));
      return;
    }

    // This is exceptionally unexpected. We expect matching user metrics to only have the
    // value types provided by the Metrics API.
    LOG.warn("Unexpected metric type. Please report JOB ID to Dataflow Support. Metric key: "
        + metricKey.toString());
  }

  /** Returns the update's scalar value as a long, or 0 when the update has no scalar. */
  private Long getCounterValue(com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
    Object scalar = metricUpdate.getScalar();
    return scalar == null ? 0L : ((Number) scalar).longValue();
  }

  /**
   * Converts the update's distribution value into a {@link DistributionResult}, or returns
   * {@link DistributionResult#ZERO} when the update has no distribution.
   *
   * <p>The sum is reconstructed as {@code count * mean}; since the mean is read as a long,
   * the reconstructed sum is only as precise as that truncated mean.
   */
  private DistributionResult getDistributionValue(
      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
    Object rawDistribution = metricUpdate.getDistribution();
    if (rawDistribution == null) {
      return DistributionResult.ZERO;
    }

    ArrayMap<?, ?> fields = (ArrayMap<?, ?>) rawDistribution;
    long count = longField(fields, "count");
    long min = longField(fields, "min");
    long max = longField(fields, "max");
    long mean = longField(fields, "mean");
    // TODO: Switch to use sum when it's available in the service.
    return DistributionResult.create(count * mean, count, min, max);
  }

  /** Reads the numeric entry stored under {@code key} and narrows it to a long. */
  private static long longField(ArrayMap<?, ?> fields, String key) {
    return ((Number) fields.get(key)).longValue();
  }

  /** Returns the counter results recorded so far. */
  public Iterable<MetricResult<Long>> getCounterResults() {
    return counterResults.build();
  }

  /** Returns the distribution results recorded so far. */
  public Iterable<MetricResult<DistributionResult>> getDistributionResults() {
    return distributionResults.build();
  }

  /** Returns the gauge results recorded so far. */
  public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
    return gaugeResults.build();
  }
}

/**
 * Builds a {@link MetricQueryResults} out of the metric updates reported by the Dataflow
 * service.
 *
 * <p>Updates are first indexed by {@link MetricKey}, separately for tentative and committed
 * values. Only user metrics (a "user" origin plus a "namespace" context entry) that match the
 * filter are indexed. Each indexed key is then handed, with its committed and tentative
 * updates, to a {@link DataflowMetricResultExtractor}.
 */
private static class DataflowMetricQueryResultsFactory {
  private final Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates;
  private final MetricsFilter filter;
  private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
      tentativeByName;
  private final HashMap<MetricKey, com.google.api.services.dataflow.model.MetricUpdate>
      committedByName;
  private final HashSet<MetricKey> metricHashKeys;
  private final DataflowPipelineJob dataflowPipelineJob;

  /**
   * Creates a factory for the given job, updates and filter.
   *
   * @param dataflowPipelineJob job whose {@code transformStepNames} translate step names
   * @param metricUpdates raw updates returned by the service
   * @param filter filter that metric keys must match to be included
   */
  public static DataflowMetricQueryResultsFactory create(DataflowPipelineJob dataflowPipelineJob,
      Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
      MetricsFilter filter) {
    return new DataflowMetricQueryResultsFactory(dataflowPipelineJob, metricUpdates, filter);
  }

  private DataflowMetricQueryResultsFactory(DataflowPipelineJob dataflowPipelineJob,
      Iterable<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates,
      MetricsFilter filter) {
    this.dataflowPipelineJob = dataflowPipelineJob;
    this.metricUpdates = metricUpdates;
    this.filter = filter;

    tentativeByName = new HashMap<>();
    committedByName = new HashMap<>();
    metricHashKeys = new HashSet<>();
  }

  /**
   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a tentative
   * update or not.
   * @return true if update is tentative, false otherwise
   */
  private boolean isMetricTentative(
      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
    return (metricUpdate.getName().getContext().containsKey("tentative")
        && Objects.equal(metricUpdate.getName().getContext().get("tentative"), "true"));
  }

  /**
   * Check whether a {@link com.google.api.services.dataflow.model.MetricUpdate} is a user
   * metric, i.e. has a "user" origin (compared case-insensitively) and a "namespace" context
   * entry.
   * @return true if the update is a user metric, false otherwise (including a null origin)
   */
  private boolean isUserMetric(
      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
    // equalsIgnoreCase is locale-independent and tolerates a null origin.
    return "user".equalsIgnoreCase(metricUpdate.getName().getOrigin())
        && metricUpdate.getName().getContext().containsKey("namespace");
  }

  /**
   * Build the key that identifies a metric update: its step name plus its namespaced name.
   *
   * <p>The "step" context entry is translated to the transform's full name through the
   * inverse of the job's {@code transformStepNames}. If the job has no such mapping, or the
   * mapping does not know the step, the "step" entry is used untranslated.
   * @return a {@link MetricKey} that can be hashed and used to identify a metric.
   */
  private MetricKey getMetricHashKey(
      com.google.api.services.dataflow.model.MetricUpdate metricUpdate) {
    String fullStepName = metricUpdate.getName().getContext().get("step");
    if (dataflowPipelineJob.transformStepNames != null
        && dataflowPipelineJob.transformStepNames.inverse().containsKey(fullStepName)) {
      fullStepName =
          dataflowPipelineJob.transformStepNames.inverse().get(fullStepName).getFullName();
    }
    return MetricKey.create(
        fullStepName,
        MetricName.named(
            metricUpdate.getName().getContext().get("namespace"),
            metricUpdate.getName().getName()));
  }

  /**
   * Index every matching user metric update by key, split into tentative and committed
   * values. A warning is logged when a key already had a value of the same kind.
   */
  private void buildMetricsIndex() {
    // If the Context of the metric update does not have a namespace, then these are not
    // actual metrics counters.
    for (com.google.api.services.dataflow.model.MetricUpdate update : metricUpdates) {
      if (!isUserMetric(update)) {
        // Skip non-user metrics, which should have both a "user" origin and a namespace.
        continue;
      }

      // Only build the key once the update is known to be a user metric: other updates are
      // not guaranteed to carry the context entries the key is built from.
      MetricKey updateKey = getMetricHashKey(update);
      if (!MetricFiltering.matches(filter, updateKey)) {
        // Skip unmatched metrics early.
        continue;
      }

      metricHashKeys.add(updateKey);
      if (isMetricTentative(update)) {
        MetricUpdate previousUpdate = tentativeByName.put(updateKey, update);
        if (previousUpdate != null) {
          LOG.warn("Metric {} already had a tentative value of {}", updateKey, previousUpdate);
        }
      } else {
        MetricUpdate previousUpdate = committedByName.put(updateKey, update);
        if (previousUpdate != null) {
          LOG.warn("Metric {} already had a committed value of {}", updateKey, previousUpdate);
        }
      }
    }
  }

  /**
   * Index the updates and convert them into query results.
   *
   * @return the counter, distribution and gauge results gathered by the extractor
   */
  public MetricQueryResults build() {
    buildMetricsIndex();

    DataflowMetricResultExtractor extractor = new DataflowMetricResultExtractor();
    for (MetricKey metricKey : metricHashKeys) {
      String metricName = metricKey.metricName().name();
      if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]")
          || metricName.endsWith("[MEAN]") || metricName.endsWith("[COUNT]")) {
        // Skip distribution metrics, as these are not yet properly supported.
        // TODO: remove this when distributions stop being broken up for the UI.
        continue;
      }

      extractor.addMetricResult(metricKey,
          committedByName.get(metricKey),
          tentativeByName.get(metricKey));
    }
    return DataflowMetricQueryResults.create(
        extractor.getCounterResults(),
        extractor.getDistributionResults(),
        extractor.getGaugeResults());
  }
}

@AutoValue
abstract static class DataflowMetricQueryResults implements MetricQueryResults {
public static MetricQueryResults create(
Expand Down
Loading