Skip to content

Commit

Permalink
deprecate DEFAULT_WORKSPACE_ID (#5009)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Jul 28, 2021
1 parent c791032 commit 1f58fb7
Show file tree
Hide file tree
Showing 45 changed files with 366 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,41 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;
import java.util.UUID;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingTrackingClient implements TrackingClient {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggingTrackingClient.class);

private final Supplier<TrackingIdentity> identitySupplier;
private final Function<UUID, TrackingIdentity> identityFetcher;

public LoggingTrackingClient(Supplier<TrackingIdentity> identitySupplier) {
this.identitySupplier = identitySupplier;
public LoggingTrackingClient(Function<UUID, TrackingIdentity> identityFetcher) {
this.identityFetcher = identityFetcher;
}

@Override
public void identify() {
LOGGER.info("identify. userId: {}", identitySupplier.get().getCustomerId());
public void identify(UUID workspaceId) {
LOGGER.info("identify. userId: {}", identityFetcher.apply(workspaceId).getCustomerId());
}

@Override
public void alias(String previousCustomerId) {
LOGGER.info("merge. userId: {} previousUserId: {}", identitySupplier.get().getCustomerId(), previousCustomerId);
public void alias(UUID workspaceId, String previousCustomerId) {
LOGGER.info("merge. userId: {} previousUserId: {}", identityFetcher.apply(workspaceId).getCustomerId(), previousCustomerId);
}

@Override
public void track(String action) {
track(action, Collections.emptyMap());
public void track(UUID workspaceId, String action) {
track(workspaceId, action, Collections.emptyMap());
}

@Override
public void track(String action, Map<String, Object> metadata) {
public void track(UUID workspaceId, String action, Map<String, Object> metadata) {
LOGGER.info("track. version: {}, userId: {}, action: {}, metadata: {}",
identitySupplier.get().getAirbyteVersion(),
identitySupplier.get().getCustomerId(),
identityFetcher.apply(workspaceId).getAirbyteVersion(),
identityFetcher.apply(workspaceId).getCustomerId(),
action,
metadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.UUID;
import java.util.function.Function;

public class SegmentTrackingClient implements TrackingClient {

Expand All @@ -43,32 +44,31 @@ public class SegmentTrackingClient implements TrackingClient {

// Analytics is threadsafe.
private final Analytics analytics;
private final Supplier<TrackingIdentity> identitySupplier;
private final Function<UUID, TrackingIdentity> identityFetcher;
private final Deployment deployment;
private final String airbyteRole;

@VisibleForTesting
SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
SegmentTrackingClient(final Function<UUID, TrackingIdentity> identityFetcher,
final Deployment deployment,

final String airbyteRole,
final Analytics analytics) {
this.identitySupplier = identitySupplier;
this.identityFetcher = identityFetcher;
this.deployment = deployment;
this.analytics = analytics;
this.airbyteRole = airbyteRole;
}

public SegmentTrackingClient(final Supplier<TrackingIdentity> identitySupplier,
public SegmentTrackingClient(final Function<UUID, TrackingIdentity> identityFetcher,
final Deployment deployment,

final String airbyteRole) {
this(identitySupplier, deployment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
this(identityFetcher, deployment, airbyteRole, Analytics.builder(SEGMENT_WRITE_KEY).build());
}

@Override
public void identify() {
final TrackingIdentity trackingIdentity = identitySupplier.get();
public void identify(UUID workspaceId) {
final TrackingIdentity trackingIdentity = identityFetcher.apply(workspaceId);
final Map<String, Object> identityMetadata = new HashMap<>();

// deployment
Expand All @@ -95,19 +95,19 @@ public void identify() {
}

@Override
public void alias(String previousCustomerId) {
analytics.enqueue(AliasMessage.builder(previousCustomerId).userId(identitySupplier.get().getCustomerId().toString()));
public void alias(UUID workspaceId, String previousCustomerId) {
analytics.enqueue(AliasMessage.builder(previousCustomerId).userId(identityFetcher.apply(workspaceId).getCustomerId().toString()));
}

@Override
public void track(String action) {
track(action, Collections.emptyMap());
public void track(UUID workspaceId, String action) {
track(workspaceId, action, Collections.emptyMap());
}

@Override
public void track(String action, Map<String, Object> metadata) {
public void track(UUID workspaceId, String action, Map<String, Object> metadata) {
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identitySupplier.get();
final TrackingIdentity trackingIdentity = identityFetcher.apply(workspaceId);
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
if (!metadata.isEmpty()) {
trackingIdentity.getEmail().ifPresent(email -> mapCopy.put("email", email));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@
package io.airbyte.analytics;

import java.util.Map;
import java.util.UUID;

public interface TrackingClient {

void identify();
void identify(UUID workspaceId);

void alias(String previousCustomerId);
void alias(UUID workspaceId, String previousCustomerId);

void track(String action);
void track(UUID workspaceId, String action);

void track(String action, Map<String, Object> metadata);
void track(UUID workspaceId, String action, Map<String, Object> metadata);

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.PersistenceConstants;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.function.Supplier;
import java.util.UUID;
import java.util.function.Function;

public class TrackingClientSingleton {

Expand Down Expand Up @@ -64,18 +64,18 @@ public static void initialize(final Configs.TrackingStrategy trackingStrategy,
trackingStrategy,
deployment,
airbyteRole,
() -> getTrackingIdentity(configRepository, airbyteVersion)));
(workspaceId) -> getTrackingIdentity(configRepository, airbyteVersion, workspaceId)));
}

// fallback on a logging client with an empty identity.
private static void initialize() {
initialize(new LoggingTrackingClient(TrackingIdentity::empty));
initialize(new LoggingTrackingClient(workspaceId -> TrackingIdentity.empty()));
}

@VisibleForTesting
static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, String airbyteVersion) {
static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, String airbyteVersion, UUID workspaceId) {
try {
final StandardWorkspace workspace = configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID, true);
final StandardWorkspace workspace = configRepository.getStandardWorkspace(workspaceId, true);
String email = null;
if (workspace.getEmail() != null && workspace.getAnonymousDataCollection() != null && !workspace.getAnonymousDataCollection()) {
email = workspace.getEmail();
Expand All @@ -88,32 +88,34 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, S
workspace.getNews(),
workspace.getSecurityUpdates());
} catch (ConfigNotFoundException e) {
throw new RuntimeException("could not find workspace with id: " + PersistenceConstants.DEFAULT_WORKSPACE_ID, e);
throw new RuntimeException("could not find workspace with id: " + workspaceId, e);
} catch (JsonValidationException | IOException e) {
throw new RuntimeException(e);
}
}

// todo (cgardens) - trackingIdentityFetcher should probably have some sort of caching where it is
// only re-fetched on identify or alias.
/**
* Creates a tracking client that uses the appropriate strategy from an identity supplier.
*
* @param trackingStrategy - what type of tracker we want to use.
* @param deployment - deployment tracking info. static because it should not change once the
* instance is running.
* @param airbyteRole
* @param trackingIdentitySupplier - how we get the identity of the user. we have a supplier,
* because we if the identity updates over time (which happens during initial setup), we
* always want the most recent info.
* @param trackingIdentityFetcher - how we get the identity of the user. we have a function that
* takes in workspaceId and returns the tracking identity. it does not have any caching as
* email or other fields on the identity can change over time.
* @return tracking client
*/
@VisibleForTesting
static TrackingClient createTrackingClient(final Configs.TrackingStrategy trackingStrategy,
final Deployment deployment,
final String airbyteRole,
final Supplier<TrackingIdentity> trackingIdentitySupplier) {
final Function<UUID, TrackingIdentity> trackingIdentityFetcher) {
return switch (trackingStrategy) {
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier, deployment, airbyteRole);
case LOGGING -> new LoggingTrackingClient(trackingIdentitySupplier);
case SEGMENT -> new SegmentTrackingClient(trackingIdentityFetcher, deployment, airbyteRole);
case LOGGING -> new LoggingTrackingClient(trackingIdentityFetcher);
default -> throw new IllegalStateException("unrecognized tracking strategy");
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -47,7 +48,9 @@ class SegmentTrackingClientTest {
private static final String AIRBYTE_VERSION = "dev";
private static final Deployment DEPLOYMENT = new Deployment(DeploymentMode.OSS, UUID.randomUUID(), WorkerEnvironment.DOCKER);
private static final String EMAIL = "a@airbyte.io";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);
private static final TrackingIdentity IDENTITY = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), EMAIL, false, false, true);
private static final UUID WORKSPACE_ID = UUID.randomUUID();
private static final Function<UUID, TrackingIdentity> MOCK_TRACKING_IDENTITY = (workspaceId) -> IDENTITY;

private Analytics analytics;
private SegmentTrackingClient segmentTrackingClient;
Expand All @@ -58,7 +61,7 @@ class SegmentTrackingClientTest {
void setup() {
analytics = mock(Analytics.class);
roleSupplier = mock(Supplier.class);
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, null, analytics);
segmentTrackingClient = new SegmentTrackingClient(MOCK_TRACKING_IDENTITY, DEPLOYMENT, null, analytics);
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
Expand All @@ -68,7 +71,7 @@ void testIdentify() {
// manually.
ArgumentCaptor<IdentifyMessage.Builder> mockBuilder = ArgumentCaptor.forClass(IdentifyMessage.Builder.class);

segmentTrackingClient.identify();
segmentTrackingClient.identify(WORKSPACE_ID);

verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
Expand All @@ -77,24 +80,24 @@ void testIdentify() {
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
.put("subscribed_newsletter", identity.isNews())
.put("subscribed_security", identity.isSecurityUpdates())
.put("email", IDENTITY.getEmail().get())
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.build();
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(expectedTraits, actual.traits());
}

@Test
void testIdentifyWithRole() {
segmentTrackingClient = new SegmentTrackingClient(() -> identity, DEPLOYMENT, "role", analytics);
segmentTrackingClient = new SegmentTrackingClient((workspaceId) -> IDENTITY, DEPLOYMENT, "role", analytics);
// equals is not defined on MessageBuilder, so we need to use ArgumentCaptor to inspect each field
// manually.
ArgumentCaptor<IdentifyMessage.Builder> mockBuilder = ArgumentCaptor.forClass(IdentifyMessage.Builder.class);
when(roleSupplier.get()).thenReturn("role");

segmentTrackingClient.identify();
segmentTrackingClient.identify(WORKSPACE_ID);

verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
Expand All @@ -103,13 +106,13 @@ void testIdentifyWithRole() {
.put("deployment_mode", DEPLOYMENT.getDeploymentMode())
.put("deployment_id", DEPLOYMENT.getDeploymentId())
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
.put("subscribed_newsletter", identity.isNews())
.put("subscribed_security", identity.isSecurityUpdates())
.put("email", IDENTITY.getEmail().get())
.put("anonymized", IDENTITY.isAnonymousDataCollection())
.put("subscribed_newsletter", IDENTITY.isNews())
.put("subscribed_security", IDENTITY.isSecurityUpdates())
.put("airbyte_role", "role")
.build();
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(expectedTraits, actual.traits());
}

Expand All @@ -118,12 +121,12 @@ void testTrack() {
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of("airbyte_version", AIRBYTE_VERSION);

segmentTrackingClient.track("jump");
segmentTrackingClient.track(WORKSPACE_ID, "jump");

verify(analytics).enqueue(mockBuilder.capture());
TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
}

Expand All @@ -135,12 +138,12 @@ void testTrackWithMetadata() {
"email", EMAIL,
"airbyte_version", AIRBYTE_VERSION);

segmentTrackingClient.track("jump", metadata);
segmentTrackingClient.track(WORKSPACE_ID, "jump", metadata);

verify(analytics).enqueue(mockBuilder.capture());
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(IDENTITY.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
}

Expand Down
Loading

0 comments on commit 1f58fb7

Please sign in to comment.