-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Enhance QueryRunner to support single thread, multi threads and target QPS modes. #254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,58 +19,124 @@ | |
| import java.io.File; | ||
| import java.io.FileInputStream; | ||
| import java.io.FileReader; | ||
| import java.lang.reflect.Field; | ||
| import java.util.List; | ||
| import java.util.Random; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import org.apache.commons.io.IOUtils; | ||
| import org.json.JSONObject; | ||
| import org.kohsuke.args4j.CmdLineParser; | ||
| import org.kohsuke.args4j.Option; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| /** | ||
| * Simple code to run queries against a server | ||
| * USAGE: QueryRunner QueryFile ({BrokerHost} {BrokerPort} <single-threaded/multi-threaded>) | ||
| */ | ||
|
|
||
| public class QueryRunner { | ||
| private QueryRunner() { | ||
| } | ||
| @Option(name = "-queryFile", required = true, usage = "query file path") | ||
| private String _queryFile; | ||
| @Option(name = "-mode", required = true, usage = "query runner mode (singleThread|multiThreads|targetQPS)") | ||
| private String _mode; | ||
| @Option(name = "-numThreads", required = false, | ||
| usage = "number of threads sending queries for multiThread mode and targetQPS mode") | ||
| private int _numThreads; | ||
| @Option(name = "-startQPS", required = false, usage = "start QPS for targetQPS mode") | ||
| private double _startQPS; | ||
| @Option(name = "-deltaQPS", required = false, usage = "delta QPS for targetQPS mode") | ||
| private double _deltaQPS; | ||
| @Option(name = "-brokerHost", required = false, usage = "broker host name (default: localhost)") | ||
| private String _brokerHost = "localhost"; | ||
| @Option(name = "-brokerPort", required = false, usage = "broker port number (default: 8099)") | ||
| private String _brokerPort = "8099"; | ||
| @Option(name = "-help", required = false, help = true, aliases = { "-h" }, usage = "print this message") | ||
| private boolean _help; | ||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class); | ||
| private static final int MILLIS_PER_SECOND = 1000; | ||
|
|
||
| /** | ||
| * Use single thread to run queries as fast as possible. | ||
| * | ||
| * Use a single thread to send queries back to back and log statistic information periodically. | ||
| * | ||
| * @param conf perf benchmark driver config. | ||
| * @param queryFile query file. | ||
| * @throws Exception | ||
| */ | ||
| public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, String queryFile) | ||
| throws Exception { | ||
| final PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); | ||
|
|
||
| try (BufferedReader bufferedReader = new BufferedReader(new FileReader(queryFile))) { | ||
| int numQueries = 0; | ||
| int totalServerTime = 0; | ||
| int totalBrokerTime = 0; | ||
| int totalClientTime = 0; | ||
|
|
||
| String query; | ||
| while ((query = bufferedReader.readLine()) != null) { | ||
| long startTime = System.currentTimeMillis(); | ||
| JSONObject response = driver.postQuery(query); | ||
| numQueries++; | ||
| totalClientTime += System.currentTimeMillis() - startTime; | ||
| totalServerTime += response.getLong("timeUsedMs"); | ||
| totalBrokerTime += response.getLong("totalTime"); | ||
|
|
||
| if (numQueries % 1000 == 0) { | ||
| LOGGER.info( | ||
| "Processed {} Queries, Total Server Time: {}ms, Total Broker Time: {}ms, Total Client Time : {}ms.", | ||
| numQueries, totalServerTime, totalBrokerTime, totalClientTime); | ||
| } | ||
| } | ||
|
|
||
| LOGGER.info("Processed {} Queries, Total Server Time: {}ms, Total Broker Time: {}ms, Total Client Time : {}ms.", | ||
| numQueries, totalServerTime, totalBrokerTime, totalClientTime); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Use multiple threads to run queries as fast as possible. | ||
| * | ||
| * Start {numThreads} worker threads to send queries (blocking call) back to back, and use the main thread to collect | ||
| * the statistic information and log them periodically. | ||
| * | ||
| * @param conf perf benchmark driver config. | ||
| * @param queryFile query file. | ||
| * @param numThreads number of threads sending queries. | ||
| * @throws Exception | ||
| */ | ||
| @SuppressWarnings("InfiniteLoopStatement") | ||
| public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, String queryFile) | ||
| public static void multiThreadedsQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numThreads) | ||
| throws Exception { | ||
| final long randomSeed = 123456789L; | ||
| final int numClients = 10; | ||
| final Random random = new Random(randomSeed); | ||
| final int reportIntervalMillis = 3000; | ||
|
|
||
| final List<String> queries; | ||
| try (FileInputStream input = new FileInputStream(new File(queryFile))) { | ||
| queries = IOUtils.readLines(input); | ||
| } | ||
|
|
||
| final int queryNum = queries.size(); | ||
| final int numQueries = queries.size(); | ||
| final PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); | ||
| final Random random = new Random(randomSeed); | ||
| final AtomicInteger counter = new AtomicInteger(0); | ||
| final AtomicLong totalResponseTime = new AtomicLong(0L); | ||
| final ExecutorService executorService = Executors.newFixedThreadPool(numClients); | ||
| final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); | ||
|
|
||
| for (int i = 0; i < numClients; i++) { | ||
| for (int i = 0; i < numThreads; i++) { | ||
| executorService.submit(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| while (true) { | ||
| String query = queries.get(random.nextInt(queryNum)); | ||
| long start = System.currentTimeMillis(); | ||
| String query = queries.get(random.nextInt(numQueries)); | ||
| long startTime = System.currentTimeMillis(); | ||
| try { | ||
| driver.postQuery(query); | ||
| counter.getAndIncrement(); | ||
| totalResponseTime.getAndAdd(System.currentTimeMillis() - start); | ||
| totalResponseTime.getAndAdd(System.currentTimeMillis() - startTime); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Caught exception while running query: {}", query, e); | ||
| return; | ||
|
|
@@ -91,92 +157,168 @@ public void run() { | |
| } | ||
| } | ||
|
|
||
| public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, String queryFile) | ||
| /** | ||
| * Use multiple threads to run query at an increasing target QPS. | ||
| * | ||
| * Use a concurrent linked queue to buffer the queries to be sent. Use the main thread to insert queries into the | ||
| * queue at the target QPS, and start {numThreads} worker threads to fetch queries from the queue and send them. | ||
| * We start with the start QPS, and keep adding delta QPS to the start QPS during the test. The main thread is | ||
| * responsible for collecting the statistic information and log them periodically. | ||
| * | ||
| * @param conf perf benchmark driver config. | ||
| * @param queryFile query file. | ||
| * @param numThreads number of threads sending queries. | ||
| * @param startQPS start QPS | ||
| * @param deltaQPS delta QPS | ||
| * @throws Exception | ||
| */ | ||
| @SuppressWarnings("InfiniteLoopStatement") | ||
| public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numThreads, | ||
| double startQPS, double deltaQPS) | ||
| throws Exception { | ||
| File file = new File(queryFile); | ||
| FileReader fileReader = new FileReader(file); | ||
| BufferedReader bufferedReader = new BufferedReader(fileReader); | ||
| String query; | ||
| final long randomSeed = 123456789L; | ||
| final Random random = new Random(randomSeed); | ||
| final int timePerTargetQPSMillis = 60000; | ||
| final int queueLengthThreshold = Math.max(20, (int) startQPS); | ||
|
|
||
| int numQueries = 0; | ||
| long totalQueryTime = 0; | ||
| long totalClientTime = 0; | ||
| final List<String> queries; | ||
| try (FileInputStream input = new FileInputStream(new File(queryFile))) { | ||
| queries = IOUtils.readLines(input); | ||
| } | ||
| final int numQueries = queries.size(); | ||
|
|
||
| while ((query = bufferedReader.readLine()) != null) { | ||
| JSONObject response = runSingleQuery(conf, query, 1); | ||
| final PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); | ||
| final AtomicInteger counter = new AtomicInteger(0); | ||
| final AtomicLong totalResponseTime = new AtomicLong(0L); | ||
| final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); | ||
|
|
||
| totalQueryTime += response.getLong("timeUsedMs"); | ||
| totalClientTime += response.getLong("totalTime"); | ||
| final ConcurrentLinkedQueue<String> queryQueue = new ConcurrentLinkedQueue<>(); | ||
| double currentQPS = startQPS; | ||
| int intervalMillis = (int) (MILLIS_PER_SECOND / currentQPS); | ||
|
|
||
| if ((numQueries > 0) && (numQueries % 1000) == 0) { | ||
| LOGGER.info( | ||
| "Processed {} queries, Total Query time: {} ms Total Client side time : {} ms.", | ||
| numQueries, totalQueryTime, totalClientTime); | ||
| for (int i = 0; i < numThreads; i++) { | ||
| executorService.submit(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| while (true) { | ||
| String query = queryQueue.poll(); | ||
| if (query == null) { | ||
| try { | ||
| Thread.sleep(1); | ||
| continue; | ||
| } catch (InterruptedException e) { | ||
| LOGGER.error("Interrupted.", e); | ||
| return; | ||
| } | ||
| } | ||
| long startTime = System.currentTimeMillis(); | ||
| try { | ||
| System.out.println(driver.postQuery(query)); | ||
| counter.getAndIncrement(); | ||
| totalResponseTime.getAndAdd(System.currentTimeMillis() - startTime); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Caught exception while running query: {}", query, e); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| LOGGER.info("Start with QPS: {}, delta QPS: {}", startQPS, deltaQPS); | ||
| while (true) { | ||
| long startTime = System.currentTimeMillis(); | ||
| while (System.currentTimeMillis() - startTime <= timePerTargetQPSMillis) { | ||
| if (queryQueue.size() > queueLengthThreshold) { | ||
| executorService.shutdownNow(); | ||
| throw new RuntimeException("Cannot achieve target QPS of: " + currentQPS); | ||
| } | ||
| queryQueue.add(queries.get(random.nextInt(numQueries))); | ||
| Thread.sleep(intervalMillis); | ||
| } | ||
| double timePassedSeconds = ((double) (System.currentTimeMillis() - startTime)) / MILLIS_PER_SECOND; | ||
| int count = counter.getAndSet(0); | ||
| double avgResponseTime = ((double) totalResponseTime.getAndSet(0)) / count; | ||
| LOGGER.info("Target QPS: {}, Interval: {}ms, Actual QPS: {}, Avg Response Time: {}ms", currentQPS, intervalMillis, | ||
| count / timePassedSeconds, avgResponseTime); | ||
|
|
||
| ++numQueries; | ||
| // Find a new interval | ||
| int newIntervalMillis; | ||
| do { | ||
| currentQPS += deltaQPS; | ||
| newIntervalMillis = (int) (MILLIS_PER_SECOND / currentQPS); | ||
| } while (newIntervalMillis == intervalMillis); | ||
| intervalMillis = newIntervalMillis; | ||
| } | ||
| LOGGER.info("Processed {} queries, Total Query time: {} Total Client side time : {}.", | ||
| numQueries, totalQueryTime, totalClientTime); | ||
| fileReader.close(); | ||
| } | ||
|
|
||
| public static JSONObject runSingleQuery(PerfBenchmarkDriverConf conf, String query, int numRuns) | ||
| throws Exception { | ||
| PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf); | ||
| JSONObject response = null; | ||
|
|
||
| for (int i = 0; i < numRuns; i++) { | ||
| response = driver.postQuery(query); | ||
| private static void printUsage() { | ||
| System.out.println("Usage: QueryRunner"); | ||
|
||
| for (Field field : QueryRunner.class.getDeclaredFields()) { | ||
| if (field.isAnnotationPresent(Option.class)) { | ||
| Option option = field.getAnnotation(Option.class); | ||
| System.out.println(String.format("\t%-15s: %s (required=%s)", option.name(), option.usage(), option.required())); | ||
| } | ||
| } | ||
| return response; | ||
| } | ||
|
|
||
| /* | ||
| * USAGE: QueryRunner <queryFile> <brokerHost(optional. default=localhost)> <brokerPort (optional. | ||
| * default=8099)> <mode(optional.(single-threaded(optional)|multi-threaded))>) | ||
| */ | ||
|
|
||
| public static void main(String[] args) throws Exception { | ||
| public static void main(String[] args) | ||
| throws Exception { | ||
| QueryRunner queryRunner = new QueryRunner(); | ||
| CmdLineParser parser = new CmdLineParser(queryRunner); | ||
| parser.parseArgument(args); | ||
|
|
||
| String queryFile = null; | ||
| if (args.length >= 1) { | ||
| queryFile = args[0]; | ||
| } else { | ||
| System.out.println( | ||
| "QueryRunner <queryFile> <brokerHost(optional. default=localhost)> <brokerPort (optional. default=8099)> <mode(optional.(single-threaded(optional)|multi-threaded))>)"); | ||
| System.exit(1); | ||
| if (queryRunner._help) { | ||
| printUsage(); | ||
| return; | ||
| } | ||
|
|
||
| String brokerHost = null; | ||
| String brokerPort = null; | ||
| if (args.length >= 3) { | ||
| brokerHost = args[1]; | ||
| brokerPort = args[2]; | ||
| } | ||
| boolean multiThreaded = false; | ||
| if (args.length >= 4) { | ||
| if ("multi-threaded".equals(args[3])) { | ||
| multiThreaded = true; | ||
| } | ||
| } | ||
| PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf(); | ||
| // since its only to run queries, we should ensure no services get started | ||
| if (brokerHost != null) { | ||
| conf.setBrokerHost(brokerHost); | ||
| conf.setBrokerPort(Integer.parseInt(brokerPort)); | ||
| } | ||
| conf.setStartBroker(false); | ||
| conf.setBrokerHost(queryRunner._brokerHost); | ||
| conf.setBrokerPort(Integer.parseInt(queryRunner._brokerPort)); | ||
| conf.setRunQueries(true); | ||
| conf.setStartZookeeper(false); | ||
| conf.setStartController(false); | ||
| conf.setStartBroker(false); | ||
| conf.setStartServer(false); | ||
| conf.setStartZookeeper(false); | ||
| conf.setUploadIndexes(false); | ||
| conf.setRunQueries(true); | ||
| conf.setConfigureResources(false); | ||
| if (multiThreaded) { | ||
| multiThreadedQueryRunner(conf, queryFile); | ||
| } else { | ||
| singleThreadedQueryRunner(conf, queryFile); | ||
|
|
||
| switch (queryRunner._mode) { | ||
| case "singleThread": | ||
| singleThreadedQueryRunner(conf, queryRunner._queryFile); | ||
| break; | ||
| case "multiThreads": | ||
| if (queryRunner._numThreads <= 0) { | ||
| System.out.println("For multiThreads mode, need to specify a positive numThreads"); | ||
| printUsage(); | ||
| return; | ||
| } | ||
| multiThreadedsQueryRunner(conf, queryRunner._queryFile, queryRunner._numThreads); | ||
| break; | ||
| case "targetQPS": | ||
| if (queryRunner._numThreads <= 0) { | ||
| System.out.println("For targetQPS mode, need to specify a positive numThreads"); | ||
| printUsage(); | ||
| return; | ||
| } | ||
| if (queryRunner._startQPS <= 0) { | ||
| System.out.println("For targetQPS mode, need to specify a positive startQPS"); | ||
| printUsage(); | ||
| return; | ||
| } | ||
| if (queryRunner._deltaQPS <= 0) { | ||
| System.out.println("For targetQPS mode, need to specify a positive deltaQPS"); | ||
| printUsage(); | ||
| return; | ||
| } | ||
| targetQPSQueryRunner(conf, queryRunner._queryFile, queryRunner._numThreads, queryRunner._startQPS, | ||
| queryRunner._deltaQPS); | ||
| break; | ||
| default: | ||
| System.out.println("Invalid mode: " + queryRunner._mode); | ||
| printUsage(); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more detailed explanation describing the algorithm, eg if/when threads wait etc.