Skip to content

Fixed akka cluster issue to support more than 2 regions #619

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public interface ActorSystemManager {
* Publish message to all topic subscribers in all regions.
*/
void publishToAllRegions( String topic, Object message, ActorRef sender );
/**
* Publish message to all topic subscribers in local region only.
*/
void publishToLocalRegion( String topic, Object message, ActorRef sender );
/**
* Publish message to all topic subscribers in remote regions only.
*/
void publishToRemoteRegions( String topic, Object message, ActorRef sender );

void leaveCluster();
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,21 @@ public String getCurrentRegion() {
@Override
public void publishToAllRegions( String topic, Object message, ActorRef sender ) {

publishToLocalRegion(topic, message, sender);
publishToRemoteRegions(topic, message, sender);
}

@Override
public void publishToLocalRegion( String topic, Object message, ActorRef sender ) {

// send to local subscribers to topic
mediator.tell( new DistributedPubSubMediator.Publish( topic, message ), sender );

}

@Override
public void publishToRemoteRegions( String topic, Object message, ActorRef sender ) {

// send to each ClusterClient
for ( ActorRef clusterClient : clusterClientsByRegion.values() ) {
clusterClient.tell( new ClusterClient.Publish( topic, message ), sender );
Expand Down Expand Up @@ -423,7 +435,7 @@ private void createClientActors( ActorSystem system ) {
}

ActorRef clusterClient = system.actorOf( ClusterClient.props(
ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client");
ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client-"+region);

clusterClientsByRegion.put( region, clusterClient );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,17 @@ public class UniqueValueActor extends UntypedActor {
private final ActorSystemManager actorSystemManager;

private final UniqueValuesTable table;

private final UniqueValuesFig uniqueValuesFig;

private int count = 0;


@Inject
public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager ) {
public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager, UniqueValuesFig uniqueValuesFig) {

this.table = table;
this.uniqueValuesFig = uniqueValuesFig;
this.table = table;
this.actorSystemManager = actorSystemManager;
}

Expand Down Expand Up @@ -86,8 +89,12 @@ public void onReceive(Object message) {

getSender().tell( new Response( Response.Status.IS_UNIQUE, res.getConsistentHashKey() ),
getSender() );

actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );

if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", new Reservation( res ), getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", new Reservation( res ), getSelf() );
}

} catch (Throwable t) {

Expand All @@ -111,14 +118,24 @@ public void onReceive(Object message) {
// cannot reserve, somebody else owns the unique value
Response response = new Response( Response.Status.NOT_UNIQUE, con.getConsistentHashKey());
getSender().tell( response, getSender() );
actorSystemManager.publishToAllRegions( "content", response, getSelf() );
if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", response, getSelf() );
}
return;

} else if ( owner == null ) {
// cannot commit without first reserving
Response response = new Response( Response.Status.BAD_REQUEST, con.getConsistentHashKey());
getSender().tell( response, getSender() );
actorSystemManager.publishToAllRegions( "content", response, getSelf() );

if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", response, getSelf() );
}

return;
}

Expand All @@ -127,7 +144,11 @@ public void onReceive(Object message) {
Response response = new Response( Response.Status.IS_UNIQUE, con.getConsistentHashKey() );
getSender().tell( response, getSender() );

actorSystemManager.publishToAllRegions( "content", response, getSelf() );
if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", response, getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", response, getSelf() );
}

} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR, con.getConsistentHashKey() ),
Expand Down Expand Up @@ -158,7 +179,11 @@ public void onReceive(Object message) {
getSender() );

// unique value record may have already been cleaned up, also clear cache
actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
}

return;
}
Expand All @@ -170,7 +195,11 @@ public void onReceive(Object message) {
getSender().tell( new Response( Response.Status.SUCCESS, can.getConsistentHashKey() ),
getSender() );

actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
if(uniqueValuesFig.getSkipRemoteRegions()) {
actorSystemManager.publishToLocalRegion( "content", new Cancellation( can ), getSelf() );
} else {
actorSystemManager.publishToAllRegions( "content", new Cancellation( can ), getSelf() );
}

} catch (Throwable t) {
getSender().tell( new Response( Response.Status.ERROR, can.getConsistentHashKey() ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
public interface UniqueValuesFig extends GuicyFig, Serializable {

String UNIQUEVALUE_USE_CLUSTER = "collection.uniquevalues.usecluster";

String UNIQUEVALUE_SKIP_REMOTE_REGIONS = "collection.uniquevalues.skip.remote.regions";

String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors";

Expand All @@ -41,14 +43,25 @@ public interface UniqueValuesFig extends GuicyFig, Serializable {
String UNIQUEVALUE_REQUEST_TIMEOUT = "collection.uniquevalues.request.timeout";

String UNIQUEVALUE_REQUEST_RETRY_COUNT = "collection.uniquevalues.request.retrycount";




/**
* Tells Usergrid whether or not to use the Akka Cluster sytem to verify unique values ( more consistent)
* Tells Usergrid whether or not to use the Akka Cluster system to verify unique values ( more consistent)
* Setting this to false by default to avoid extra complications by default.
*/
@Key(UNIQUEVALUE_USE_CLUSTER)
@Default("true")
@Default("false")
boolean getUnqiueValueViaCluster();

/**
* Tells Usergrid to restrict UniqueValue related chatter to local Akka Cluster only. Skips remote regions
* Setting this to true by default to avoid extra complications by default.
*/
@Key(UNIQUEVALUE_SKIP_REMOTE_REGIONS)
@Default("true")
boolean getSkipRemoteRegions();

/**
* Unique Value cache TTL in seconds.
Expand Down