Skip to content
This repository was archived by the owner on Sep 16, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions appengine-managed-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,11 @@
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4.1</version>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${jetty.version}</version>
</dependency>

<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,32 @@
import com.google.apphosting.api.UserServicePb.CreateLogoutURLResponse;
import com.google.apphosting.utils.remoteapi.RemoteApiPb;

import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnManagerPNames;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpContext;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.ssl.SslContextFactory;


import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -98,29 +97,46 @@ public class VmApiProxyDelegate implements ApiProxy.Delegate<VmApiProxyEnvironme
static final int ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS = 1000;

protected int defaultTimeoutMs;

protected final ExecutorService executor;

protected final HttpClient httpclient;

final IdleConnectionMonitorThread monitorThread;
//final IdleConnectionMonitorThread monitorThread;

private static ClientConnectionManager createConnectionManager() {
PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager();
connectionManager.setMaxTotal(VmApiProxyEnvironment.MAX_CONCURRENT_API_CALLS);
connectionManager.setDefaultMaxPerRoute(VmApiProxyEnvironment.MAX_CONCURRENT_API_CALLS);
return connectionManager;
public VmApiProxyDelegate() {
this(createHttpClient());
}

public VmApiProxyDelegate() {
this(new DefaultHttpClient(createConnectionManager()));
private static HttpClient createHttpClient() {
try {
HttpClientTransportOverHTTP transport = new HttpClientTransportOverHTTP();
// FIXME what sort of customize SslContextFactory we want to use here??
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient( transport, sslContextFactory );
httpClient.setMaxConnectionsPerDestination( VmApiProxyEnvironment.MAX_CONCURRENT_API_CALLS );
httpClient.setConnectTimeout( ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS );
httpClient.setTCPNoDelay( true );
/*
HttpParams params = new BasicHttpParams();
params.setLongParameter(ConnManagerPNames.TIMEOUT,
timeoutMs + ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS);
params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,
timeoutMs + ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS);
*/
httpClient.start();
return httpClient;
} catch ( Exception e ) {
throw new RuntimeException( "cannot crete http client: " + e.getMessage(), e );
}
}

VmApiProxyDelegate(HttpClient httpclient) {
this.defaultTimeoutMs = DEFAULT_RPC_TIMEOUT_MS;
this.executor = Executors.newCachedThreadPool();
this.httpclient = httpclient;
this.monitorThread = new IdleConnectionMonitorThread(httpclient.getConnectionManager());
this.monitorThread.start();
this.executor = Executors.newCachedThreadPool();
//this.monitorThread = new IdleConnectionMonitorThread(httpclient.getConnectionManager());
//this.monitorThread.start();
}

@Override
Expand Down Expand Up @@ -263,22 +279,24 @@ protected byte[] runSyncCall(
String methodName,
byte[] requestData,
int timeoutMs) {
HttpPost request = createRequest(environment, packageName, methodName, requestData, timeoutMs);
try {
// Create a new http context for each call as the default context is not thread safe.
BasicHttpContext context = new BasicHttpContext();
HttpResponse response = httpclient.execute(request, context);

Request request =
createRequest(environment, packageName, methodName, requestData, timeoutMs, httpclient);

try {
ContentResponse contentResponse = request.send();
// Check for HTTP error status and return early.
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
int httpStatus = contentResponse.getStatus();
if (httpStatus != HttpStatus.OK_200) {
byte[] content = contentResponse.getContent();
try (Scanner errorStreamScanner =
new Scanner(new BufferedInputStream(response.getEntity().getContent()))) {
new Scanner(new BufferedInputStream(new ByteArrayInputStream(content)))) {
logger.warning("Error body: " + errorStreamScanner.useDelimiter("\\Z").next());
throw new RPCFailedStatusException(
packageName, methodName, response.getStatusLine().getStatusCode());
throw new RPCFailedStatusException( packageName, methodName, httpStatus);
}
}
try (BufferedInputStream bis = new BufferedInputStream(response.getEntity().getContent())) {
try (BufferedInputStream bis =
new BufferedInputStream(new ByteArrayInputStream( contentResponse.getContent()))) {
RemoteApiPb.Response remoteResponse = new RemoteApiPb.Response();
if (!remoteResponse.parseFrom(bis)) {
logger.warning(
Expand All @@ -296,8 +314,11 @@ protected byte[] runSyncCall(
logger.warning(
"HTTP ApiProxy I/O error for " + packageName + "." + methodName + ": " + e.getMessage());
throw constructApiException(packageName, methodName);
} finally {
request.releaseConnection();
} catch ( InterruptedException | TimeoutException | ExecutionException e ) {
logger.info(
"HTTP ApiProxy Client error for " + packageName + "." + methodName //
+ ": " + e.getMessage());
throw constructApiException(packageName, methodName);
}
}

Expand Down Expand Up @@ -360,48 +381,34 @@ RuntimeException constructApiException(String packageName, String methodName) {
* @param methodName The API call method
* @param requestData The POST payload.
* @param timeoutMs The timeout for this request
* @param client The {@link HttpClient} instance
* @return an HttpPost object to send to the API.
*/
//
static HttpPost createRequest(
VmApiProxyEnvironment environment,
String packageName,
String methodName,
byte[] requestData,
int timeoutMs) {
static Request createRequest( VmApiProxyEnvironment environment, String packageName,
String methodName, byte[] requestData, int timeoutMs,
HttpClient client) {
// Wrap the payload in a RemoteApi Request.
RemoteApiPb.Request remoteRequest = new RemoteApiPb.Request();
remoteRequest.setServiceName(packageName);
remoteRequest.setMethod(methodName);
remoteRequest.setRequestId(environment.getTicket());
remoteRequest.setRequestAsBytes(requestData);

HttpPost request = new HttpPost("http://" + environment.getServer() + REQUEST_ENDPOINT);
request.setHeader(RPC_STUB_ID_HEADER, REQUEST_STUB_ID);
request.setHeader(RPC_METHOD_HEADER, REQUEST_STUB_METHOD);
Request request = client.POST( "http://" + environment.getServer() + REQUEST_ENDPOINT );

// Set TCP connection timeouts.
HttpParams params = new BasicHttpParams();
params.setLongParameter(
ConnManagerPNames.TIMEOUT, timeoutMs + ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS);
params.setIntParameter(
CoreConnectionPNames.CONNECTION_TIMEOUT, timeoutMs + ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS);
params.setIntParameter(
CoreConnectionPNames.SO_TIMEOUT, timeoutMs + ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS);
request.header(RPC_STUB_ID_HEADER, REQUEST_STUB_ID);
request.header(RPC_METHOD_HEADER, REQUEST_STUB_METHOD);

// Performance tweaks.
params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, Boolean.TRUE);
params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, Boolean.FALSE);
request.setParams(params);

request.timeout( ADDITIONAL_HTTP_TIMEOUT_BUFFER_MS, TimeUnit.MILLISECONDS );

// The request deadline can be overwritten by the environment, read deadline if available.
Double deadline = (Double) (environment.getAttributes().get(API_DEADLINE_KEY));
if (deadline == null) {
request.setHeader(
RPC_DEADLINE_HEADER,
request.header(RPC_DEADLINE_HEADER,
Double.toString(TimeUnit.SECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS)));
} else {
request.setHeader(RPC_DEADLINE_HEADER, Double.toString(deadline));
request.header(RPC_DEADLINE_HEADER, Double.toString(deadline));
}

// If the incoming request has a dapper trace header: set it on outgoing API calls
Expand All @@ -411,7 +418,7 @@ static HttpPost createRequest(
.getAttributes()
.get(VmApiProxyEnvironment.AttributeMapping.DAPPER_ID.attributeKey);
if (dapperHeader instanceof String) {
request.setHeader(
request.header(
VmApiProxyEnvironment.AttributeMapping.DAPPER_ID.headerKey, (String) dapperHeader);
}

Expand All @@ -423,16 +430,14 @@ static HttpPost createRequest(
.getAttributes()
.get(VmApiProxyEnvironment.AttributeMapping.CLOUD_TRACE_CONTEXT.attributeKey);
if (traceHeader instanceof String) {
request.setHeader(
request.header(
VmApiProxyEnvironment.AttributeMapping.CLOUD_TRACE_CONTEXT.headerKey,
(String) traceHeader);
}

ByteArrayEntity postPayload =
new ByteArrayEntity(remoteRequest.toByteArray(), ContentType.APPLICATION_OCTET_STREAM);
postPayload.setChunked(false);
request.setEntity(postPayload);

BytesContentProvider bytesContentProvider =
new BytesContentProvider( remoteRequest.toByteArray() );
request.content( bytesContentProvider );
return request;
}

Expand Down Expand Up @@ -599,6 +604,7 @@ public List<Thread> getRequestThreads(VmApiProxyEnvironment environment) {
* Simple connection watchdog verifying that our connections are alive. Any stale connections are
* cleared as well.
*/
/*
class IdleConnectionMonitorThread extends Thread {

private final ClientConnectionManager connectionManager;
Expand All @@ -624,4 +630,5 @@ public void run() {
}
}
}
*/
}
Loading