Skip to content

Commit

Permalink
Merge pull request square#509 from square/jwilson_0202_targetbacklog
Browse files Browse the repository at this point in the history
Promote the target backlog to a parameter.
  • Loading branch information
Adrian Cole committed Feb 3, 2014
2 parents 8be6646 + 4313b7a commit b149fec
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public class Benchmark extends com.google.caliper.Benchmark {
@Param({ "1", "10" })
int concurrencyLevel;

/** How many requests to enqueue to await threads to execute them. */
@Param({ "10" })
int targetBacklog;

/** True to use TLS. */
// TODO: compare different ciphers?
@Param
Expand Down Expand Up @@ -129,7 +133,7 @@ public double run() throws Exception {
}

// The job queue is full. Take a break.
sleep(10);
sleep(1);
}

return best;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ enum Client {
}
},

OkHttpAsync {
@Override HttpClient create() {
return new OkHttpAsync();
}
},

Apache {
@Override HttpClient create() {
return new ApacheHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -43,8 +42,8 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
Expand All @@ -54,14 +53,17 @@ class NettyHttpClient implements HttpClient {
private static final boolean VERBOSE = false;

// Guarded by this. Real apps need more capable connection management.
private final List<HttpChannel> freeChannels = new ArrayList<HttpChannel>();
private int totalChannels = 0;
private final Deque<HttpChannel> freeChannels = new ArrayDeque<HttpChannel>();
private final Deque<URL> backlog = new ArrayDeque<URL>();

private int totalChannels = 0;
private int concurrencyLevel;
private int targetBacklog;
private Bootstrap bootstrap;

@Override public void prepare(final Benchmark benchmark) {
this.concurrencyLevel = benchmark.concurrencyLevel;
this.targetBacklog = benchmark.targetBacklog;

ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() {
@Override public void initChannel(SocketChannel channel) throws Exception {
Expand All @@ -80,38 +82,55 @@ class NettyHttpClient implements HttpClient {
}
};

EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
bootstrap.group(new NioEventLoopGroup(concurrencyLevel))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class)
.handler(channelInitializer);
}

@Override public void enqueue(URL url) throws Exception {
acquireChannel(url).sendRequest(url);
HttpChannel httpChannel = null;
synchronized (this) {
if (!freeChannels.isEmpty()) {
httpChannel = freeChannels.pop();
} else if (totalChannels < concurrencyLevel) {
totalChannels++; // Create a new channel. (outside of the synchronized block).
} else {
backlog.add(url); // Enqueue this for later, to be picked up when another request completes.
return;
}
}
if (httpChannel == null) {
Channel channel = bootstrap.connect(url.getHost(), Util.getEffectivePort(url))
.sync().channel();
httpChannel = (HttpChannel) channel.pipeline().last();
}
httpChannel.sendRequest(url);
}

@Override public synchronized boolean acceptingJobs() {
return backlog.size() < targetBacklog || hasFreeChannels();
}

private boolean hasFreeChannels() {
int activeChannels = totalChannels - freeChannels.size();
return activeChannels < concurrencyLevel;
}

private HttpChannel acquireChannel(URL url) throws InterruptedException {
private void release(HttpChannel httpChannel) {
URL url;
synchronized (this) {
if (!freeChannels.isEmpty()) {
return freeChannels.remove(freeChannels.size() - 1);
} else {
totalChannels++;
url = backlog.pop();
if (url == null) {
// There were no URLs in the backlog. Pool this channel for later.
freeChannels.push(httpChannel);
return;
}
}

Channel channel = bootstrap.connect(url.getHost(), Util.getEffectivePort(url)).sync().channel();
return (HttpChannel) channel.pipeline().last();
}

private synchronized void release(HttpChannel httpChannel) {
freeChannels.add(httpChannel);
// We removed a URL from the backlog. Schedule it right away.
httpChannel.sendRequest(url);
}

class HttpChannel extends SimpleChannelInboundHandler<HttpObject> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2014 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.squareup.okhttp.benchmarks;

import com.squareup.okhttp.Dispatcher;
import com.squareup.okhttp.Failure;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.SslContextBuilder;
import java.io.IOException;
import java.net.URL;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocketFactory;

class OkHttpAsync implements HttpClient {
private static final boolean VERBOSE = false;

private final AtomicInteger requestsInFlight = new AtomicInteger();

private OkHttpClient client;
private Response.Receiver receiver;
private int concurrencyLevel;
private int targetBacklog;

@Override public void prepare(final Benchmark benchmark) {
concurrencyLevel = benchmark.concurrencyLevel;
targetBacklog = benchmark.targetBacklog;

client = new OkHttpClient();
client.setProtocols(benchmark.protocols);
client.setDispatcher(new Dispatcher(new ThreadPoolExecutor(benchmark.concurrencyLevel,
benchmark.concurrencyLevel, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>())));

if (benchmark.tls) {
SSLContext sslContext = SslContextBuilder.localhost();
SSLSocketFactory socketFactory = sslContext.getSocketFactory();
HostnameVerifier hostnameVerifier = new HostnameVerifier() {
@Override public boolean verify(String s, SSLSession session) {
return true;
}
};
client.setSslSocketFactory(socketFactory);
client.setHostnameVerifier(hostnameVerifier);
}

receiver = new Response.Receiver() {
@Override public void onFailure(Failure failure) {
System.out.println("Failed: " + failure.exception());
}

@Override public boolean onResponse(Response response) throws IOException {
Response.Body body = response.body();
long total = SynchronousHttpClient.readAllAndClose(body.byteStream());
long finish = System.nanoTime();
if (VERBOSE) {
long start = (Long) response.request().tag();
System.out.printf("Transferred % 8d bytes in %4d ms%n",
total, TimeUnit.NANOSECONDS.toMillis(finish - start));
}
requestsInFlight.decrementAndGet();
return true;
}
};
}

@Override public void enqueue(URL url) throws Exception {
requestsInFlight.incrementAndGet();
client.enqueue(new Request.Builder().tag(System.nanoTime()).url(url).build(), receiver);
}

@Override public synchronized boolean acceptingJobs() {
return requestsInFlight.get() < (concurrencyLevel + targetBacklog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@

/** Any HTTP client with a blocking API. */
abstract class SynchronousHttpClient implements HttpClient {
int targetBacklog = 10;
ThreadPoolExecutor executor;
int targetBacklog;

@Override public void prepare(Benchmark benchmark) {
this.targetBacklog = benchmark.targetBacklog;
executor = new ThreadPoolExecutor(benchmark.concurrencyLevel, benchmark.concurrencyLevel,
1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
}
Expand All @@ -40,7 +41,7 @@ abstract class SynchronousHttpClient implements HttpClient {
return executor.getQueue().size() < targetBacklog;
}

protected long readAllAndClose(InputStream in) throws IOException {
static long readAllAndClose(InputStream in) throws IOException {
byte[] buffer = new byte[1024];
long total = 0;
for (int count; (count = in.read(buffer)) != -1; ) {
Expand Down

0 comments on commit b149fec

Please sign in to comment.