Skip to content

Commit

Permalink
add airbyte version to tracking (airbytehq#1073)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Nov 25, 2020
1 parent e4db33e commit d56da58
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import com.segment.analytics.messages.IdentifyMessage;
import com.segment.analytics.messages.TrackMessage;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SegmentTrackingClient implements TrackingClient {

private static final String SEGMENT_WRITE_KEY = "7UDdp5K55CyiGgsauOr2pNNujGvmhaeu";
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";

// Analytics is threadsafe.
private final Analytics analytics;
Expand All @@ -56,6 +58,7 @@ public SegmentTrackingClient(Supplier<TrackingIdentity> identitySupplier) {
public void identify() {
final TrackingIdentity trackingIdentity = identitySupplier.get();
final ImmutableMap.Builder<String, Object> identityMetadataBuilder = ImmutableMap.<String, Object>builder()
.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion())
.put("anonymized", trackingIdentity.isAnonymousDataCollection())
.put("subscribed_newsletter", trackingIdentity.isNews())
.put("subscribed_security", trackingIdentity.isSecurityUpdates());
Expand All @@ -73,9 +76,12 @@ public void track(String action) {

@Override
public void track(String action, Map<String, Object> metadata) {
final Map<String, Object> mapCopy = new HashMap<>(metadata);
final TrackingIdentity trackingIdentity = identitySupplier.get();
mapCopy.put(AIRBYTE_VERSION_KEY, trackingIdentity.getAirbyteVersion());
analytics.enqueue(TrackMessage.builder(action)
.userId(identitySupplier.get().getCustomerId().toString())
.properties(metadata));
.userId(trackingIdentity.getCustomerId().toString())
.properties(mapCopy));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ static void initialize(TrackingClient trackingClient) {
}
}

public static void initialize(Configs.TrackingStrategy trackingStrategy, ConfigRepository configRepository) {
initialize(createTrackingClient(trackingStrategy, () -> getTrackingIdentity(configRepository)));
public static void initialize(Configs.TrackingStrategy trackingStrategy, String airbyteVersion, ConfigRepository configRepository) {
initialize(createTrackingClient(trackingStrategy, () -> getTrackingIdentity(configRepository, airbyteVersion)));
}

// fallback on a logging client with an empty identity.
Expand All @@ -65,14 +65,15 @@ private static void initialize() {
}

@VisibleForTesting
static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository) {
static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository, String airbyteVersion) {
try {
final StandardWorkspace workspace = configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID);
String email = null;
if (workspace.getEmail() != null && workspace.getAnonymousDataCollection() != null && !workspace.getAnonymousDataCollection()) {
email = workspace.getEmail();
}
return new TrackingIdentity(
airbyteVersion,
workspace.getCustomerId(),
email,
workspace.getAnonymousDataCollection(),
Expand All @@ -96,15 +97,11 @@ static TrackingIdentity getTrackingIdentity(ConfigRepository configRepository) {
*/
@VisibleForTesting
static TrackingClient createTrackingClient(Configs.TrackingStrategy trackingStrategy, Supplier<TrackingIdentity> trackingIdentitySupplier) {

switch (trackingStrategy) {
case SEGMENT:
return new SegmentTrackingClient(trackingIdentitySupplier);
case LOGGING:
return new LoggingTrackingClient(trackingIdentitySupplier);
default:
throw new RuntimeException("unrecognized tracking strategy");
}
return switch (trackingStrategy) {
case SEGMENT -> new SegmentTrackingClient(trackingIdentitySupplier);
case LOGGING -> new LoggingTrackingClient(trackingIdentitySupplier);
default -> throw new IllegalStateException("unrecognized tracking strategy");
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,36 @@

public class TrackingIdentity {

private final String airbyteVersion;
private final UUID customerId;
private final String email;
private final Boolean anonymousDataCollection;
private final Boolean news;
private final Boolean securityUpdates;

public static TrackingIdentity empty() {
return new TrackingIdentity(null, null, null, null, null);
return new TrackingIdentity(null, null, null, null, null, null);
}

public TrackingIdentity(UUID customerId, String email, Boolean anonymousDataCollection, Boolean news, Boolean securityUpdates) {
public TrackingIdentity(
final String airbyteVersion,
final UUID customerId,
final String email,
final Boolean anonymousDataCollection,
final Boolean news,
final Boolean securityUpdates) {
this.airbyteVersion = airbyteVersion;
this.customerId = customerId;
this.email = email;
this.anonymousDataCollection = anonymousDataCollection;
this.news = news;
this.securityUpdates = securityUpdates;
}

public String getAirbyteVersion() {
return airbyteVersion;
}

public UUID getCustomerId() {
return customerId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@

class SegmentTrackingClientTest {

private static final TrackingIdentity identity = new TrackingIdentity(UUID.randomUUID(), "a@airbyte.io", false, false, true);
private static final String AIRBYTE_VERSION = "dev";
private static final TrackingIdentity identity = new TrackingIdentity(AIRBYTE_VERSION, UUID.randomUUID(), "a@airbyte.io", false, false, true);

private Analytics analytics;
private SegmentTrackingClient segmentTrackingClient;
Expand All @@ -63,6 +64,7 @@ void testIdentify() {
verify(analytics).enqueue(mockBuilder.capture());
final IdentifyMessage actual = mockBuilder.getValue().build();
final Map<String, Object> expectedTraits = ImmutableMap.<String, Object>builder()
.put("airbyte_version", AIRBYTE_VERSION)
.put("email", identity.getEmail().get())
.put("anonymized", identity.isAnonymousDataCollection())
.put("subscribed_newsletter", identity.isNews())
Expand All @@ -74,25 +76,29 @@ void testIdentify() {

@Test
void testTrack() {
ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of("airbyte_version", AIRBYTE_VERSION);

segmentTrackingClient.track("jump");

verify(analytics).enqueue(mockBuilder.capture());
TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
}

@Test
void testTrackWithMetadata() {
ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of("height", "80 meters");
final ArgumentCaptor<TrackMessage.Builder> mockBuilder = ArgumentCaptor.forClass(TrackMessage.Builder.class);
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
"height", "80 meters",
"airbyte_version", AIRBYTE_VERSION);

segmentTrackingClient.track("jump", metadata);

verify(analytics).enqueue(mockBuilder.capture());
TrackMessage actual = mockBuilder.getValue().build();
final TrackMessage actual = mockBuilder.getValue().build();
assertEquals("jump", actual.event());
assertEquals(identity.getCustomerId().toString(), actual.userId());
assertEquals(metadata, actual.properties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

class TrackingClientSingletonTest {

private static final String AIRBYTE_VERSION = "dev";

private ConfigRepository configRepository;

@BeforeEach
Expand Down Expand Up @@ -81,8 +83,8 @@ void testGetTrackingIdentityInitialSetupNotComplete() throws JsonValidationExcep

when(configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID)).thenReturn(workspace);

final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository);
final TrackingIdentity expected = new TrackingIdentity(workspace.getCustomerId(), null, null, null, null);
final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository, AIRBYTE_VERSION);
final TrackingIdentity expected = new TrackingIdentity(AIRBYTE_VERSION, workspace.getCustomerId(), null, null, null, null);

assertEquals(expected, actual);
}
Expand All @@ -98,8 +100,8 @@ void testGetTrackingIdentityNonAnonymous() throws JsonValidationException, IOExc

when(configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID)).thenReturn(workspace);

final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository);
final TrackingIdentity expected = new TrackingIdentity(workspace.getCustomerId(), workspace.getEmail(), false, true, true);
final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository, AIRBYTE_VERSION);
final TrackingIdentity expected = new TrackingIdentity(AIRBYTE_VERSION, workspace.getCustomerId(), workspace.getEmail(), false, true, true);

assertEquals(expected, actual);
}
Expand All @@ -115,8 +117,8 @@ void testGetTrackingIdentityAnonymous() throws JsonValidationException, IOExcept

when(configRepository.getStandardWorkspace(PersistenceConstants.DEFAULT_WORKSPACE_ID)).thenReturn(workspace);

final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository);
final TrackingIdentity expected = new TrackingIdentity(workspace.getCustomerId(), null, true, true, true);
final TrackingIdentity actual = TrackingClientSingleton.getTrackingIdentity(configRepository, AIRBYTE_VERSION);
final TrackingIdentity expected = new TrackingIdentity(AIRBYTE_VERSION, workspace.getCustomerId(), null, true, true, true);

assertEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

public interface Configs {

String getAirbyteVersion();

Path getConfigRoot();

Path getWorkspaceRoot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class EnvConfigs implements Configs {

private static final Logger LOGGER = LoggerFactory.getLogger(EnvConfigs.class);

public static final String AIRBYTE_VERSION = "AIRBYTE_VERSION";
public static final String WORKSPACE_ROOT = "WORKSPACE_ROOT";
public static final String WORKSPACE_DOCKER_MOUNT = "WORKSPACE_DOCKER_MOUNT";
public static final String LOCAL_ROOT = "LOCAL_ROOT";
Expand All @@ -57,6 +58,11 @@ public EnvConfigs() {
this.getEnv = getEnv;
}

@Override
public String getAirbyteVersion() {
return getEnsureEnv(AIRBYTE_VERSION);
}

@Override
public Path getConfigRoot() {
return getPath(CONFIG_ROOT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ void ensureGetEnvBehavior() {
Assertions.assertNull(System.getenv("MY_RANDOM_VAR_1234"));
}

@Test
void testAirbyteVersion() {
when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn(null);
Assertions.assertThrows(IllegalArgumentException.class, () -> config.getAirbyteVersion());

when(function.apply(EnvConfigs.AIRBYTE_VERSION)).thenReturn("dev");
Assertions.assertEquals("dev", config.getAirbyteVersion());
}

@Test
void testWorkspaceRoot() {
when(function.apply(EnvConfigs.WORKSPACE_ROOT)).thenReturn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -65,35 +64,26 @@ public class SchedulerApp {
private static final long JOB_SUBMITTER_DELAY_MILLIS = 5000L;
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();

private final Database database;
private final Path configRoot;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
private final TrackingStrategy trackingStrategy;
private final SchedulerPersistence schedulerPersistence;
private final ConfigRepository configRepository;

public SchedulerApp(Database database,
Path configRoot,
Path workspaceRoot,
public SchedulerApp(Path workspaceRoot,
ProcessBuilderFactory pbf,
TrackingStrategy trackingStrategy) {
this.database = database;
this.configRoot = configRoot;
SchedulerPersistence schedulerPersistence,
ConfigRepository configRepository) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.trackingStrategy = trackingStrategy;
this.schedulerPersistence = schedulerPersistence;
this.configRepository = configRepository;
}

public void start() {
final SchedulerPersistence schedulerPersistence = new DefaultSchedulerPersistence(database);
final ConfigPersistence configPersistence = new DefaultConfigPersistence(configRoot);
final ConfigRepository configRepository = new ConfigRepository(configPersistence);
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();

final WorkerRunFactory workerRunFactory = new WorkerRunFactory(workspaceRoot, pbf);

TrackingClientSingleton.initialize(trackingStrategy, configRepository);

final JobRetrier jobRetrier = new JobRetrier(schedulerPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(schedulerPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(workerThreadPool, schedulerPersistence, configRepository, workerRunFactory);
Expand Down Expand Up @@ -132,8 +122,14 @@ public static void main(String[] args) {
configs.getLocalDockerMount(),
configs.getDockerNetwork());

final SchedulerPersistence schedulerPersistence = new DefaultSchedulerPersistence(database);
final ConfigPersistence configPersistence = new DefaultConfigPersistence(configRoot);
final ConfigRepository configRepository = new ConfigRepository(configPersistence);

TrackingClientSingleton.initialize(configs.getTrackingStrategy(), configs.getAirbyteVersion(), configRepository);

LOGGER.info("Launching scheduler...");
new SchedulerApp(database, configRoot, workspaceRoot, pbf, configs.getTrackingStrategy()).start();
new SchedulerApp(workspaceRoot, pbf, schedulerPersistence, configRepository).start();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public static void main(String[] args) throws Exception {
// tracking we can associate all action with the correct anonymous id.
setCustomerIdIfNotSet(configRepository);

TrackingClientSingleton.initialize(configs.getTrackingStrategy(), configRepository);
TrackingClientSingleton.initialize(configs.getTrackingStrategy(), configs.getAirbyteVersion(), configRepository);

LOGGER.info("Creating Scheduler persistence...");
final SchedulerPersistence schedulerPersistence = new DefaultSchedulerPersistence(Databases.createPostgresDatabase(
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ services:
- WORKSPACE_DOCKER_MOUNT=${WORKSPACE_DOCKER_MOUNT}
- LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT}
- CONFIG_ROOT=${CONFIG_ROOT}
- AIRBYTE_VERSION=${VERSION}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
Expand All @@ -64,6 +65,7 @@ services:
- WAIT_HOSTS=db:5432
- CONFIG_ROOT=${CONFIG_ROOT}
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
- AIRBYTE_VERSION=${VERSION}
ports:
- 8001:8001
volumes:
Expand Down

0 comments on commit d56da58

Please sign in to comment.