Skip to content

Commit

Permalink
Use CN from notifications when forcing segments sync (#694)
Browse files Browse the repository at this point in the history
  • Loading branch information
gthea authored Aug 29, 2024
2 parents 0d2f6b3 + ba445d8 commit 4b3726a
Show file tree
Hide file tree
Showing 24 changed files with 282 additions and 126 deletions.
4 changes: 2 additions & 2 deletions src/androidTest/java/helper/IntegrationHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ public static String emptyAllSegments() {
}

public static String dummyAllSegments() {
return "{\"ms\":{\"k\":[{\"n\":\"segment1\"},{\"n\":\"segment2\"}],\"cn\":null},\"ls\":{\"k\":[{\"n\":\"large-segment1\"},{\"n\":\"large-segment2\"},{\"n\":\"large-segment3\"}],\"cn\":9999999999999}}";
return "{\"ms\":{\"k\":[{\"n\":\"segment1\"},{\"n\":\"segment2\"}],\"cn\":null},\"ls\":{\"k\":[{\"n\":\"large-segment1\"},{\"n\":\"large-segment2\"},{\"n\":\"large-segment3\"}],\"cn\":1702507130121}}";
}

public static String randomizedAllSegments() {
int randIntOne = (int) (Math.random() * 100);
int randIntTwo = (int) (Math.random() * 100);
return "{\"ms\":{\"k\":[{\"n\":\"segment1\"},{\"n\":\"segment2\"}],\"cn\":null},\"ls\":{\"k\":[{\"n\":\"large-segment" + randIntOne + "\"},{\"n\":\"large-segment" + randIntTwo + "\"}],\"cn\":9999999999999}}";
return "{\"ms\":{\"k\":[{\"n\":\"segment1\"},{\"n\":\"segment2\"}],\"cn\":null},\"ls\":{\"k\":[{\"n\":\"large-segment" + randIntOne + "\"},{\"n\":\"large-segment" + randIntTwo + "\"}],\"cn\":1702507130121}}";
}

public static String dummySingleSegment(String segment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void successfulSyncOfLargeSegmentsContainsSegmentsInDatabase() {
List<String> mySegments = largeSegments.getNames();
assertEquals(3, mySegments.size());
assertTrue(mySegments.contains("large-segment1") && mySegments.contains("large-segment2") && mySegments.contains("large-segment3"));
assertEquals(9999999999999L, largeSegments.getChangeNumber().longValue());
assertEquals(1702507130121L, largeSegments.getChangeNumber().longValue());
}

@Test
Expand All @@ -134,8 +134,8 @@ public void syncOfLargeSegmentsForMultiClient() throws InterruptedException {
assertEquals(2, segmentList2.getNames().size());
assertNotEquals(segmentList1,
segmentList2);
assertEquals(9999999999999L, segmentList1.getChangeNumber().longValue());
assertEquals(9999999999999L, segmentList2.getChangeNumber().longValue());
assertEquals(1702507130121L, segmentList1.getChangeNumber().longValue());
assertEquals(1702507130121L, segmentList2.getChangeNumber().longValue());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,7 @@ public class AllSegmentsResponseParser implements HttpResponseParser<AllSegments
@Override
public AllSegmentsChange parse(String responseData) throws HttpResponseParserException {
try {
// TODO legacy endpoint support
// try {
// MySegmentsResponse mySegmentsResponse = Json.fromJson(responseData, MySegmentsResponse.class);
// Logger.d("Parsed legacy segments response");
// return new AllSegmentsChange(mySegmentsResponse.getSegments());
// } catch (Exception e) {

return Json.fromJson(responseData, AllSegmentsChange.class);
// }

} catch (JsonSyntaxException e) {
throw new HttpResponseParserException("Syntax error parsing my large segments http response: " + e.getLocalizedMessage());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.split.android.client.service.mysegments;

public class MySegmentUpdateParams {

private final Long mSyncDelay;
private final Long mTargetSegmentsCn;
private final Long mTargetLargeSegmentsCn;

public MySegmentUpdateParams(Long syncDelay, Long targetSegmentsCn, Long targetLargeSegmentsCn) {
mSyncDelay = syncDelay;
mTargetSegmentsCn = targetSegmentsCn;
mTargetLargeSegmentsCn = targetLargeSegmentsCn;
}

public Long getSyncDelay() {
return mSyncDelay;
}

public Long getTargetSegmentsCn() {
return mTargetSegmentsCn;
}

public Long getTargetLargeSegmentsCn() {
return mTargetLargeSegmentsCn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.split.android.client.storage.mysegments.MySegmentsStorage;
import io.split.android.client.telemetry.model.OperationType;
import io.split.android.client.telemetry.storage.TelemetryRuntimeProducer;
import io.split.android.client.utils.Utils;
import io.split.android.client.utils.logger.Logger;

public class MySegmentsSyncTask implements SplitTask {
Expand Down Expand Up @@ -73,6 +74,7 @@ public MySegmentsSyncTask(@NonNull HttpFetcher<AllSegmentsChange> mySegmentsFetc
myLargeSegmentsStorage,
avoidCache,
eventsManager,
new MySegmentsChangeChecker(),
telemetryRuntimeProducer,
config,
targetSegmentsChangeNumber,
Expand All @@ -87,6 +89,7 @@ public MySegmentsSyncTask(@NonNull HttpFetcher<AllSegmentsChange> mySegmentsFetc
@NonNull MySegmentsStorage myLargeSegmentsStorage,
boolean avoidCache,
SplitEventsManager eventsManager,
MySegmentsChangeChecker mySegmentsChangeChecker,
@NonNull TelemetryRuntimeProducer telemetryRuntimeProducer,
@NonNull MySegmentsSyncTaskConfig config,
@Nullable Long targetSegmentsChangeNumber,
Expand All @@ -98,7 +101,7 @@ public MySegmentsSyncTask(@NonNull HttpFetcher<AllSegmentsChange> mySegmentsFetc
mMyLargeSegmentsStorage = checkNotNull(myLargeSegmentsStorage);
mAvoidCache = avoidCache;
mEventsManager = eventsManager;
mMySegmentsChangeChecker = new MySegmentsChangeChecker();
mMySegmentsChangeChecker = mySegmentsChangeChecker;
mTelemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
mTaskType = config.getTaskType();
mUpdateEvent = config.getUpdateEvent();
Expand Down Expand Up @@ -172,21 +175,31 @@ private void fetch(int initialRetries) throws HttpFetcherException, InterruptedE
private Map<String, Object> getParams(boolean addTill) {
Map<String, Object> params = new HashMap<>();
if (addTill) {
params.put(TILL_PARAM, mTargetLargeSegmentsChangeNumber);
long segmentsTarget = Utils.getOrDefault(mTargetSegmentsChangeNumber, -1L);
long largeSegmentsTarget = Utils.getOrDefault(mTargetLargeSegmentsChangeNumber, -1L);
params.put(TILL_PARAM, Math.max(segmentsTarget, largeSegmentsTarget));
}

return params;
}

private boolean isStaleResponse(AllSegmentsChange response) {
boolean checkSegments = mTargetSegmentsChangeNumber != null && mTargetSegmentsChangeNumber != -1;
boolean checkLargeSegments = mTargetLargeSegmentsChangeNumber != null && mTargetLargeSegmentsChangeNumber != -1;
boolean checkSegments = Utils.getOrDefault(mTargetSegmentsChangeNumber, -1L) != -1;
boolean checkLargeSegments = Utils.getOrDefault(mTargetLargeSegmentsChangeNumber, -1L) != -1;

boolean segmentsTargetMatched = !checkSegments ||
response.getSegmentsChange() != null && mTargetSegmentsChangeNumber.equals(response.getSegmentsChange().getChangeNumber());
boolean largeSegmentsTargetMatched = !checkLargeSegments ||
response.getLargeSegmentsChange() != null && mTargetLargeSegmentsChangeNumber.equals(response.getLargeSegmentsChange().getChangeNumber());

if (!segmentsTargetMatched) {
Logger.v("Segments target change number not matched. Expected: " + mTargetSegmentsChangeNumber + " - Actual: " + response.getSegmentsChange().getChangeNumber());
}

if (!largeSegmentsTargetMatched) {
Logger.v("Large segments target change number not matched. Expected: " + mTargetLargeSegmentsChangeNumber + " - Actual: " + response.getLargeSegmentsChange().getChangeNumber());
}

return !segmentsTargetMatched || !largeSegmentsTargetMatched;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public interface MySegmentsTaskFactory {

MySegmentsSyncTask createMySegmentsSyncTask(boolean avoidCache);
MySegmentsSyncTask createMySegmentsSyncTask(boolean avoidCache, Long targetSegmentsCn, Long targetLargeSegmentsCn);

LoadMySegmentsTask createLoadMySegmentsTask();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ public MySegmentsTaskFactoryImpl(@NonNull MySegmentsTaskFactoryConfiguration con
}

@Override
public MySegmentsSyncTask createMySegmentsSyncTask(boolean avoidCache) {
public MySegmentsSyncTask createMySegmentsSyncTask(boolean avoidCache, Long targetSegmentsCn, Long targetLargeSegmentsCn) {
return new MySegmentsSyncTask(mConfiguration.getHttpFetcher(),
mConfiguration.getMySegmentsStorage(),
mConfiguration.getMyLargeSegmentsStorage(),
avoidCache,
mConfiguration.getEventsManager(),
mTelemetryRuntimeProducer,
mConfiguration.getMySegmentsSyncTaskConfig(),
null, // TODO
null); // TODO
targetSegmentsCn,
targetLargeSegmentsCn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.service.executor.SplitTaskExecutor;
import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.sseclient.notifications.MyLargeSegmentChangeNotification;
import io.split.android.client.service.sseclient.notifications.MySegmentsV2PayloadDecoder;
import io.split.android.client.service.sseclient.notifications.NotificationParser;
Expand All @@ -17,7 +18,7 @@ public class MyLargeSegmentsNotificationProcessorImpl implements MyLargeSegments

private final MySegmentsNotificationProcessorHelper mProcessorHelper;
private final String mUserKey;
private final BlockingQueue<Long> mNotificationQueue;
private final BlockingQueue<MySegmentUpdateParams> mNotificationQueue;
private final SyncDelayCalculator mSyncDelayCalculator;

public MyLargeSegmentsNotificationProcessorImpl(@NonNull NotificationParser notificationParser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.mysegments.MySegmentsTaskFactory;

public class MySegmentsNotificationProcessorConfiguration {

private final MySegmentsTaskFactory mMySegmentsTaskFactory;
private final BlockingQueue<Long> mMySegmentUpdateNotificationsQueue;
private final BlockingQueue<MySegmentUpdateParams> mMySegmentUpdateNotificationsQueue;
private final String mUserKey;
private final BigInteger mHashedUserKey;

public MySegmentsNotificationProcessorConfiguration(@NonNull MySegmentsTaskFactory mySegmentsTaskFactory,
@NonNull BlockingQueue<Long> mySegmentUpdateNotificationsQueue,
@NonNull LinkedBlockingDeque<MySegmentUpdateParams> mySegmentUpdateNotificationsQueue,
@NonNull String userKey,
@NonNull BigInteger hashedUserKey) {
mMySegmentsTaskFactory = checkNotNull(mySegmentsTaskFactory);
Expand All @@ -30,7 +32,7 @@ public MySegmentsTaskFactory getMySegmentsTaskFactory() {
return mMySegmentsTaskFactory;
}

public BlockingQueue<Long> getMySegmentUpdateNotificationsQueue() {
public BlockingQueue<MySegmentUpdateParams> getMySegmentUpdateNotificationsQueue() {
return mMySegmentUpdateNotificationsQueue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.split.android.client.common.CompressionType;
import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.service.executor.SplitTaskExecutor;
import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.mysegments.MySegmentsUpdateTask;
import io.split.android.client.service.sseclient.notifications.KeyList;
import io.split.android.client.service.sseclient.notifications.MySegmentUpdateStrategy;
Expand Down Expand Up @@ -34,20 +35,20 @@ public MySegmentsNotificationProcessorHelper(NotificationParser notificationPars
mConfiguration = configuration;
}

void processMySegmentsUpdate(MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<Long> notificationsQueue, long syncDelay) {
void processMySegmentsUpdate(MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<MySegmentUpdateParams> notificationsQueue, long syncDelay) {
processUpdate(NotificationType.MY_SEGMENTS_UPDATE_V2, updateStrategy, data, compression, segmentNames, changeNumber, notificationsQueue, syncDelay);
}

void processMyLargeSegmentsUpdate(MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<Long> notificationsQueue, long syncDelay) {
void processMyLargeSegmentsUpdate(MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<MySegmentUpdateParams> notificationsQueue, long syncDelay) {
processUpdate(NotificationType.MY_LARGE_SEGMENT_UPDATE, updateStrategy, data, compression, segmentNames, changeNumber, notificationsQueue, syncDelay);
}

private void processUpdate(NotificationType notificationType, MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<Long> notificationsQueue, long syncDelay) {
private void processUpdate(NotificationType notificationType, MySegmentUpdateStrategy updateStrategy, String data, CompressionType compression, Set<String> segmentNames, Long changeNumber, BlockingQueue<MySegmentUpdateParams> notificationsQueue, long syncDelay) {
try {
switch (updateStrategy) {
case UNBOUNDED_FETCH_REQUEST:
Logger.d("Received Unbounded my segment fetch request");
notifyMySegmentRefreshNeeded(notificationsQueue, syncDelay);
notifyMySegmentRefreshNeeded(notificationsQueue, syncDelay, notificationType, changeNumber);
break;
case BOUNDED_FETCH_REQUEST:
Logger.d("Received Bounded my segment fetch request");
Expand All @@ -70,12 +71,16 @@ private void processUpdate(NotificationType notificationType, MySegmentUpdateStr
}
} catch (Exception e) {
Logger.e("Executing unbounded fetch because an error has occurred processing my "+(notificationType == NotificationType.MY_LARGE_SEGMENT_UPDATE ? "large" : "")+" segment notification: " + e.getLocalizedMessage());
notifyMySegmentRefreshNeeded(notificationsQueue, syncDelay);
notifyMySegmentRefreshNeeded(notificationsQueue, syncDelay, notificationType, changeNumber);
}
}

private void notifyMySegmentRefreshNeeded(BlockingQueue<Long> notificationsQueue, long syncDelay) {
notificationsQueue.offer(syncDelay);
private void notifyMySegmentRefreshNeeded(BlockingQueue<MySegmentUpdateParams> notificationsQueue, long syncDelay, NotificationType notificationType, Long changeNumber) {
Long targetSegmentsCn = (notificationType == NotificationType.MY_LARGE_SEGMENT_UPDATE) ? null : changeNumber;
Long targetLargeSegmentsCn = (notificationType == NotificationType.MY_LARGE_SEGMENT_UPDATE) ? changeNumber : null;

//noinspection ResultOfMethodCallIgnored
notificationsQueue.offer(new MySegmentUpdateParams(syncDelay, targetSegmentsCn, targetLargeSegmentsCn));
}

private void removeSegment(NotificationType notificationType, Set<String> segmentNames, Long changeNumber) {
Expand All @@ -92,8 +97,8 @@ private void removeSegment(NotificationType notificationType, Set<String> segmen
private void executeBoundedFetch(byte[] keyMap, long syncDelay) {
int index = mMySegmentsPayloadDecoder.computeKeyIndex(mConfiguration.getHashedUserKey(), keyMap.length);
if (mMySegmentsPayloadDecoder.isKeyInBitmap(keyMap, index)) {
Logger.d("Executing Unbounded my segment fetch request");
notifyMySegmentRefreshNeeded(mConfiguration.getMySegmentUpdateNotificationsQueue(), syncDelay);
Logger.d("Executing Bounded my segment fetch request");
notifyMySegmentRefreshNeeded(mConfiguration.getMySegmentUpdateNotificationsQueue(), syncDelay, NotificationType.MY_SEGMENTS_UPDATE_V2, null); // TODO
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.split.android.client.dtos.SegmentsChange;
import io.split.android.client.service.ServiceConstants;
import io.split.android.client.service.executor.SplitTaskExecutor;
import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.mysegments.MySegmentsOverwriteTask;
import io.split.android.client.service.sseclient.notifications.MySegmentChangeNotification;
import io.split.android.client.service.sseclient.notifications.MySegmentChangeV2Notification;
Expand Down Expand Up @@ -45,7 +46,8 @@ public MySegmentsNotificationProcessorImpl(@NonNull NotificationParser notificat
@Override
public void processMySegmentsUpdate(MySegmentChangeNotification notification) {
if (!notification.isIncludesPayload()) {
mConfiguration.getMySegmentUpdateNotificationsQueue().offer(ServiceConstants.NO_INITIAL_DELAY);
//noinspection ResultOfMethodCallIgnored
mConfiguration.getMySegmentUpdateNotificationsQueue().offer(new MySegmentUpdateParams(ServiceConstants.NO_INITIAL_DELAY, null, null));
} else {
Set<String> segmentList = notification.getSegmentList() != null ? new HashSet<>(notification.getSegmentList()) : new HashSet<>();
MySegmentsOverwriteTask task = mConfiguration.getMySegmentsTaskFactory()
Expand All @@ -60,7 +62,8 @@ public void processMySegmentsUpdateV2(MySegmentChangeV2Notification notification
notification.getData(),
notification.getCompression(),
Collections.singleton(notification.getSegmentName()),
notification.getChangeNumber(), mConfiguration.getMySegmentUpdateNotificationsQueue(),
notification.getChangeNumber(),
mConfiguration.getMySegmentUpdateNotificationsQueue(),
ServiceConstants.NO_INITIAL_DELAY);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.concurrent.BlockingQueue;

import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.synchronizer.mysegments.MySegmentsSynchronizer;
import io.split.android.client.utils.logger.Logger;

Expand All @@ -15,11 +16,11 @@
public class MySegmentsUpdateWorker extends UpdateWorker {

private final MySegmentsSynchronizer mSynchronizer;
private final BlockingQueue<Long> mNotificationsQueue;
private final BlockingQueue<MySegmentUpdateParams> mNotificationsQueue;

public MySegmentsUpdateWorker(
@NonNull MySegmentsSynchronizer synchronizer,
@NonNull BlockingQueue<Long> notificationsQueue) {
@NonNull BlockingQueue<MySegmentUpdateParams> notificationsQueue) {
super();
mSynchronizer = checkNotNull(synchronizer);
mNotificationsQueue = checkNotNull(notificationsQueue);
Expand All @@ -28,8 +29,8 @@ public MySegmentsUpdateWorker(
@Override
protected void onWaitForNotificationLoop() throws InterruptedException {
try {
Long syncDelay = mNotificationsQueue.take();
mSynchronizer.forceMySegmentsSync(syncDelay);
MySegmentUpdateParams params = mNotificationsQueue.take();
mSynchronizer.forceMySegmentsSync(params);
Logger.d("A new notification to update segments has been received. " +
"Enqueuing polling task.");
} catch (InterruptedException e) {
Expand Down
Loading

0 comments on commit 4b3726a

Please sign in to comment.