Skip to content

Commit

Permalink
Track new connection: (airbytehq#2260)
Browse files Browse the repository at this point in the history
* Track new connection:  Log users who created a connection in Orbit, when not anonymized airbytehq#2163
  • Loading branch information
ChristopheDuong authored Mar 5, 2021
1 parent 20dec48 commit 6c3378c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public void track(String action, Map<String, Object> metadata) {
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identitySupplier.get();
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
if (!metadata.isEmpty()) {
trackingIdentity.getEmail().ifPresent(email -> mapCopy.put("email", email));
}
analytics.enqueue(TrackMessage.builder(action)
.userId(trackingIdentity.getCustomerId().toString())
.properties(mapCopy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
class SegmentTrackingClientTest {

private static final String AIRBYTE_VERSION = "dev";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), "a@airbyte.io", false, false, true);
private static final String EMAIL = "a@airbyte.io";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);

private Analytics analytics;
private SegmentTrackingClient segmentTrackingClient;
Expand Down Expand Up @@ -122,6 +123,7 @@ void testTrackWithMetadata() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
"height", "80 meters",
"email", EMAIL,
"airbyte_version", AIRBYTE_VERSION);

segmentTrackingClient.track("jump", metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import pendulum as pendulum
from base_python import BaseClient

# FIXME (Eugene K): register logger as standard python logger
from base_python.entrypoint import logger
from cached_property import cached_property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
package io.airbyte.server.handlers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.model.ConnectionCreate;
import io.airbyte.api.model.ConnectionIdRequestBody;
import io.airbyte.api.model.ConnectionRead;
Expand All @@ -37,8 +40,11 @@
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.enums.Enums;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.helpers.ScheduleHelpers;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -48,10 +54,15 @@
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionsHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionsHandler.class);

private final ConfigRepository configRepository;
private final Supplier<UUID> uuidGenerator;

Expand Down Expand Up @@ -101,9 +112,46 @@ public ConnectionRead createConnection(ConnectionCreate connectionCreate) throws

configRepository.writeStandardSchedule(standardSyncSchedule);

trackNewConnection(standardSync, standardSyncSchedule);

return buildConnectionRead(connectionId);
}

private void trackNewConnection(final StandardSync standardSync, final StandardSyncSchedule standardSyncSchedule) {
try {
final Builder<String, Object> metadataBuilder = generateMetadata(standardSync, standardSyncSchedule);
TrackingClientSingleton.get().track("New Connection - Backend", metadataBuilder.build());
} catch (Exception e) {
LOGGER.error("failed while reporting usage.", e);
}
}

private Builder<String, Object> generateMetadata(final StandardSync standardSync, final StandardSyncSchedule standardSyncSchedule) {
final Builder<String, Object> metadata = ImmutableMap.builder();

final UUID sourceConnectionId = standardSync.getSourceId();
final UUID destinationConnectionId = standardSync.getDestinationId();
final StandardSourceDefinition sourceDefinition = configRepository
.getSourceDefinitionFromConnection(sourceConnectionId);
final StandardDestinationDefinition destinationDefinition = configRepository
.getDestinationDefinitionFromConnection(destinationConnectionId);

metadata.put("connector_source", sourceDefinition.getName());
metadata.put("connector_source_definition_id", sourceDefinition.getSourceDefinitionId());
metadata.put("connector_destination", destinationDefinition.getName());
metadata.put("connector_destination_definition_id", destinationDefinition.getDestinationDefinitionId());

final String frequencyString;
if (standardSyncSchedule.getManual()) {
frequencyString = "manual";
} else {
final long intervalInMinutes = TimeUnit.SECONDS.toMinutes(ScheduleHelpers.getIntervalInSecond(standardSyncSchedule.getSchedule()));
frequencyString = intervalInMinutes + " min";
}
metadata.put("frequency", frequencyString);
return metadata;
}

public ConnectionRead updateConnection(ConnectionUpdate connectionUpdate) throws ConfigNotFoundException, IOException, JsonValidationException {
// retrieve sync
final StandardSync persistedSync = configRepository.getStandardSync(connectionUpdate.getConnectionId())
Expand Down

0 comments on commit 6c3378c

Please sign in to comment.