Skip to content

Made entity export threads configurable #620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 30, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 81 additions & 10 deletions stack/tools/src/main/java/org/apache/usergrid/tools/Export.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
Expand All @@ -59,7 +60,6 @@
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
Expand All @@ -78,6 +78,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.base.Optional;
import com.google.common.collect.BiMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

Expand All @@ -89,7 +90,8 @@
public class Export extends ExportingToolBase {

static final Logger logger = LoggerFactory.getLogger( Export.class );
public static final String LAST_ID = "lastId";
private static final String ENTITY_FETCHER_THREADS = "entityFetchThreads";
private static final String ENTITY_MEMBER_FETCHER_MULT = "entityThreadMult";


@Autowired
Expand All @@ -100,11 +102,23 @@ public class Export extends ExportingToolBase {
private AllEntityIdsObservable allEntityIdsObs;
private SimpleEdge lastEdge = null;

//number of threads for fetching entity contents. Each thread will handle a batch of 1000 entity ids
private int entityFetcherThreads = 50;
// After an individual entity is fetched, its members (assets, connections, etc.) need to be fetched as well.
// Depending on how heavy the assets/connections are, this multiplier may need to be raised so that more threads
// are allocated for pulling the members quickly without the queue backing up.
private int entityMemberFetcherMultiplier = 1;


//TODO : Add blocking queues for these executors where appropriate
private ExecutorService orgAppCollParallelizer = Executors.newFixedThreadPool(3);
private ExecutorService entityFetcher = Executors.newFixedThreadPool(10);
private ExecutorService enitityMemberFetcher = Executors.newFixedThreadPool(10);
private ExecutorService assetsFetcher = Executors.newFixedThreadPool(10);
private ExecutorService orgAppCollParallelizer;

//fetches the entity content
private ExecutorService entityFetcher;
//fetches the entity members like connections etc for a given entity
private ExecutorService entityMemberFetcher;
//fetches the assets for a given entity
private ExecutorService assetsFetcher;


@Override
Expand All @@ -113,13 +127,70 @@ public Options createOptions() {

Options options = super.createOptions();

Option lastId = OptionBuilder.withArgName( LAST_ID ).hasArg()
.withDescription( "Last Entity Id to resume from" ).create( LAST_ID );
options.addOption( lastId);

Option entityFetcherThreads = OptionBuilder.withArgName( ENTITY_FETCHER_THREADS ).hasArg()
.withDescription( "Number of threads to fetch entities in parallel (defaults to 50)" ).create( ENTITY_FETCHER_THREADS );
options.addOption( entityFetcherThreads);

Option entityMemberFetcherMultiplier = OptionBuilder.withArgName( ENTITY_MEMBER_FETCHER_MULT ).hasArg()
.withDescription( "This defines the number of threads for fetching entity members like assets/collections by multiplying the number of entity fetcher threads. Defaults to 1" ).create( ENTITY_MEMBER_FETCHER_MULT );
options.addOption( entityMemberFetcherMultiplier);

return options;
}

/**
 * Validates the export-specific command line options after the base class has validated its own.
 *
 * <p>Only checks that the thread-count options, when present, parse as integers. Zero and negative
 * values pass this check; {@code applyExportParams} replaces out-of-range values with defaults.
 *
 * @param line the parsed command line
 * @throws MissingOptionException if either thread option is present but is not a parseable integer
 */
@Override
protected void validateOptions(CommandLine line) throws MissingOptionException {
    super.validateOptions(line);

    requireIntegerOption(line, ENTITY_FETCHER_THREADS,
            "Entity fetcher threads need to be a positive integer");
    requireIntegerOption(line, ENTITY_MEMBER_FETCHER_MULT,
            "Entity member thread multiplier needs to be a positive integer");
}

/**
 * Ensures that the given option, if supplied, holds a value that parses as an {@code int}.
 *
 * @param line         the parsed command line
 * @param optionName   name of the option to check
 * @param errorMessage message of the exception thrown when the value is not an integer
 * @throws MissingOptionException if the option is present and its value is not a parseable integer
 */
private static void requireIntegerOption(CommandLine line, String optionName, String errorMessage)
        throws MissingOptionException {
    if (!line.hasOption(optionName)) {
        return;
    }
    try {
        Integer.parseInt(line.getOptionValue(optionName));
    } catch (NumberFormatException e) {
        // MissingOptionException has no (message, cause) constructor, so attach the
        // parse failure explicitly to keep the offending input visible in stack traces.
        MissingOptionException invalid = new MissingOptionException(errorMessage);
        invalid.initCause(e);
        throw invalid;
    }
}

/**
 * Applies the export parameters from the command line and creates the executors used by the export.
 *
 * <p>The entity fetcher thread count falls back to 50 when a value below 1 is supplied. The member
 * fetcher multiplier is clamped to the range 1..5. The member and asset fetcher pools are each sized
 * as entity fetcher threads times the multiplier.
 *
 * @param line the parsed command line
 */
@Override
protected void applyExportParams(CommandLine line) {

    super.applyExportParams(line);

    if (line.hasOption(ENTITY_FETCHER_THREADS)) {
        final int requestedThreads = Integer.parseInt(line.getOptionValue(ENTITY_FETCHER_THREADS));
        // anything below one thread is replaced by the default pool size
        entityFetcherThreads = requestedThreads < 1 ? 50 : requestedThreads;
    }

    if (line.hasOption(ENTITY_MEMBER_FETCHER_MULT)) {
        final int requestedMultiplier = Integer.parseInt(line.getOptionValue(ENTITY_MEMBER_FETCHER_MULT));
        // keep the multiplier within 1..5
        entityMemberFetcherMultiplier = Math.min(5, Math.max(1, requestedMultiplier));
    }

    // member and asset fetchers share the same pool size
    final int memberPoolSize = entityFetcherThreads * entityMemberFetcherMultiplier;

    orgAppCollParallelizer = Executors.newFixedThreadPool(
            5,
            new ThreadFactoryBuilder().setNameFormat("OrgAppColl-Parallelizer-%d").build());
    entityFetcher = Executors.newFixedThreadPool(
            entityFetcherThreads,
            new ThreadFactoryBuilder().setNameFormat("Export-EntityFetcher-%d").build());
    entityMemberFetcher = Executors.newFixedThreadPool(
            memberPoolSize,
            new ThreadFactoryBuilder().setNameFormat("Export-EntityMemberFetcher-%d").build());
    assetsFetcher = Executors.newFixedThreadPool(
            memberPoolSize,
            new ThreadFactoryBuilder().setNameFormat("Export-AssetFetcher-%d").build());

}

@Override
public void runTool( CommandLine line ) throws Exception {
Expand Down Expand Up @@ -432,7 +503,7 @@ private void extractEntityIdsForCollection(File collectionDir, UUID applicationI

ConnectableObservable<Results> entityObs = Observable.just(entities)
.publish();
entityObs.subscribeOn(Schedulers.from(enitityMemberFetcher));
entityObs.subscribeOn(Schedulers.from(entityMemberFetcher));


// fetch and write connections
Expand Down