diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java index 157514e9fcb7b..34728cb938872 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricTags.java @@ -4,6 +4,7 @@ package io.airbyte.metrics.lib; +import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.db.instance.configs.jooq.enums.ReleaseStage; /** @@ -13,10 +14,16 @@ public class MetricTags { private static final String RELEASE_STAGE = "release_stage"; + private static final String FAILURE_ORIGIN = "failure_origin"; + public static String getReleaseStage(final ReleaseStage stage) { return tagDelimit(RELEASE_STAGE, stage.getLiteral()); } + public static String getFailureOrigin(final FailureOrigin origin) { + return tagDelimit(FAILURE_ORIGIN, origin.value()); + } + private static String tagDelimit(final String tagName, final String tagVal) { return String.join(":", tagName, tagVal); } diff --git a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java index 3436cec2ec38b..bbe48b62f98c1 100644 --- a/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java +++ b/airbyte-metrics/lib/src/main/java/io/airbyte/metrics/lib/MetricsRegistry.java @@ -38,6 +38,10 @@ public enum MetricsRegistry { MetricEmittingApps.WORKER, "attempt_failed_by_release_stage", "increments when an attempt fails. attempts are double counted as this is tagged by release stage."), + ATTEMPT_FAILED_BY_FAILURE_ORIGIN( + MetricEmittingApps.WORKER, + "attempt_failed_by_failure_origin", + "increments for every failure origin a failed attempt has. since a failure can have multiple origins, a single failure can be counted more than once. 
tagged by failure origin."), ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE( MetricEmittingApps.WORKER, "attempt_succeeded_by_release_stage", diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index dea5227fcf202..5ffc00cd4882d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -445,8 +445,7 @@ private void reportJobStarting() { } /** - * Start the child {@link SyncWorkflow}. We are - * using a child workflow here for two main reason: + * Start the child {@link SyncWorkflow}. We are using a child workflow here for two main reasons: *

* - Originally the Sync workflow was living by himself and was launch by the scheduler. In order to * limit the potential migration issues, we kept the {@link SyncWorkflow} as is and launch it as a diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index bc3ae71f9f1d6..e027966751744 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -7,8 +7,10 @@ import com.google.common.collect.Lists; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; +import io.airbyte.config.AttemptFailureSummary; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.FailureReason; import io.airbyte.config.JobOutput; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSync; @@ -170,9 +172,10 @@ public void attemptFailure(final AttemptFailureInput input) { try { final int attemptId = input.getAttemptId(); final long jobId = input.getJobId(); + final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary(); jobPersistence.failAttempt(jobId, attemptId); - jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary()); + jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary); if (input.getStandardSyncOutput() != null) { final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput()); @@ -180,6 +183,9 @@ public void attemptFailure(final AttemptFailureInput input) { } emitJobIdToReleaseStagesMetric(MetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, 
jobId); + for (final FailureReason reason : failureSummary.getFailures()) { + DogStatsDMetricSingleton.count(MetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1, MetricTags.getFailureOrigin(reason.getFailureOrigin())); + } } catch (final IOException e) { throw new RetryableException(e); }